This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit d31f829baafd3a45208c85e7d791452b4e997235
Author: Cliff Jansen <[email protected]>
AuthorDate: Mon Nov 22 10:25:23 2021 -0800

    PROTON-2362: epoll proactor fix for tsan_tr2.txt. Make scheduling and 
re-scheduling completely separate.
---
 c/src/proactor/epoll-internal.h |  14 ++-
 c/src/proactor/epoll.c          | 226 +++++++++++++++++++++++-----------------
 c/tests/proactor_test.cpp       |  30 ++++--
 3 files changed, 168 insertions(+), 102 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index f0f57af..8e9e1b2 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -88,8 +88,9 @@ typedef struct task_t {
   bool working;
   bool ready;                // ready to run and on ready list.  Poller 
notified by eventfd.
   bool waking;
-  bool on_ready_list;        // todo: protected by eventfd_mutex or sched 
mutex?  needed?
+  unsigned int ready_generation;
   struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
+  struct task_t *resched_next; // resched list, guarded by sched mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
   struct task_t* next;  /* Protected by proactor.mutex */
@@ -164,6 +165,8 @@ struct pn_proactor_t {
   bool ready_list_active;
   task_t *ready_list_first;
   task_t *ready_list_last;
+  unsigned int ready_list_count;
+  unsigned int ready_list_generation; // protected by both eventfd_mutex and a 
single p->poller instance
   // Interrupts have a dedicated eventfd because they must be async-signal 
safe.
   int interruptfd;
   // If the process runs out of file descriptors, disarm listening sockets 
temporarily and save them here.
@@ -188,7 +191,14 @@ struct pn_proactor_t {
   tslot_t *last_earmark;
   task_t *sched_ready_first;
   task_t *sched_ready_last;
-  task_t *sched_ready_current;
+  task_t *sched_ready_current; // TODO: remove or use for scheduling priority 
or fairness
+  unsigned int sched_ready_count;
+  task_t *resched_first;
+  task_t *resched_last;
+  task_t *resched_cutoff; // last resched task of current poller work 
snapshot.  TODO: superseded by polled_resched_count?
+  task_t *resched_next;
+  unsigned int resched_count;
+  unsigned int polled_resched_count; 
   pmutex tslot_mutex;
   int earmark_count;
   bool earmark_drain;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 31edfbe..ea2e25a 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -260,28 +260,31 @@ static void pop_ready_task(task_t *tsk) {
   // !ready .. schedule() .. on ready_list .. on sched_ready_list .. working 
task .. !sched_ready && !ready
   //
   // Intervening locks at each transition ensures ready_next has memory 
coherence throughout the ready task scheduling cycle.
+  // TODO: sched_ready list changed to sequential processing.  Review need for 
sched_ready_current.
   pn_proactor_t *p = tsk->proactor;
   if (tsk == p->sched_ready_current)
     p->sched_ready_current = tsk->ready_next;
-  if (tsk == p->sched_ready_first) {
-    // normal code path
-    if (tsk == p->sched_ready_last) {
-      p->sched_ready_first = p->sched_ready_last = NULL;
-    } else {
-      p->sched_ready_first = tsk->ready_next;
-    }
-    if (!p->sched_ready_first)
-      p->sched_ready_last = NULL;
+  assert (tsk == p->sched_ready_first);
+  assert (p->sched_ready_count);
+  p->sched_ready_count--;
+  if (tsk == p->sched_ready_last) {
+    p->sched_ready_first = p->sched_ready_last = NULL;
   } else {
-    // tsk is not first in a multi-element list
-    task_t *prev = NULL;
-    for (task_t *i = p->sched_ready_first; i != tsk; i = i->ready_next)
-      prev = i;
-    prev->ready_next = tsk->ready_next;
-    if (tsk == p->sched_ready_last)
-      p->sched_ready_last = prev;
+    p->sched_ready_first = tsk->ready_next;
   }
-  tsk->on_ready_list = false;
+  if (!p->sched_ready_first) {
+    p->sched_ready_last = NULL;
+    assert(p->sched_ready_count == 0);
+  }
+}
+
+// Call only as the poller task that has already called schedule_ready_list() 
and already
+// incremented p->ready_list_generation.  All list elements before 
sched_ready_last have
+// correct generation from mutex barrier and cannot have tsk->ready_generation 
set to a
+// new generation until after the poller task releases the sched lock and 
allows tsk to
+// run again.
+inline static bool on_sched_ready_list(task_t *tsk, pn_proactor_t *p) {
+  return tsk->ready_generation && (tsk->ready_generation != 
p->ready_list_generation);
 }
 
 // part1: call with tsk->owner lock held, return true if notify_poller 
required by caller.
@@ -294,8 +297,10 @@ bool schedule(task_t *tsk) {
       tsk->ready = true;
       pn_proactor_t *p = tsk->proactor;
       lock(&p->eventfd_mutex);
+      assert(tsk->ready_generation == 0);  // Can't be on list twice
       tsk->ready_next = NULL;
-      tsk->on_ready_list = true;
+      tsk->ready_generation = p->ready_list_generation;
+      p->ready_list_count++;
       if (!p->ready_list_first) {
         p->ready_list_first = p->ready_list_last = tsk;
       } else {
@@ -323,7 +328,11 @@ void notify_poller(pn_proactor_t *p) {
 
 // call with task lock held from xxx_process().
 void schedule_done(task_t *tsk) {
-//  assert(tsk->ready > 0);
+  assert(tsk->ready);
+  lock(&tsk->proactor->eventfd_mutex);
+  assert(tsk->ready_generation != 0);
+  tsk->ready_generation = 0;
+  unlock(&tsk->proactor->eventfd_mutex);
   tsk->ready = false;
 }
 
@@ -439,29 +448,19 @@ static void assign_thread(tslot_t *ts, task_t *tsk) {
 }
 
 // call with sched lock
-static bool reschedule(task_t *tsk) {
-  // Special case schedule() where task is done/unassigned but sched_pending 
work has arrived.
-  // Should be an infrequent corner case.
-  bool notify = false;
-  pn_proactor_t *p = tsk->proactor;
-  lock(&p->eventfd_mutex);
-  assert(tsk->ready);
-  assert(!tsk->on_ready_list);
-  tsk->ready_next = NULL;
-  tsk->on_ready_list = true;
-  if (!p->ready_list_first) {
-    p->ready_list_first = p->ready_list_last = tsk;
-  } else {
-    p->ready_list_last->ready_next = tsk;
-    p->ready_list_last = tsk;
+static inline task_t *resched_pop_front(pn_proactor_t *p) {
+  assert(p->resched_cutoff);
+  task_t *tsk = p->resched_first;
+  p->resched_first = tsk->resched_next;
+  p->polled_resched_count--;
+  if (!p->resched_first)
+    p->resched_last = NULL;
+  if (tsk == p->resched_cutoff) {
+    assert(p->polled_resched_count == 0);
+    p->resched_cutoff = NULL;
   }
-  if (!p->ready_list_active) {
-    // unblock the poller via the eventfd
-    p->ready_list_active = true;
-    notify = true;
-  }
-  unlock(&p->eventfd_mutex);
-  return notify;
+  tsk->resched_next = NULL;
+  return tsk;
 }
 
 // Call with sched lock.
@@ -484,22 +483,23 @@ bool unassign_thread(pn_proactor_t *p, tslot_t *ts, 
tslot_state new_state, tslot
   if (tsk && !deleting) {
     ts->prev_task = tsk;
     if (tsk->sched_pending) {
-      // Make sure the task is already scheduled or put it on the ready list
-      if (tsk->sched_ready) {
-        if (!tsk->on_ready_list) {
-          // Remember it for next poller
-          tsk->sched_ready = false;
-          notify = reschedule(tsk);     // back on ready list for poller to see
-        }
-        // else already scheduled
+      // New work arrived, reschedule it:
+      tsk->runner = RESCHEDULE_PLACEHOLDER;  // Block tsk from being scheduled 
until resched list is processed.
+      assert(!tsk->resched_next);
+      if (p->resched_last) {
+        p->resched_last->resched_next = tsk;
+        p->resched_last = tsk;
       } else {
-        // bad corner case.  Block tsk from being scheduled again until a 
later post_ready()
-        tsk->runner = RESCHEDULE_PLACEHOLDER;
-        unlock(&p->sched_mutex);
-        lock(&tsk->mutex);
-        notify = schedule(tsk);
-        unlock(&tsk->mutex);
-        lock(&p->sched_mutex);
+        p->resched_first = p->resched_last = tsk;
+      }
+      p->resched_count++;
+      if (p->poller_suspended) {
+        lock(&p->eventfd_mutex);
+        if (!p->ready_list_active) {
+          p->ready_list_active = true;
+          notify = true;
+        }
+        unlock(&p->eventfd_mutex);
       }
     }
   }
@@ -1992,6 +1992,7 @@ pn_proactor_t *pn_proactor() {
             epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, 
p->epollfd, false);
             p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
             grow_poller_bufs(p);
+            p->ready_list_generation = 1;
             return p;
           }
       }
@@ -2284,26 +2285,26 @@ static void schedule_ready_list(pn_proactor_t *p) {
       p->sched_ready_current = p->sched_ready_first;
     p->ready_list_first = p->ready_list_last = NULL;
   }
+
+  // Track sched_ready_count to know how many threads may be needed.
+  p->sched_ready_count = p->ready_list_count;
+  p->ready_list_count = 0;
 }
 
-// Call with schedule lock held.  Called only by poller thread.
+// Call with schedule lock and eventfd lock held.  Called only by poller 
thread.
+// Needs to be quick.
 static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
   epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr;
   task_t *tsk = NULL;
 
   switch (ee->type) {
   case EVENT_FD:
-    if  (ee->fd == p->interruptfd) {        /* Interrupts have their own 
dedicated eventfd */
+    if (ee->fd == p->interruptfd) {        /* Interrupts have their own 
dedicated eventfd */
       p->sched_interrupt = true;
       tsk = &p->task;
       tsk->sched_pending = true;
-    } else {
-      // main ready tasks eventfd
-      lock(&p->eventfd_mutex);
-      schedule_ready_list(p);
-      tsk = p->sched_ready_current;
-      unlock(&p->eventfd_mutex);
     }
+    // else if (ee->fd == p->eventfd)... schedule_ready_list already performed 
by poller task.
     break;
   case PCONNECTION_IO: {
     psocket_t *ps = containerof(ee, psocket_t, epoll_io);
@@ -2338,13 +2339,13 @@ static task_t *post_event(pn_proactor_t *p, struct 
epoll_event *evp) {
     break;
   }
   }
-  if (tsk && !tsk->runnable && !tsk->runner)
+  if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p))
     return tsk;
   return NULL;
 }
 
 
-static task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
+static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
   tsk->sched_ready = true;
   tsk->sched_pending = true;
   if (!tsk->runnable && !tsk->runner)
@@ -2387,7 +2388,7 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t 
*ts) {
 
   // warm pairing ?
   task_t *tsk = ts->prev_task;
-  if (tsk && (tsk->runnable)) { // or tsk->sched_ready too?
+  if (tsk && (tsk->runnable)) {
     assign_thread(ts, tsk);
     return tsk;
   }
@@ -2406,10 +2407,24 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t 
*ts) {
     }
   }
 
-  if (p->sched_ready_current) {
+  while (p->sched_ready_count) {
     tsk = p->sched_ready_current;
-    pop_ready_task(tsk);  // updates sched_ready_current
-    assert(!tsk->runnable && !tsk->runner);
+    assert(tsk->ready); // eventfd_mutex required post ready set and pre move 
to sched_ready_list
+    if (post_ready(p, tsk)) {
+      pop_ready_task(tsk);  // updates sched_ready_current
+      assert(!tsk->runnable && !tsk->runner);
+      assign_thread(ts, tsk);
+      return tsk;
+    } else {
+      pop_ready_task(tsk);
+    }
+  }
+
+  if (p->polled_resched_count) {
+    // Unprocessed resched tasks remain.
+    tsk = resched_pop_front(p);
+    assert(tsk->sched_pending && !tsk->runnable && tsk->runner == 
RESCHEDULE_PLACEHOLDER);
+    tsk->runner = NULL;
     assign_thread(ts, tsk);
     return tsk;
   }
@@ -2489,6 +2504,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, 
tslot_t *ts, bool can_block
   // As poller with lots to do, be mindful of hogging the sched lock.  Release 
when making kernel calls.
   int n_events;
   task_t *tsk;
+  assert(!p->resched_cutoff);
 
   while (true) {
     assert(p->n_runnables == 0);
@@ -2499,19 +2515,21 @@ static bool poller_do_epoll(struct pn_proactor_t* p, 
tslot_t *ts, bool can_block
     p->last_earmark = NULL;
 
     bool unfinished_earmarks = p->earmark_count > 0;
-    bool new_ready_tasks = false;
-    bool epoll_immediate = unfinished_earmarks || !can_block;
+    bool epoll_immediate = p->resched_first || unfinished_earmarks || 
!can_block;
     assert(!p->sched_ready_first);
+
+    // Determine if notify_poller() can be avoided.
     if (!epoll_immediate) {
       lock(&p->eventfd_mutex);
       if (p->ready_list_first) {
         epoll_immediate = true;
-        new_ready_tasks = true;
       } else {
+        // Poller may sleep.  Enable eventfd wakeup.
         p->ready_list_active = false;
       }
       unlock(&p->eventfd_mutex);
     }
+
     int timeout = (epoll_immediate) ? 0 : -1;
     p->poller_suspended = (timeout == -1);
     unlock(&p->sched_mutex);
@@ -2522,16 +2540,26 @@ static bool poller_do_epoll(struct pn_proactor_t* p, 
tslot_t *ts, bool can_block
     p->poller_suspended = false;
 
     bool unpolled_work = false;
+    if (p->resched_first) {
+      // Defer future resched tasks until next do_epoll()
+      p->resched_cutoff = p->resched_last;
+      assert(p->polled_resched_count == 0);
+      p->polled_resched_count = p->resched_count;
+      p->resched_count = 0;
+      unpolled_work = true;
+    }
     if (p->earmark_count > 0) {
       p->earmark_drain = true;
       unpolled_work = true;
     }
-    if (new_ready_tasks) {
-      lock(&p->eventfd_mutex);
-      schedule_ready_list(p);
-      unlock(&p->eventfd_mutex);
+    // Take stock of ready list before any post_event()
+    lock(&p->eventfd_mutex);
+    schedule_ready_list(p);
+    if (p->sched_ready_first)
       unpolled_work = true;
-    }
+    if (++p->ready_list_generation == 0) // wrapping OK, but 0 means unset
+      p->ready_list_generation = 1;
+    unlock(&p->eventfd_mutex);
 
     if (n_events < 0) {
       if (errno != EINTR)
@@ -2556,37 +2584,46 @@ static bool poller_do_epoll(struct pn_proactor_t* p, 
tslot_t *ts, bool can_block
 
   // We have unpolled work or at least one new epoll event
 
-
+  lock(&p->eventfd_mutex);
+  // Longest hold of eventfd_mutex.  The following must be quick with no 
external calls:
+  // post_event(), make_runnable(), assign_thread(), earmark_thread().
   for (int i = 0; i < n_events; i++) {
     tsk = post_event(p, &p->kevents[i]);
     if (tsk)
       make_runnable(tsk);
   }
+  unlock(&p->eventfd_mutex);
+
   if (n_events > 0)
     memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
 
-  // The list of ready tasks can be very long.  Traverse part of it looking 
for warm pairings.
+  // The list of ready tasks can be very long.  Tradeoff between slow walk 
through linked
+  // list looking for more warm pairings (while holding the sched lock), or 
letting
+  // threads looking for work grab from the front.  Search less when busy.  
TODO:
+  // instrument an optimal value or heuristic.
+  int warm_tries = p->suspend_list_count - p->n_warm_runnables;
+  if (warm_tries < 0)
+    warm_tries = 0;
+
   task_t *ctsk = p->sched_ready_current;
   int max_runnables = p->runnables_capacity;
-  while (ctsk && p->n_runnables < max_runnables) {
-    if (ctsk->runner == RESCHEDULE_PLACEHOLDER)
-      ctsk->runner = NULL;  // Allow task to run again.
+  while (p->sched_ready_count && p->n_runnables < max_runnables && warm_tries) 
{
+    assert(ctsk);
     tsk = post_ready(p, ctsk);
+    pop_ready_task(ctsk);
+    warm_tries--;
     if (tsk)
       make_runnable(tsk);
-    pop_ready_task(ctsk);
     ctsk = ctsk->ready_next;
   }
   p->sched_ready_current = ctsk;
-  // More ready tasks than places on the runnables list
-  while (ctsk) {
-    if (ctsk->runner == RESCHEDULE_PLACEHOLDER)
-      ctsk->runner = NULL;  // Allow task to run again.
-    ctsk->sched_ready = true;
-    ctsk->sched_pending = true;
-    if (ctsk->runnable || ctsk->runner)
-      pop_ready_task(ctsk);
-    ctsk = ctsk->ready_next;
+
+  while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
+    ctsk = resched_pop_front(p);
+    assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable);
+    ctsk->runner = NULL;  // Allow task to run again.
+    warm_tries--;
+    make_runnable(ctsk);
   }
 
   if (pni_immediate && !ts->task) {
@@ -2598,6 +2635,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, 
tslot_t *ts, bool can_block
       if (++p->next_runnable == p->n_runnables)
         p->n_runnables = 0;
     } else if (p->n_warm_runnables) {
+      // Immediate doesn't contemplate some other (warm) thread running instead
       ptsk = p->warm_runnables[--p->n_warm_runnables];
       tslot_t *ts2 = ptsk->runner;
       ts2->prev_task = ts2->task = NULL;
@@ -2625,7 +2663,7 @@ static void poller_done(struct pn_proactor_t* p, tslot_t 
*ts) {
   tslot_t **resume_list2 = NULL;
 
   if (p->suspend_list_count) {
-    int max_resumes = p->n_warm_runnables + p->n_runnables;
+    int max_resumes = p->n_warm_runnables + p->n_runnables + 
p->sched_ready_count + p->polled_resched_count;
     max_resumes = pn_min(max_resumes, p->suspend_list_count);
     if (max_resumes) {
       resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp
index 73e285e..07a13ba 100644
--- a/c/tests/proactor_test.cpp
+++ b/c/tests/proactor_test.cpp
@@ -538,9 +538,18 @@ TEST_CASE("proactor_ssl") {
   REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
   CHECK_THAT(*client.last_condition,
              cond_matches("amqp:connection:framing-error", "SSL"));
-  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
-  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
-  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  int errs = 1;
+  int closes = 0;
+  while (errs < 2 && closes < 2) {
+    pn_event_type_t et = p.run(PN_TRANSPORT_CLOSED);
+    switch(et) {
+    case PN_TRANSPORT_ERROR: errs++; break;
+    case PN_TRANSPORT_CLOSED: closes++; break;
+    default:
+      FAIL( "bad stop event " << pn_event_type_name(et)) ;
+      break;
+    }
+  }
 
   /* Deliberate use of Anonymous */
   pn_ssl_domain_t *cd = client.ssl_domain;
@@ -576,9 +585,18 @@ TEST_CASE("proactor_ssl") {
   REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
   CHECK_THAT(*client.last_condition,
              cond_matches("amqp:connection:framing-error", "SSL"));
-  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
-  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
-  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  errs = 1;
+  closes = 0;
+  while (errs < 2 && closes < 2) {
+    pn_event_type_t et = p.run(PN_TRANSPORT_CLOSED);
+    switch(et) {
+    case PN_TRANSPORT_ERROR: errs++; break;
+    case PN_TRANSPORT_CLOSED: closes++; break;
+    default:
+      FAIL( "bad stop event " << pn_event_type_name(et)) ;
+      break;
+    }
+  }
 
   /* Can ignore bad hostname */
   REQUIRE(0 == pn_ssl_domain_set_peer_authentication(

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to