On Friday 13 May 2011 09:43:25 Andriy Gapon wrote:
> This is a change in vein of what I've been doing in the xcpu branch and
> it's supposed to fix the issue by the recent commit that (probably
> unintentionally) stress-tests smp_rendezvous in TSC code.
> 
> Non-essential changes:
> - ditch initial, and in my opinion useless, pre-setup rendezvous in
> smp_rendezvous_action()
> - shift smp_rv_waiters indexes by one because of the above
> 
> Essential changes (the fix):
> - re-use freed smp_rv_waiters[2] to indicate that a slave/target is really
> fully done with rendezvous (i.e. it's not going to access any members of
> smp_rv_* pseudo-structure)
> - spin on smp_rv_waiters[2] upon _entry_ to smp_rendezvous_cpus() to not
> re-use the smp_rv_* pseudo-structure too early
> 
> Index: sys/kern/subr_smp.c
> ===================================================================
> --- sys/kern/subr_smp.c       (revision 221835)
> +++ sys/kern/subr_smp.c       (working copy)
> @@ -316,19 +316,14 @@
>       void (*local_action_func)(void*)   = smp_rv_action_func;
>       void (*local_teardown_func)(void*) = smp_rv_teardown_func;
> 

You really need a read/load with a memory fence here in order to make sure you 
get up-to-date values on all CPUs.

> -     /* Ensure we have up-to-date values. */
> -     atomic_add_acq_int(&smp_rv_waiters[0], 1);
> -     while (smp_rv_waiters[0] < smp_rv_ncpus)
> -             cpu_spinwait();
> -
>       /* setup function */
>       if (local_setup_func != smp_no_rendevous_barrier) {
>               if (smp_rv_setup_func != NULL)
>                       smp_rv_setup_func(smp_rv_func_arg);
> 
>               /* spin on entry rendezvous */
> -             atomic_add_int(&smp_rv_waiters[1], 1);
> -             while (smp_rv_waiters[1] < smp_rv_ncpus)
> +             atomic_add_int(&smp_rv_waiters[0], 1);
> +             while (smp_rv_waiters[0] < smp_rv_ncpus)
>                       cpu_spinwait();
>       }
> 
> @@ -337,12 +332,16 @@
>               local_action_func(local_func_arg);
> 
>       /* spin on exit rendezvous */
> -     atomic_add_int(&smp_rv_waiters[2], 1);
> -     if (local_teardown_func == smp_no_rendevous_barrier)
> +     atomic_add_int(&smp_rv_waiters[1], 1);
> +     if (local_teardown_func == smp_no_rendevous_barrier) {
> +             atomic_add_int(&smp_rv_waiters[2], 1);
>                  return;
> -     while (smp_rv_waiters[2] < smp_rv_ncpus)
> +     }
> +     while (smp_rv_waiters[1] < smp_rv_ncpus)
>               cpu_spinwait();
> 
> +     atomic_add_int(&smp_rv_waiters[2], 1);
> +
>       /* teardown function */
>       if (local_teardown_func != NULL)
>               local_teardown_func(local_func_arg);
> @@ -377,6 +376,10 @@
>       /* obtain rendezvous lock */
>       mtx_lock_spin(&smp_ipi_mtx);
> 
> +     /* Wait for any previous unwaited rendezvous to finish. */
> +     while (atomic_load_acq_int(&smp_rv_waiters[2]) < ncpus)
> +             cpu_spinwait();
> +

This is not the ncpus you are looking for.  I have a patch of my own that 
might be overdoing it ... but it is attached nonetheless ...

The rmlock change is a separate issue, but it is what led me to discover the 
problem with the rendezvous.

>       /* set static function pointers */
>       smp_rv_ncpus = ncpus;
>       smp_rv_setup_func = setup_func;
> @@ -395,7 +398,7 @@
>               smp_rendezvous_action();
> 
>       if (teardown_func == smp_no_rendevous_barrier)
> -             while (atomic_load_acq_int(&smp_rv_waiters[2]) < ncpus)
> +             while (atomic_load_acq_int(&smp_rv_waiters[1]) < ncpus)
>                       cpu_spinwait();
> 
>       /* release lock */
commit 48f16f01c28cc3006faefa024b0d3e4bcf02fb60
Author: ndire <ndire@b72e2a10-2d34-0410-9a71-d3beadf02b57>
Date:   Tue Dec 14 18:49:55 2010 +0000

    Reintegrate BR_CHOPUVILLE.
    
    
    git-svn-id: https://svn.west.isilon.com/repo/onefs/head@169088 b72e2a10-2d34-0410-9a71-d3beadf02b57

diff --git a/src/sys/kern/kern_rmlock.c b/src/sys/kern/kern_rmlock.c
index f91e61a..a30c16b 100644
--- a/src/sys/kern/kern_rmlock.c
+++ b/src/sys/kern/kern_rmlock.c
@@ -154,6 +154,7 @@ rm_tracker_remove(struct pcpu *pc, struct rm_priotracker *tracker)
 static void
 rm_cleanIPI(void *arg)
 {
+	TAILQ_HEAD(,rm_priotracker) tmp_list = TAILQ_HEAD_INITIALIZER(tmp_list);
 	struct pcpu *pc;
 	struct rmlock *rm = arg;
 	struct rm_priotracker *tracker;
@@ -165,12 +166,12 @@ rm_cleanIPI(void *arg)
 		tracker = (struct rm_priotracker *)queue;
 		if (tracker->rmp_rmlock == rm && tracker->rmp_flags == 0) {
 			tracker->rmp_flags = RMPF_ONQUEUE;
-			mtx_lock_spin(&rm_spinlock);
-			LIST_INSERT_HEAD(&rm->rm_activeReaders, tracker,
-			    rmp_qentry);
-			mtx_unlock_spin(&rm_spinlock);
+			TAILQ_INSERT_HEAD(&tmp_list, tracker, rmp_qentry);
 		}
 	}
+	mtx_lock_spin(&rm_spinlock);
+	TAILQ_CONCAT(&rm->rm_activeReaders, &tmp_list, rmp_qentry);
+	mtx_unlock_spin(&rm_spinlock);
 }
 
 CTASSERT((RM_SLEEPABLE & LO_CLASSFLAGS) == RM_SLEEPABLE);
@@ -186,7 +187,7 @@ rm_init_flags(struct rmlock *rm, const char *name, int opts)
 	if (opts & RM_RECURSE)
 		liflags |= LO_RECURSABLE;
 	rm->rm_writecpus = all_cpus;
-	LIST_INIT(&rm->rm_activeReaders);
+	TAILQ_INIT(&rm->rm_activeReaders);
 	if (opts & RM_SLEEPABLE) {
 		liflags |= RM_SLEEPABLE;
 		sx_init_flags(&rm->rm_lock_sx, "rmlock_sx", SX_RECURSE);
@@ -281,7 +282,7 @@ _rm_rlock_hard(struct rmlock *rm, struct rm_priotracker *tracker, int trylock)
 			if ((atracker->rmp_rmlock == rm) &&
 			    (atracker->rmp_thread == tracker->rmp_thread)) {
 				mtx_lock_spin(&rm_spinlock);
-				LIST_INSERT_HEAD(&rm->rm_activeReaders,
+				TAILQ_INSERT_HEAD(&rm->rm_activeReaders,
 				    tracker, rmp_qentry);
 				tracker->rmp_flags = RMPF_ONQUEUE;
 				mtx_unlock_spin(&rm_spinlock);
@@ -373,7 +374,8 @@ _rm_unlock_hard(struct thread *td,struct rm_priotracker *tracker)
 		return;
 
 	mtx_lock_spin(&rm_spinlock);
-	LIST_REMOVE(tracker, rmp_qentry);
+	TAILQ_REMOVE(&tracker->rmp_rmlock->rm_activeReaders, tracker,
+	    rmp_qentry);
 
 	if (tracker->rmp_flags & RMPF_SIGNAL) {
 		struct rmlock *rm;
@@ -445,7 +447,7 @@ _rm_wlock(struct rmlock *rm)
 #endif
 
 		mtx_lock_spin(&rm_spinlock);
-		while ((prio = LIST_FIRST(&rm->rm_activeReaders)) != NULL) {
+		while ((prio = TAILQ_FIRST(&rm->rm_activeReaders)) != NULL) {
 			ts = turnstile_trywait(&rm->lock_object);
 			prio->rmp_flags = RMPF_ONQUEUE | RMPF_SIGNAL;
 			mtx_unlock_spin(&rm_spinlock);
diff --git a/src/sys/kern/subr_smp.c b/src/sys/kern/subr_smp.c
index 98c81cc..e61d6f8 100644
--- a/src/sys/kern/subr_smp.c
+++ b/src/sys/kern/subr_smp.c
@@ -112,7 +112,8 @@ static void (*volatile smp_rv_setup_func)(void *arg);
 static void (*volatile smp_rv_action_func)(void *arg);
 static void (*volatile smp_rv_teardown_func)(void *arg);
 static void * volatile smp_rv_func_arg;
-static volatile int smp_rv_waiters[3];
+static volatile int smp_rv_waiters[4];
+static volatile cpumask_t smp_rv_targets;
 
 /* 
  * Shared mutex to restrict busywaits between smp_rendezvous() and
@@ -347,15 +348,41 @@ restart_cpus(cpumask_t map)
 void
 smp_rendezvous_action(void)
 {
-	void* local_func_arg = smp_rv_func_arg;
-	void (*local_setup_func)(void*)   = smp_rv_setup_func;
-	void (*local_action_func)(void*)   = smp_rv_action_func;
-	void (*local_teardown_func)(void*) = smp_rv_teardown_func;
+	int tmp, local_ncpu;
+	cpumask_t local_mask;
+	void* local_func_arg;
+	void (*local_setup_func)(void*);
+	void (*local_action_func)(void*);
+	void (*local_teardown_func)(void*);
 
 	/* Ensure we have up-to-date values. */
-	atomic_add_acq_int(&smp_rv_waiters[0], 1);
-	while (smp_rv_waiters[0] < smp_rv_ncpus)
+	curthread->td_critnest++;	/* critical_enter */
+	tmp = 0;
+	while ((local_mask = atomic_load_acq_int(&smp_rv_targets)) == 0) {
 		cpu_spinwait();
+		tmp++;
+	}
+#ifdef INVARIANTS
+	if (tmp != 0)
+		printf("%s: CPU%d got IPI too early and spun %d times\n",
+		    __func__, curcpu, tmp);
+#endif
+	if ((local_mask & (1 << curcpu)) == 0) {
+#ifdef INVARIANTS
+		printf("%s: %d not in %x\n", __func__, curcpu, local_mask);
+#endif
+		curthread->td_critnest--;	/* critical_exit */
+		return;
+	}
+	atomic_add_int(&smp_rv_waiters[0], 1);
+	local_ncpu = smp_rv_ncpus;
+	while ((tmp = smp_rv_waiters[0]) < local_ncpu)
+		cpu_spinwait();
+	KASSERT(tmp == local_ncpu, ("stale value for smp_rv_waiters"));
+	local_func_arg = smp_rv_func_arg;
+	local_setup_func = smp_rv_setup_func;
+	local_action_func = smp_rv_action_func;
+	local_teardown_func = smp_rv_teardown_func;
 
 	/* setup function */
 	if (local_setup_func != smp_no_rendevous_barrier) {
@@ -364,7 +391,7 @@ smp_rendezvous_action(void)
 
 		/* spin on entry rendezvous */
 		atomic_add_int(&smp_rv_waiters[1], 1);
-		while (smp_rv_waiters[1] < smp_rv_ncpus)
+		while (smp_rv_waiters[1] < local_ncpu)
                 	cpu_spinwait();
 	}
 
@@ -372,16 +399,22 @@ smp_rendezvous_action(void)
 	if (local_action_func != NULL)
 		local_action_func(local_func_arg);
 
-	/* spin on exit rendezvous */
 	atomic_add_int(&smp_rv_waiters[2], 1);
-	if (local_teardown_func == smp_no_rendevous_barrier)
-                return;
-	while (smp_rv_waiters[2] < smp_rv_ncpus)
+	if (local_teardown_func == smp_no_rendevous_barrier) {
+		atomic_add_int(&smp_rv_waiters[3], 1);
+		curthread->td_critnest--;	/* critical_exit */
+		return;
+	}
+	/* wait until all CPUs are done with the main action */
+	while (smp_rv_waiters[2] < local_ncpu)
 		cpu_spinwait();
 
+	atomic_add_int(&smp_rv_waiters[3], 1);
 	/* teardown function */
 	if (local_teardown_func != NULL)
 		local_teardown_func(local_func_arg);
+
+	curthread->td_critnest--;	/* critical_exit */
 }
 
 void
@@ -418,21 +451,31 @@ smp_rendezvous_cpus(cpumask_t map,
 	smp_rv_action_func = action_func;
 	smp_rv_teardown_func = teardown_func;
 	smp_rv_func_arg = arg;
+	smp_rv_waiters[0] = 0;
 	smp_rv_waiters[1] = 0;
 	smp_rv_waiters[2] = 0;
-	atomic_store_rel_int(&smp_rv_waiters[0], 0);
+	smp_rv_waiters[3] = 0;
+	atomic_store_rel_int(&smp_rv_targets, map);
 
 	/* signal other processors, which will enter the IPI with interrupts off */
 	ipi_selected(map & ~(1 << curcpu), IPI_RENDEZVOUS);
 
 	/* Check if the current CPU is in the map */
-	if ((map & (1 << curcpu)) != 0)
+	if ((map & (1 << curcpu)) != 0) {
 		smp_rendezvous_action();
-
-	if (teardown_func == smp_no_rendevous_barrier)
-		while (atomic_load_acq_int(&smp_rv_waiters[2]) < ncpus)
+		/* wait until all CPUs will exit smp_rendezvous_action() */
+		while (atomic_load_acq_int(&smp_rv_waiters[3]) < ncpus)
 			cpu_spinwait();
+	} else {
+		/* wait until all CPUs will exit smp_rendezvous_action() */
+		while (smp_rv_waiters[2] < ncpus)
+			cpu_spinwait();
+	}
 
+#ifdef INVARIANTS
+	smp_rv_waiters[0] = 0xdead;
+	smp_rv_targets = 0;
+#endif
 	/* release lock */
 	mtx_unlock_spin(&smp_ipi_mtx);
 }
diff --git a/src/sys/sys/_rmlock.h b/src/sys/sys/_rmlock.h
index 75a159c..1f1e36e 100644
--- a/src/sys/sys/_rmlock.h
+++ b/src/sys/sys/_rmlock.h
@@ -41,12 +41,12 @@
  * Mostly reader/occasional writer lock.
  */
 
-LIST_HEAD(rmpriolist,rm_priotracker);
+TAILQ_HEAD(rmpriolist,rm_priotracker);
 
 struct rmlock {
 	struct lock_object lock_object; 
 	volatile cpumask_t rm_writecpus;
-	LIST_HEAD(,rm_priotracker) rm_activeReaders;
+	TAILQ_HEAD(,rm_priotracker) rm_activeReaders;
 	union {
 		struct mtx _rm_lock_mtx;
 		struct sx _rm_lock_sx;
@@ -60,7 +60,7 @@ struct rm_priotracker {
 	struct rmlock *rmp_rmlock;
 	struct thread *rmp_thread;
 	int rmp_flags;
-	LIST_ENTRY(rm_priotracker) rmp_qentry;
+	TAILQ_ENTRY(rm_priotracker) rmp_qentry;
 };
 
 #endif /* !_SYS__RMLOCK_H_ */
_______________________________________________
freebsd-current@freebsd.org mailing list
http://lists.freebsd.org/mailman/listinfo/freebsd-current
To unsubscribe, send any mail to "freebsd-current-unsubscr...@freebsd.org"

Reply via email to