This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit d31f829baafd3a45208c85e7d791452b4e997235
Author: Cliff Jansen <[email protected]>
AuthorDate: Mon Nov 22 10:25:23 2021 -0800

    PROTON-2362: epoll proactor fix for tsan_tr2.txt.  Make scheduling and
    re-scheduling completely separate.
---
 c/src/proactor/epoll-internal.h |  14 ++-
 c/src/proactor/epoll.c          | 226 +++++++++++++++++++++++-----------------
 c/tests/proactor_test.cpp       |  30 ++++--
 3 files changed, 168 insertions(+), 102 deletions(-)
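A note on the approach before the diff: the per-task boolean on_ready_list (whose locking the old code itself questioned in a todo) is replaced by a generation stamp, and tasks that need re-running move to a separate resched list owned by the sched mutex.  Below is a minimal sketch of the generation idea, with simplified stand-in types; the real structs, the locking, and the sched_ready snapshot are omitted:

    #include <assert.h>
    #include <stddef.h>

    /* Simplified stand-ins for the task_t / pn_proactor_t fields touched by
       the patch.  Runs under the proactor's eventfd_mutex in the real code. */
    typedef struct task_t {
      unsigned int ready_generation;        /* 0 = not on any ready list */
      struct task_t *ready_next;
    } task_t;

    typedef struct proactor_t {
      unsigned int ready_list_generation;   /* bumped by the poller per snapshot */
      task_t *ready_list_first, *ready_list_last;
    } proactor_t;

    /* Mirrors schedule(): enqueue and stamp with the current generation. */
    static void ready_push(proactor_t *p, task_t *tsk) {
      assert(tsk->ready_generation == 0);   /* can't be on the list twice */
      tsk->ready_next = NULL;
      tsk->ready_generation = p->ready_list_generation;
      if (!p->ready_list_first) {
        p->ready_list_first = p->ready_list_last = tsk;
      } else {
        p->ready_list_last->ready_next = tsk;
        p->ready_list_last = tsk;
      }
    }

    /* Mirrors on_sched_ready_list(): a nonzero stamp that differs from the
       current generation means the task was captured in the poller's snapshot
       and is still queued there, so it must not be handed out a second time. */
    static int on_sched_ready_list(const task_t *tsk, const proactor_t *p) {
      return tsk->ready_generation &&
             tsk->ready_generation != p->ready_list_generation;
    }

Since only the poller bumps ready_list_generation, a stale stamp answers "is this task still on the snapshotted sched_ready list?" without walking the list.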
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index f0f57af..8e9e1b2 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -88,8 +88,9 @@ typedef struct task_t {
   bool working;
   bool ready;  // ready to run and on ready list.  Poller notified by eventfd.
   bool waking;
-  bool on_ready_list;  // todo: protected by eventfd_mutex or sched mutex? needed?
+  unsigned int ready_generation;
   struct task_t *ready_next;   // ready list, guarded by proactor eventfd_mutex
+  struct task_t *resched_next; // resched list, guarded by sched mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
   struct task_t* next;  /* Protected by proactor.mutex */
@@ -164,6 +165,8 @@ struct pn_proactor_t {
   bool ready_list_active;
   task_t *ready_list_first;
   task_t *ready_list_last;
+  unsigned int ready_list_count;
+  unsigned int ready_list_generation;  // protected by both eventfd_mutex and a single p->poller instance
   // Interrupts have a dedicated eventfd because they must be async-signal safe.
   int interruptfd;
   // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
@@ -188,7 +191,14 @@ struct pn_proactor_t {
   tslot_t *last_earmark;
   task_t *sched_ready_first;
   task_t *sched_ready_last;
-  task_t *sched_ready_current;
+  task_t *sched_ready_current;  // TODO: remove or use for scheduling priority or fairness
+  unsigned int sched_ready_count;
+  task_t *resched_first;
+  task_t *resched_last;
+  task_t *resched_cutoff;  // last resched task of current poller work snapshot.  TODO: superseded by polled_resched_count?
+  task_t *resched_next;
+  unsigned int resched_count;
+  unsigned int polled_resched_count;
   pmutex tslot_mutex;
   int earmark_count;
   bool earmark_drain;
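The resched_* fields added above form a FIFO guarded by the sched mutex.  Each poll cycle the poller snapshots the queue by remembering a cutoff, so tasks rescheduled while the snapshot drains wait for the next cycle.  A sketch under the same simplified-types caveat (resched_snapshot is a hypothetical name for the inline snapshot step in poller_do_epoll()):

    #include <assert.h>
    #include <stddef.h>

    typedef struct task_t {
      struct task_t *resched_next;
    } task_t;

    typedef struct proactor_t {
      task_t *resched_first, *resched_last;
      task_t *resched_cutoff;             /* last task of the current poll snapshot */
      unsigned int resched_count;         /* queued since the last snapshot */
      unsigned int polled_resched_count;  /* still to drain from the snapshot */
    } proactor_t;

    /* Mirrors the enqueue in unassign_thread(); sched mutex held. */
    static void resched_push(proactor_t *p, task_t *tsk) {
      assert(!tsk->resched_next);
      if (p->resched_last) {
        p->resched_last->resched_next = tsk;
        p->resched_last = tsk;
      } else {
        p->resched_first = p->resched_last = tsk;
      }
      p->resched_count++;
    }

    /* Mirrors the snapshot step in poller_do_epoll(): everything queued so
       far becomes this cycle's work; later pushes wait for the next cycle. */
    static void resched_snapshot(proactor_t *p) {
      if (p->resched_first) {
        p->resched_cutoff = p->resched_last;
        assert(p->polled_resched_count == 0);
        p->polled_resched_count = p->resched_count;
        p->resched_count = 0;
      }
    }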
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 31edfbe..ea2e25a 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -260,28 +260,31 @@ static void pop_ready_task(task_t *tsk) {
   // !ready .. schedule() .. on ready_list .. on sched_ready_list .. working task .. !sched_ready && !ready
   //
   // Intervening locks at each transition ensure ready_next has memory coherence throughout the ready task scheduling cycle.
+  // TODO: sched_ready list changed to sequential processing.  Review need for sched_ready_current.
   pn_proactor_t *p = tsk->proactor;
   if (tsk == p->sched_ready_current)
     p->sched_ready_current = tsk->ready_next;
-  if (tsk == p->sched_ready_first) {
-    // normal code path
-    if (tsk == p->sched_ready_last) {
-      p->sched_ready_first = p->sched_ready_last = NULL;
-    } else {
-      p->sched_ready_first = tsk->ready_next;
-    }
-    if (!p->sched_ready_first)
-      p->sched_ready_last = NULL;
+  assert (tsk == p->sched_ready_first);
+  assert (p->sched_ready_count);
+  p->sched_ready_count--;
+  if (tsk == p->sched_ready_last) {
+    p->sched_ready_first = p->sched_ready_last = NULL;
   } else {
-    // tsk is not first in a multi-element list
-    task_t *prev = NULL;
-    for (task_t *i = p->sched_ready_first; i != tsk; i = i->ready_next)
-      prev = i;
-    prev->ready_next = tsk->ready_next;
-    if (tsk == p->sched_ready_last)
-      p->sched_ready_last = prev;
+    p->sched_ready_first = tsk->ready_next;
   }
-  tsk->on_ready_list = false;
+  if (!p->sched_ready_first) {
+    p->sched_ready_last = NULL;
+    assert(p->sched_ready_count == 0);
+  }
+}
+
+// Call only as the poller task that has already called schedule_ready_list() and already
+// incremented p->ready_list_generation.  All list elements before sched_ready_last have
+// correct generation from mutex barrier and cannot have tsk->ready_generation set to a
+// new generation until after the poller task releases the sched lock and allows tsk to
+// run again.
+inline static bool on_sched_ready_list(task_t *tsk, pn_proactor_t *p) {
+  return tsk->ready_generation && (tsk->ready_generation != p->ready_list_generation);
 }
 
 // part1: call with tsk->owner lock held, return true if notify_poller required by caller.
@@ -294,8 +297,10 @@ bool schedule(task_t *tsk) {
     tsk->ready = true;
     pn_proactor_t *p = tsk->proactor;
     lock(&p->eventfd_mutex);
+    assert(tsk->ready_generation == 0);  // Can't be on list twice
     tsk->ready_next = NULL;
-    tsk->on_ready_list = true;
+    tsk->ready_generation = p->ready_list_generation;
+    p->ready_list_count++;
     if (!p->ready_list_first) {
       p->ready_list_first = p->ready_list_last = tsk;
     } else {
@@ -323,7 +328,11 @@ void notify_poller(pn_proactor_t *p) {
 
 // call with task lock held from xxx_process().
 void schedule_done(task_t *tsk) {
-//  assert(tsk->ready > 0);
+  assert(tsk->ready);
+  lock(&tsk->proactor->eventfd_mutex);
+  assert(tsk->ready_generation != 0);
+  tsk->ready_generation = 0;
+  unlock(&tsk->proactor->eventfd_mutex);
   tsk->ready = false;
 }
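To round out the lifecycle shown in the two hunks above: schedule() stamps the task and schedule_done() clears the stamp, both under eventfd_mutex, while the poller bumps the proactor-wide generation once per snapshot.  A sketch of those two steps, assuming pthread mutexes in place of the pmutex helpers epoll.c actually uses:

    #include <assert.h>
    #include <pthread.h>

    typedef struct task_t { unsigned int ready_generation; } task_t;
    typedef struct proactor_t {
      pthread_mutex_t eventfd_mutex;
      unsigned int ready_list_generation;
    } proactor_t;

    /* Mirrors schedule_done(): clearing the stamp under the same mutex that
       guards enqueueing means the poller can never see a half-updated value. */
    static void ready_done(proactor_t *p, task_t *tsk) {
      pthread_mutex_lock(&p->eventfd_mutex);
      assert(tsk->ready_generation != 0);
      tsk->ready_generation = 0;
      pthread_mutex_unlock(&p->eventfd_mutex);
    }

    /* Mirrors the poller's per-snapshot bump: wrap-around is harmless, but 0
       is reserved to mean "not on a list" and is skipped. */
    static void bump_generation(proactor_t *p) {
      if (++p->ready_list_generation == 0)
        p->ready_list_generation = 1;
    }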
@@ -439,29 +448,19 @@ static void assign_thread(tslot_t *ts, task_t *tsk) {
 }
 
 // call with sched lock
-static bool reschedule(task_t *tsk) {
-  // Special case schedule() where task is done/unassigned but sched_pending work has arrived.
-  // Should be an infrequent corner case.
-  bool notify = false;
-  pn_proactor_t *p = tsk->proactor;
-  lock(&p->eventfd_mutex);
-  assert(tsk->ready);
-  assert(!tsk->on_ready_list);
-  tsk->ready_next = NULL;
-  tsk->on_ready_list = true;
-  if (!p->ready_list_first) {
-    p->ready_list_first = p->ready_list_last = tsk;
-  } else {
-    p->ready_list_last->ready_next = tsk;
-    p->ready_list_last = tsk;
+static inline task_t *resched_pop_front(pn_proactor_t *p) {
+  assert(p->resched_cutoff);
+  task_t *tsk = p->resched_first;
+  p->resched_first = tsk->resched_next;
+  p->polled_resched_count--;
+  if (!p->resched_first)
+    p->resched_last = NULL;
+  if (tsk == p->resched_cutoff) {
+    assert(p->polled_resched_count == 0);
+    p->resched_cutoff = NULL;
   }
-  if (!p->ready_list_active) {
-    // unblock the poller via the eventfd
-    p->ready_list_active = true;
-    notify = true;
-  }
-  unlock(&p->eventfd_mutex);
-  return notify;
+  tsk->resched_next = NULL;
+  return tsk;
 }
 
 // Call with sched lock.
@@ -484,22 +483,23 @@ bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot
   if (tsk && !deleting) {
     ts->prev_task = tsk;
     if (tsk->sched_pending) {
-      // Make sure the task is already scheduled or put it on the ready list
-      if (tsk->sched_ready) {
-        if (!tsk->on_ready_list) {
-          // Remember it for next poller
-          tsk->sched_ready = false;
-          notify = reschedule(tsk);  // back on ready list for poller to see
-        }
-        // else already scheduled
+      // New work arrived, reschedule it:
+      tsk->runner = RESCHEDULE_PLACEHOLDER;  // Block tsk from being scheduled until resched list is processed.
+      assert(!tsk->resched_next);
+      if (p->resched_last) {
+        p->resched_last->resched_next = tsk;
+        p->resched_last = tsk;
       } else {
-        // bad corner case.  Block tsk from being scheduled again until a later post_ready()
-        tsk->runner = RESCHEDULE_PLACEHOLDER;
-        unlock(&p->sched_mutex);
-        lock(&tsk->mutex);
-        notify = schedule(tsk);
-        unlock(&tsk->mutex);
-        lock(&p->sched_mutex);
+        p->resched_first = p->resched_last = tsk;
+      }
+      p->resched_count++;
+      if (p->poller_suspended) {
+        lock(&p->eventfd_mutex);
+        if (!p->ready_list_active) {
+          p->ready_list_active = true;
+          notify = true;
+        }
+        unlock(&p->eventfd_mutex);
       }
     }
   }
@@ -1992,6 +1992,7 @@ pn_proactor_t *pn_proactor() {
     epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false);
     p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
     grow_poller_bufs(p);
+    p->ready_list_generation = 1;
     return p;
   }
 }
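In the unassign_thread() hunk above, a task parked on the resched list must not be handed to another thread until the poller drains the snapshot; that is what the RESCHEDULE_PLACEHOLDER runner value enforces.  A sketch of the sentinel pattern follows; the placeholder's real definition is outside this diff, so the object below is only an illustrative assumption:

    #include <assert.h>
    #include <stddef.h>

    typedef struct tslot_t { int unused; } tslot_t;  /* stand-in thread slot */

    typedef struct task_t {
      tslot_t *runner;             /* thread currently assigned, if any */
      struct task_t *resched_next;
    } task_t;

    /* Any distinguished address that can never be a live tslot_t will do. */
    static tslot_t reschedule_sentinel;
    #define RESCHEDULE_PLACEHOLDER (&reschedule_sentinel)

    /* While parked, the sentinel keeps next_runnable() and post_event() from
       assigning the task, since their checks require runner == NULL. */
    static int blocked_for_resched(const task_t *tsk) {
      return tsk->runner == RESCHEDULE_PLACEHOLDER;
    }

    /* Mirrors the hand-back after resched_pop_front(): clearing the sentinel
       lets the task be scheduled and run again. */
    static void resched_release(task_t *tsk) {
      assert(blocked_for_resched(tsk));
      tsk->runner = NULL;
    }

Note also that the new enqueue path touches the eventfd machinery only when p->poller_suspended is set, so the wakeup cost is paid only when the poller is actually parked in epoll_wait().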
@@ -2284,26 +2285,26 @@ static void schedule_ready_list(pn_proactor_t *p) {
     p->sched_ready_current = p->sched_ready_first;
     p->ready_list_first = p->ready_list_last = NULL;
   }
+
+  // Track sched_ready_count to know how many threads may be needed.
+  p->sched_ready_count = p->ready_list_count;
+  p->ready_list_count = 0;
 }
 
-// Call with schedule lock held.  Called only by poller thread.
+// Call with schedule lock and eventfd lock held.  Called only by poller thread.
+// Needs to be quick.
 static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
   epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr;
   task_t *tsk = NULL;
 
   switch (ee->type) {
   case EVENT_FD:
-    if (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
+    if (ee->fd == p->interruptfd) {  /* Interrupts have their own dedicated eventfd */
       p->sched_interrupt = true;
       tsk = &p->task;
       tsk->sched_pending = true;
-    } else {
-      // main ready tasks eventfd
-      lock(&p->eventfd_mutex);
-      schedule_ready_list(p);
-      tsk = p->sched_ready_current;
-      unlock(&p->eventfd_mutex);
     }
+    // else if (ee->fd == p->eventfd)... schedule_ready_list already performed by poller task.
     break;
   case PCONNECTION_IO: {
     psocket_t *ps = containerof(ee, psocket_t, epoll_io);
@@ -2338,13 +2339,13 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     break;
   }
   }
-  if (tsk && !tsk->runnable && !tsk->runner)
+  if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p))
     return tsk;
   return NULL;
 }
 
-static task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
+static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) {
   tsk->sched_ready = true;
   tsk->sched_pending = true;
   if (!tsk->runnable && !tsk->runner)
@@ -2387,7 +2388,7 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
 
   // warm pairing ?
   task_t *tsk = ts->prev_task;
-  if (tsk && (tsk->runnable)) {  // or tsk->sched_ready too?
+  if (tsk && (tsk->runnable)) {
     assign_thread(ts, tsk);
     return tsk;
   }
@@ -2406,10 +2407,24 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
     }
   }
 
-  if (p->sched_ready_current) {
+  while (p->sched_ready_count) {
     tsk = p->sched_ready_current;
-    pop_ready_task(tsk);  // updates sched_ready_current
-    assert(!tsk->runnable && !tsk->runner);
+    assert(tsk->ready);  // eventfd_mutex required post ready set and pre move to sched_ready_list
+    if (post_ready(p, tsk)) {
+      pop_ready_task(tsk);  // updates sched_ready_current
+      assert(!tsk->runnable && !tsk->runner);
+      assign_thread(ts, tsk);
+      return tsk;
+    } else {
+      pop_ready_task(tsk);
+    }
+  }
+
+  if (p->polled_resched_count) {
+    // Unprocessed resched tasks remain.
+    tsk = resched_pop_front(p);
+    assert(tsk->sched_pending && !tsk->runnable && tsk->runner == RESCHEDULE_PLACEHOLDER);
+    tsk->runner = NULL;
     assign_thread(ts, tsk);
     return tsk;
   }
@@ -2489,6 +2504,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
   // As poller with lots to do, be mindful of hogging the sched lock.  Release when making kernel calls.
   int n_events;
   task_t *tsk;
+  assert(!p->resched_cutoff);
 
   while (true) {
     assert(p->n_runnables == 0);
@@ -2499,19 +2515,21 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     p->last_earmark = NULL;
 
     bool unfinished_earmarks = p->earmark_count > 0;
-    bool new_ready_tasks = false;
-    bool epoll_immediate = unfinished_earmarks || !can_block;
+    bool epoll_immediate = p->resched_first || unfinished_earmarks || !can_block;
     assert(!p->sched_ready_first);
+
+    // Determine if notify_poller() can be avoided.
     if (!epoll_immediate) {
       lock(&p->eventfd_mutex);
       if (p->ready_list_first) {
         epoll_immediate = true;
-        new_ready_tasks = true;
       } else {
+        // Poller may sleep.  Enable eventfd wakeup.
        p->ready_list_active = false;
       }
       unlock(&p->eventfd_mutex);
     }
+
     int timeout = (epoll_immediate) ? 0 : -1;
     p->poller_suspended = (timeout == -1);
     unlock(&p->sched_mutex);
@@ -2522,16 +2540,26 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     p->poller_suspended = false;
 
     bool unpolled_work = false;
+    if (p->resched_first) {
+      // Defer future resched tasks until next do_epoll()
+      p->resched_cutoff = p->resched_last;
+      assert(p->polled_resched_count == 0);
+      p->polled_resched_count = p->resched_count;
+      p->resched_count = 0;
+      unpolled_work = true;
+    }
     if (p->earmark_count > 0) {
       p->earmark_drain = true;
       unpolled_work = true;
    }
-    if (new_ready_tasks) {
-      lock(&p->eventfd_mutex);
-      schedule_ready_list(p);
-      unlock(&p->eventfd_mutex);
+    // Take stock of ready list before any post_event()
+    lock(&p->eventfd_mutex);
+    schedule_ready_list(p);
+    if (p->sched_ready_first)
       unpolled_work = true;
-    }
+    if (++p->ready_list_generation == 0)  // wrapping OK, but 0 means unset
+      p->ready_list_generation = 1;
+    unlock(&p->eventfd_mutex);
 
     if (n_events < 0) {
       if (errno != EINTR)
@@ -2556,37 +2584,46 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
 
     // We have unpolled work or at least one new epoll event
-
+    lock(&p->eventfd_mutex);
+    // Longest hold of eventfd_mutex.  The following must be quick with no external calls:
+    // post_event(), make_runnable(), assign_thread(), earmark_thread().
     for (int i = 0; i < n_events; i++) {
       tsk = post_event(p, &p->kevents[i]);
       if (tsk)
         make_runnable(tsk);
     }
+    unlock(&p->eventfd_mutex);
+
     if (n_events > 0)
       memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
 
-    // The list of ready tasks can be very long.  Traverse part of it looking for warm pairings.
+    // The list of ready tasks can be very long.  Tradeoff between a slow walk through the
+    // linked list looking for more warm pairings (while holding the sched lock) and letting
+    // threads looking for work grab from the front.  Search less when busy.  TODO:
+    // instrument an optimal value or heuristic.
+    int warm_tries = p->suspend_list_count - p->n_warm_runnables;
+    if (warm_tries < 0)
+      warm_tries = 0;
+
     task_t *ctsk = p->sched_ready_current;
     int max_runnables = p->runnables_capacity;
-    while (ctsk && p->n_runnables < max_runnables) {
-      if (ctsk->runner == RESCHEDULE_PLACEHOLDER)
-        ctsk->runner = NULL;  // Allow task to run again.
+    while (p->sched_ready_count && p->n_runnables < max_runnables && warm_tries) {
+      assert(ctsk);
       tsk = post_ready(p, ctsk);
+      pop_ready_task(ctsk);
+      warm_tries--;
       if (tsk)
         make_runnable(tsk);
-      pop_ready_task(ctsk);
       ctsk = ctsk->ready_next;
     }
     p->sched_ready_current = ctsk;
-    // More ready tasks than places on the runnables list
-    while (ctsk) {
-      if (ctsk->runner == RESCHEDULE_PLACEHOLDER)
-        ctsk->runner = NULL;  // Allow task to run again.
-      ctsk->sched_ready = true;
-      ctsk->sched_pending = true;
-      if (ctsk->runnable || ctsk->runner)
-        pop_ready_task(ctsk);
-      ctsk = ctsk->ready_next;
+
+    while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) {
+      ctsk = resched_pop_front(p);
+      assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable);
+      ctsk->runner = NULL;  // Allow task to run again.
+      warm_tries--;
+      make_runnable(ctsk);
     }
 
     if (pni_immediate && !ts->task) {
@@ -2598,6 +2635,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
         if (++p->next_runnable == p->n_runnables)
           p->n_runnables = 0;
       } else if (p->n_warm_runnables) {
+        // Immediate doesn't contemplate some other (warm) thread running instead
         ptsk = p->warm_runnables[--p->n_warm_runnables];
         tslot_t *ts2 = ptsk->runner;
         ts2->prev_task = ts2->task = NULL;
@@ -2625,7 +2663,7 @@ static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
   tslot_t **resume_list2 = NULL;
 
   if (p->suspend_list_count) {
-    int max_resumes = p->n_warm_runnables + p->n_runnables;
+    int max_resumes = p->n_warm_runnables + p->n_runnables + p->sched_ready_count + p->polled_resched_count;
     max_resumes = pn_min(max_resumes, p->suspend_list_count);
     if (max_resumes) {
       resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
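With scheduling and re-scheduling decoupled, the relative order of the final PN_TRANSPORT_ERROR and PN_TRANSPORT_CLOSED events from the two transports is no longer deterministic, which is why the test change below counts events instead of asserting a fixed sequence.  A sketch of the same order-tolerant pattern against the public C proactor API (expect_error_and_close is a hypothetical helper; the C++ test drives its own p.run() wrapper instead):

    #include <proton/event.h>
    #include <proton/proactor.h>

    /* Drain proactor batches until both transports have reported an error
       and a close, in whatever order the events arrive. */
    static void expect_error_and_close(pn_proactor_t *p) {
      int errs = 0, closes = 0;
      while (errs < 2 || closes < 2) {
        pn_event_batch_t *batch = pn_proactor_wait(p);  /* blocks for events */
        pn_event_t *e;
        while ((e = pn_event_batch_next(batch))) {
          switch (pn_event_type(e)) {
          case PN_TRANSPORT_ERROR:  errs++;   break;
          case PN_TRANSPORT_CLOSED: closes++; break;
          default: break;  /* the real test FAILs on unexpected stop events */
          }
        }
        pn_proactor_done(p, batch);
      }
    }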
diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp
index 73e285e..07a13ba 100644
--- a/c/tests/proactor_test.cpp
+++ b/c/tests/proactor_test.cpp
@@ -538,9 +538,18 @@ TEST_CASE("proactor_ssl") {
     REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
     CHECK_THAT(*client.last_condition,
                cond_matches("amqp:connection:framing-error", "SSL"));
-    REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
-    REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
-    REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+    int errs = 1;
+    int closes = 0;
+    while (errs < 2 && closes < 2) {
+      pn_event_type_t et = p.run(PN_TRANSPORT_CLOSED);
+      switch(et) {
+      case PN_TRANSPORT_ERROR: errs++; break;
+      case PN_TRANSPORT_CLOSED: closes++; break;
+      default:
+        FAIL( "bad stop event " << pn_event_type_name(et)) ;
+        break;
+      }
+    }
 
     /* Deliberate use of Anonymous */
     pn_ssl_domain_t *cd = client.ssl_domain;
@@ -576,9 +585,18 @@ TEST_CASE("proactor_ssl") {
     REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
     CHECK_THAT(*client.last_condition,
                cond_matches("amqp:connection:framing-error", "SSL"));
-    REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
-    REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
-    REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+    errs = 1;
+    closes = 0;
+    while (errs < 2 && closes < 2) {
+      pn_event_type_t et = p.run(PN_TRANSPORT_CLOSED);
+      switch(et) {
+      case PN_TRANSPORT_ERROR: errs++; break;
+      case PN_TRANSPORT_CLOSED: closes++; break;
+      default:
+        FAIL( "bad stop event " << pn_event_type_name(et)) ;
+        break;
+      }
+    }
 
     /* Can ignore bad hostname */
     REQUIRE(0 == pn_ssl_domain_set_peer_authentication(

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
