PROTON-1493: c-proactor make pn_proactor_interrupt async-signal-safe pn_proactor_interrupt() will often be used from signal handlers, so it must be async-signal-safe. Updated the documentation and modified the implementations of pn_proactor_interrupt() to use only async-signal-safe calls (no locks).
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5f8738f5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5f8738f5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5f8738f5 Branch: refs/heads/PROTON-1488 Commit: 5f8738f573c3e9c39608714453b2425e3a105ec7 Parents: 8d862be Author: Alan Conway <[email protected]> Authored: Mon May 29 17:12:36 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Wed May 31 10:49:36 2017 -0400 ---------------------------------------------------------------------- INSTALL.md | 11 +++ examples/c/proactor/broker.c | 8 +-- proton-c/include/proton/proactor.h | 11 +-- proton-c/src/proactor/epoll.c | 122 +++++++++++++++++--------------- proton-c/src/proactor/libuv.c | 37 +++++++--- 5 files changed, 112 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/INSTALL.md ---------------------------------------------------------------------- diff --git a/INSTALL.md b/INSTALL.md index 8de93fe..e5e5db6 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -137,6 +137,17 @@ Note that if you wish to build debug version of proton for use with swig bindings on Windows, you must have the appropriate debug target libraries to link against. +Other platforms +--------------- + +Proton can use the http://libuv.org IO library on any platform where +it is available. Install the libuv library and header files and adapt +the instructions for building on Linux. 
+ +The libuv library is not required on Linux or Windows but if you wish +you can use it instead of the default native IO by running cmake with +`-Dproactor=libuv` + Installing Language Bindings ---------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index 7d95e7f..d9285db 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -195,9 +195,8 @@ typedef struct broker_t { } broker_t; void broker_stop(broker_t *b) { - /* In this broker an interrupt stops a thread, stopping all threads stops the broker */ - for (size_t i = 0; i < b->threads; ++i) - pn_proactor_interrupt(b->proactor); + /* Interrupt the proactor to stop the working threads. */ + pn_proactor_interrupt(b->proactor); } /* Try to send if link is sender and has credit */ @@ -369,12 +368,13 @@ static void handle(broker_t* b, pn_event_t* e) { break; - case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ + case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ broker_stop(b); break; case PN_PROACTOR_INTERRUPT: b->finished = true; + pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */ break; default: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index 861afbe..9c7ce59 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -196,12 +196,13 @@ PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *even /** * Return a @ref PN_PROACTOR_INTERRUPT event as soon as possible. 
* - * Exactly one @ref PN_PROACTOR_INTERRUPT event is generated for each call to - * pn_proactor_interrupt(). If threads are blocked in pn_proactor_wait(), one - * of them will be interrupted, otherwise the interrupt will be returned by a - * future call to pn_proactor_wait(). Calling pn_proactor_interrupt(). + * At least one PN_PROACTOR_INTERRUPT event will be returned after this call. + * Interrupts can be "coalesced" - if several pn_proactor_interrupt() calls + * happen close together, there may be only one PN_PROACTOR_INTERRUPT event that + * occurs after all of them. * - * @note Thread safe + * @note Thread-safe and async-signal-safe: can be called in a signal handler. + * This is the only pn_proactor function that is async-signal-safe. */ PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index 7490ecd..b258da3 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -302,11 +302,13 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) { * thread. Conversely, a thread must never stop working without * checking if it has newly arrived work. * - * External wake operations, like pn_connection_wake() and - * pn_proactor_interrupt(), are built on top of the internal wake - * mechanism. The former coalesces multiple wakes until event - * delivery, the latter does not. The WAKEABLE implementation can be - * modeled on whichever is more suited. + * External wake operations, like pn_connection_wake(), are built on top of + * the internal wake mechanism. pn_connection_wake() coalesces multiple wakes + * until event delivery. The WAKEABLE implementation can be modeled on + * pn_connection_wake().
+ * + * pn_proactor_interrupt() must be async-signal-safe so it has a dedicated + * eventfd to allow a lock-free pn_proactor_interrupt() implementation. */ typedef enum { PROACTOR, @@ -360,10 +362,10 @@ struct pn_proactor_t { pn_collector_t *collector; pcontext_t *contexts; /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */ epoll_extended_t epoll_wake; + epoll_extended_t epoll_interrupt; pn_event_batch_t batch; - size_t interrupts; /* total pending interrupts */ - size_t deferred_interrupts; /* interrupts for current batch */ size_t disconnects_pending; /* unfinished proactor disconnects*/ + bool interrupt; bool inactive; bool timer_expired; bool timer_cancelled; @@ -375,6 +377,8 @@ struct pn_proactor_t { bool wakes_in_progress; pcontext_t *wake_list_first; pcontext_t *wake_list_last; + // Interrupts have a dedicated eventfd because they must be async-signal safe. + int interruptfd; }; static void rearm(pn_proactor_t *p, epoll_extended_t *ee); @@ -1470,6 +1474,16 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { // proactor // ======================================================================== +/* Set up an epoll_extended_t to be used for wakeup or interrupts */ +static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) { + ee->psocket = NULL; + ee->fd = eventfd; + ee->type = WAKE; + ee->wanted = EPOLLIN; + ee->polling = false; + start_polling(ee, epollfd); // TODO: check for error +} + pn_proactor_t *pn_proactor() { pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); if (!p) return NULL; @@ -1478,26 +1492,24 @@ pn_proactor_t *pn_proactor() { pmutex_init(&p->eventfd_mutex); ptimer_init(&p->timer, 0); - if ((p->epollfd = epoll_create(1)) >= 0) + if ((p->epollfd = epoll_create(1)) >= 0) { if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) { - if (p->timer.timerfd >= 0) - if ((p->collector = pn_collector()) != NULL) { - p->batch.next_event = &proactor_batch_next; - start_polling(&p->timer.epoll_io, p->epollfd); // 
TODO: check for error - p->timer_armed = true; - - p->epoll_wake.psocket = NULL; - p->epoll_wake.fd = p->eventfd; - p->epoll_wake.type = WAKE; - p->epoll_wake.wanted = EPOLLIN; - p->epoll_wake.polling = false; - start_polling(&p->epoll_wake, p->epollfd); // TODO: check for error - return p; - } + if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) { + if (p->timer.timerfd >= 0) + if ((p->collector = pn_collector()) != NULL) { + p->batch.next_event = &proactor_batch_next; + start_polling(&p->timer.epoll_io, p->epollfd); // TODO: check for error + p->timer_armed = true; + epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd); + epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd); + return p; + } + } } - + } if (p->epollfd >= 0) close(p->epollfd); if (p->eventfd >= 0) close(p->eventfd); + if (p->interruptfd >= 0) close(p->eventfd); ptimer_finalize(&p->timer); if (p->collector) pn_free(p->collector); free (p); @@ -1510,6 +1522,8 @@ void pn_proactor_free(pn_proactor_t *p) { p->epollfd = -1; close(p->eventfd); p->eventfd = -1; + close(p->interruptfd); + p->interruptfd = -1; ptimer_finalize(&p->timer); while (p->contexts) { pcontext_t *ctx = p->contexts; @@ -1551,34 +1565,23 @@ static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) { static bool proactor_update_batch(pn_proactor_t *p) { if (proactor_has_event(p)) return true; - if (p->deferred_interrupts > 0) { - // drain these first - --p->deferred_interrupts; - --p->interrupts; - proactor_add_event(p, PN_PROACTOR_INTERRUPT); - return true; - } if (p->timer_expired) { p->timer_expired = false; proactor_add_event(p, PN_PROACTOR_TIMEOUT); return true; } - - int ec = 0; - if (p->interrupts > 0) { - --p->interrupts; + if (p->interrupt) { + p->interrupt = false; proactor_add_event(p, PN_PROACTOR_INTERRUPT); - ec++; - if (p->interrupts > 0) - p->deferred_interrupts = p->interrupts; + return true; } - if (p->inactive && ec == 0) { + if (p->inactive) { p->inactive = false; - ec++; 
proactor_add_event(p, PN_PROACTOR_INACTIVE); + return true; } - return ec > 0; + return false; } static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { @@ -1590,10 +1593,12 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { return log_event(p, e); } -static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) { - bool timer_fired = timeout && ptimer_callback(&p->timer) != 0; +static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t event) { + bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer) != 0; lock(&p->context.mutex); - if (timeout) { + if (event == PN_PROACTOR_INTERRUPT) { + p->interrupt = true; + } else if (event == PN_PROACTOR_TIMEOUT) { p->timer_armed = false; if (timer_fired && !p->timer_cancelled) p->timer_expired = true; @@ -1667,17 +1672,20 @@ static bool proactor_remove(pcontext_t *ctx) { return can_free; } -static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p) { +static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) { + if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ + return proactor_process(p, PN_PROACTOR_INTERRUPT); + } pcontext_t *ctx = wake_pop_front(p); if (ctx) { switch (ctx->type) { - case PROACTOR: - return proactor_process(p, false); - case PCONNECTION: + case PROACTOR: + return proactor_process(p, PN_EVENT_NONE); + case PCONNECTION: return pconnection_process((pconnection_t *) ctx->owner, 0, false, false); - case LISTENER: - return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0); - default: + case LISTENER: + return listener_process(&((pn_listener_t *) ctx->owner)->psockets[0], 0); + default: assert(ctx->type == WAKEABLE); // TODO: implement or remove } } @@ -1710,9 +1718,9 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr; if (ee->type == WAKE) { - batch = 
process_inbound_wake(p); + batch = process_inbound_wake(p, ee); } else if (ee->type == PROACTOR_TIMER) { - batch = proactor_process(p, true); + batch = proactor_process(p, PN_PROACTOR_TIMEOUT); } else { pconnection_t *pc = psocket_pconnection(ee->psocket); if (pc) { @@ -1772,11 +1780,11 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { } void pn_proactor_interrupt(pn_proactor_t *p) { - lock(&p->context.mutex); - ++p->interrupts; - bool notify = wake(&p->context); - unlock(&p->context.mutex); - if (notify) wake_notify(&p->context); + if (p->interruptfd == -1) + return; + uint64_t increment = 1; + if (write(p->interruptfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t)) + EPOLL_FATAL("setting eventfd", errno); } void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5f8738f5/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index cf7a31b..8cd6dd7 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -52,7 +52,7 @@ libuv functions are thread unsafe, we use a"leader-worker-follower" model as follows: - At most one thread at a time is the "leader". The leader runs the UV loop till there - are events to process and then becomes a "worker"n + are events to process and then becomes a "worker" - Concurrent "worker" threads process events for separate connections or listeners. 
When they run out of work they become "followers" @@ -227,10 +227,13 @@ struct pn_listener_t { typedef enum { TM_NONE, TM_REQUEST, TM_PENDING, TM_FIRED } timeout_state_t; struct pn_proactor_t { + /* Notification */ + uv_async_t notify; + uv_async_t interrupt; + /* Leader thread */ uv_cond_t cond; uv_loop_t loop; - uv_async_t async; uv_timer_t timer; /* Owner thread: proactor collector and batch can belong to leader or a worker */ @@ -241,7 +244,6 @@ struct pn_proactor_t { uv_mutex_t lock; work_queue_t worker_q; /* ready for work, to be returned via pn_proactor_wait() */ work_queue_t leader_q; /* waiting for attention by the leader thread */ - size_t interrupt; /* pending interrupts */ timeout_state_t timeout_state; pn_millis_t timeout; size_t count; /* connection/listener count for INACTIVE events */ @@ -250,12 +252,21 @@ struct pn_proactor_t { bool inactive; bool has_leader; bool batch_working; /* batch is being processed in a worker thread */ + bool need_interrupt; /* Need a PN_PROACTOR_INTERRUPT event */ }; /* Notify the leader thread that there is something to do outside of uv_run() */ static inline void notify(pn_proactor_t* p) { - uv_async_send(&p->async); + uv_async_send(&p->notify); +} + +/* Set the interrupt flag in the leader thread to avoid race conditions. 
*/ +void on_interrupt(uv_async_t *async) { + if (async->data) { + pn_proactor_t *p = (pn_proactor_t*)async->data; + p->need_interrupt = true; + } } /* Notify that this work item needs attention from the leader at the next opportunity */ @@ -814,8 +825,8 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) { p->inactive = false; return proactor_batch_lh(p, PN_PROACTOR_INACTIVE); } - if (p->interrupt > 0) { - --p->interrupt; + if (p->need_interrupt) { + p->need_interrupt = false; return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT); } if (p->timeout_state == TM_FIRED) { @@ -1072,10 +1083,12 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) { } void pn_proactor_interrupt(pn_proactor_t *p) { - uv_mutex_lock(&p->lock); - ++p->interrupt; - uv_mutex_unlock(&p->lock); - notify(p); + /* NOTE: pn_proactor_interrupt must be async-signal-safe so we cannot use + locks to update shared proactor state here. Instead we use a dedicated + uv_async, the on_interrupt() callback will set the interrupt flag in the + safety of the leader thread. + */ + uv_async_send(&p->interrupt); } void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { @@ -1155,7 +1168,9 @@ pn_proactor_t *pn_proactor() { uv_loop_init(&p->loop); uv_mutex_init(&p->lock); uv_cond_init(&p->cond); - uv_async_init(&p->loop, &p->async, NULL); + uv_async_init(&p->loop, &p->notify, NULL); + uv_async_init(&p->loop, &p->interrupt, on_interrupt); + p->interrupt.data = p; uv_timer_init(&p->loop, &p->timer); p->timer.data = p; p->disconnect_cond = pn_condition(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
