c proactor: improved robustness and testing Added assert self-tests to the libuv.c proactor. The assertions are cheap and are deliberately kept in release builds to fail fast and aid debugging.
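The mechanism that keeps these assertions alive in release builds is the #undef NDEBUG immediately before #include <assert.h> at the top of libuv.c: <assert.h> re-evaluates NDEBUG on every inclusion, so the override is scoped to that one translation unit even when the build passes -DNDEBUG. A minimal standalone illustration of the same pattern:

    /* Keep assert() active even when the build defines NDEBUG
     * (e.g. a CMake Release build passing -DNDEBUG). <assert.h>
     * re-evaluates NDEBUG on every inclusion, so this override
     * affects only the current translation unit. */
    #undef NDEBUG
    #include <assert.h>

    #include <stdio.h>

    int main(void) {
      int state = 1;
      assert(state == 1);              /* still checked with -DNDEBUG */
      printf("asserts are live\n");
      return 0;
    }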
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ec70d73d Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ec70d73d Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ec70d73d Branch: refs/heads/master Commit: ec70d73dd5e5b58eaf64f5e137104fd9d4042e70 Parents: afacb16 Author: Alan Conway <[email protected]> Authored: Fri Feb 10 21:44:25 2017 -0500 Committer: Alan Conway <[email protected]> Committed: Fri Feb 10 21:49:59 2017 -0500 ---------------------------------------------------------------------- proton-c/src/proactor/libuv.c | 557 ++++++++++++++++++++----------------- 1 file changed, 306 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ec70d73d/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index 42bbfab..6064bd6 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -30,7 +30,10 @@ #include <proton/transport.h> #include <proton/url.h> +/* All asserts are cheap and should remain in a release build for debuggability */ +#undef NDEBUG #include <assert.h> + #include <stddef.h> #include <stdio.h> #include <stdlib.h> @@ -58,12 +61,10 @@ wake-up to be processed in a single thread with no context switches. Function naming: - - on_ - called in leader thread via uv_run(). - - leader_ - called in leader thread, while processing the leader_q. - - owner_ - called in owning thread, leader or worker but not concurrently. - - Note on_ and leader_ functions can call each other, the prefix indicates the - path they are most often called on. + - on_* - called in leader thread by uv_run(). + - leader_* - called in leader thread (either leader_q processing or from an on_ function) + - worker_* - called in worker thread + - *_lh - called with the relevant lock held */ const char *COND_NAME = "proactor"; @@ -80,6 +81,13 @@ PN_HANDLE(PN_PROACTOR) PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) +/* A psocket (connection or listener) has the following *mutually exclusive* states. */ +typedef enum { + ON_WORKER, /* On worker_q or in use by user code in worker thread */ + ON_LEADER, /* On leader_q or in use by the leader loop */ + ON_UV /* Scheduled for a UV event, or in use by leader thread in on_ handler */ +} psocket_state_t; + /* common to connection and listener */ typedef struct psocket_t { /* Immutable */ @@ -87,14 +95,16 @@ typedef struct psocket_t { /* Protected by proactor.lock */ struct psocket_t* next; - void (*wakeup)(struct psocket_t*); /* interrupting action for leader */ + psocket_state_t state; + void (*action)(struct psocket_t*); /* deferred action for leader */ + void (*wakeup)(struct psocket_t*); /* wakeup action for leader */ - /* Only used by leader */ + /* Only used by leader when it owns the psocket */ uv_tcp_t tcp; - void (*action)(struct psocket_t*); /* deferred action for leader */ - bool is_conn:1; char host[NI_MAXHOST]; char port[NI_MAXSERV]; + bool is_conn; + } psocket_t; /* Special value for psocket.next pointer when socket is not on any list.
*/ @@ -105,11 +115,12 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const ch ps->next = &UNLISTED; ps->is_conn = is_conn; ps->tcp.data = ps; + ps->state = ON_WORKER; /* For platforms that don't know about "amqp" and "amqps" service names. */ - if (strcmp(port, AMQP_PORT_NAME) == 0) + if (port && strcmp(port, AMQP_PORT_NAME) == 0) port = AMQP_PORT; - else if (strcmp(port, AMQPS_PORT_NAME) == 0) + else if (port && strcmp(port, AMQPS_PORT_NAME) == 0) port = AMQPS_PORT; /* Set to "\001" to indicate a NULL as opposed to an empty string "" */ strncpy(ps->host, host ? host : "\001", sizeof(ps->host)); @@ -132,24 +143,27 @@ typedef struct pconnection_t { uv_timer_t timer; uv_write_t write; uv_shutdown_t shutdown; - size_t writing; - bool reading:1; - bool server:1; /* accept, not connect */ + size_t writing; /* size of pending write request, 0 if none pending */ + bool reading; /* true if a read request is pending */ + bool server; /* accept, not connect */ } pconnection_t; struct pn_listener_t { psocket_t psocket; /* Only used by owner thread */ - pconnection_t *accepting; /* accept in progress */ + pconnection_t *accepting; /* set in worker, used in UV loop for accept */ pn_condition_t *condition; pn_collector_t *collector; pn_event_batch_t batch; pn_record_t *attachments; void *context; size_t backlog; -}; + bool closing; /* close requested or closed by error */ + /* Only used in leader thread */ + size_t connections; /* number of connections waiting to be accepted */ +}; typedef struct queue { psocket_t *front, *back; } queue; @@ -166,17 +180,16 @@ struct pn_proactor_t { /* Protected by lock */ uv_mutex_t lock; - queue start_q; - queue worker_q; - queue leader_q; + queue worker_q; /* psockets ready for work, to be returned via pn_proactor_wait() */ + queue leader_q; /* psockets waiting for attention by the leader thread */ size_t interrupt; /* pending interrupts */ pn_millis_t timeout; size_t count; /* psocket count */ - bool inactive:1; - bool timeout_request:1; - bool timeout_elapsed:1; - bool has_leader:1; - bool batch_working:1; /* batch belongs to a worker. */ + bool inactive; + bool timeout_request; + bool timeout_elapsed; + bool has_leader; + bool batch_working; /* batch is being processed in a worker thread */ }; static bool push_lh(queue *q, psocket_t *ps) { @@ -201,90 +214,46 @@ static psocket_t* pop_lh(queue *q) { return ps; } -static inline pconnection_t *as_pconnection_t(psocket_t* ps) { - return ps->is_conn ? (pconnection_t*)ps : NULL; -} - -static inline pn_listener_t *as_listener(psocket_t* ps) { - return ps->is_conn ? NULL: (pn_listener_t*)ps; -} - -/* Put ps on the leader queue for processing. Thread safe. */ -static void to_leader_lh(psocket_t *ps) { - push_lh(&ps->proactor->leader_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ -} - -static void to_leader(psocket_t *ps) { - uv_mutex_lock(&ps->proactor->lock); - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Detach from IO and put ps on the worker queue */ -static void leader_to_worker(psocket_t *ps) { - if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - /* Don't detach if there are no events yet. 
*/ - if (pn_connection_driver_has_event(&pc->driver)) { - if (pc->writing) { - pc->writing = 0; - uv_cancel((uv_req_t*)&pc->write); - } - if (pc->reading) { - pc->reading = false; - uv_read_stop((uv_stream_t*)&pc->psocket.tcp); - } - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); - } - } - } else { - pn_listener_t *l = as_listener(ps); - uv_read_stop((uv_stream_t*)&l->psocket.tcp); - } - uv_mutex_lock(&ps->proactor->lock); - push_lh(&ps->proactor->worker_q, ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Set a deferred action for leader, if not already set. */ -static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - if (!ps->action) { +/* Set state and action and push to relevant queue */ +static inline void set_state_lh(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) { + /* Illegal if ps is already listed under a different state */ + assert(ps->next == &UNLISTED || ps->state == state); + ps->state = state; + if (action && !ps->action) { ps->action = action; } - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); + switch(state) { + case ON_LEADER: push_lh(&ps->proactor->leader_q, ps); break; + case ON_WORKER: push_lh(&ps->proactor->worker_q, ps); break; + case ON_UV: + assert(ps->next == &UNLISTED); + break; /* No queue for UV loop */ + } } -/* Owner thread send to worker thread. Set deferred action if not already set. */ -static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) { +/* Set state and action, push to queue and notify leader. Thread safe. */ +static void set_state(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) { uv_mutex_lock(&ps->proactor->lock); - if (!ps->action) { - ps->action = action; - } - push_lh(&ps->proactor->worker_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ + set_state_lh(ps, state, action); + uv_async_send(&ps->proactor->async); uv_mutex_unlock(&ps->proactor->lock); } +static inline pconnection_t *as_pconnection(psocket_t* ps) { + return ps->is_conn ? (pconnection_t*)ps : NULL; +} -/* Re-queue for further work */ -static void worker_requeue(psocket_t* ps) { - uv_mutex_lock(&ps->proactor->lock); - push_lh(&ps->proactor->worker_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ - uv_mutex_unlock(&ps->proactor->lock); +static inline pn_listener_t *as_listener(psocket_t* ps) { + return ps->is_conn ? 
NULL: (pn_listener_t*)ps; } -static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) { +static pconnection_t *new_pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) { pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc)); - if (!pc) return NULL; - if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) { + if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) { return NULL; } psocket_init(&pc->psocket, p, true, host, port); + pc->write.data = &pc->psocket; if (server) { pn_transport_set_server(pc->driver.transport); } @@ -319,74 +288,49 @@ static void leader_count(pn_proactor_t *p, int change) { uv_mutex_unlock(&p->lock); } -/* Free if there are no uv callbacks pending and no events */ -static void leader_pconnection_t_maybe_free(pconnection_t *pc) { - if (pn_connection_driver_has_event(&pc->driver)) { - leader_to_worker(&pc->psocket); /* Return to worker */ - } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) { - /* All UV requests are finished */ - pn_connection_driver_destroy(&pc->driver); - leader_count(pc->psocket.proactor, -1); - free(pc); - } +/* Final close event for a pconnection_t */ +static void on_close_pconnection_final(uv_handle_t *h) { + pconnection_t *pc = (pconnection_t*)h->data; + free(pc); } -/* Free if there are no uv callbacks pending and no events */ -static void leader_listener_maybe_free(pn_listener_t *l) { - if (pn_collector_peek(l->collector)) { - leader_to_worker(&l->psocket); /* Return to worker */ - } else if (!l->psocket.tcp.data) { - pn_condition_free(l->condition); - leader_count(l->psocket.proactor, -1); - free(l); - } -} - -/* Free if there are no uv callbacks pending and no events */ -static void leader_maybe_free(psocket_t *ps) { - if (ps->is_conn) { - leader_pconnection_t_maybe_free(as_pconnection_t(ps)); - } else { - leader_listener_maybe_free(as_listener(ps)); - } +/* Close event for uv_tcp_t of a pconnection_t */ +static void on_close_pconnection(uv_handle_t *h) { + pconnection_t *pc = (pconnection_t*)h->data; + assert(pc->psocket.state == ON_UV); + leader_count(pc->psocket.proactor, -1); + pn_connection_driver_destroy(&pc->driver); + uv_timer_stop(&pc->timer); + /* Close the timer with the final event to free the pconnection_t */ + uv_close((uv_handle_t*)&pc->timer, on_close_pconnection_final); } -static void on_close(uv_handle_t *h) { - psocket_t *ps = (psocket_t*)h->data; - h->data = NULL; /* Mark closed */ - leader_maybe_free(ps); +/* Close event for uv_tcp_t of a pn_listener_t */ +static void on_close_listener(uv_handle_t *h) { + pn_listener_t *l = (pn_listener_t*)h->data; + pn_condition_free(l->condition); + free(l); } -static void on_shutdown(uv_shutdown_t *shutdown, int err) { - psocket_t *ps = (psocket_t*)shutdown->data; - shutdown->data = NULL; /* Mark closed */ - leader_maybe_free(ps); +static inline void leader_finished(psocket_t *ps) { + set_state(ps, ON_UV, NULL); + uv_close((uv_handle_t*)&ps->tcp, ps->is_conn ?
on_close_pconnection : on_close_listener); } -static inline void leader_close(psocket_t *ps) { - if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) { - uv_close((uv_handle_t*)&ps->tcp, on_close); - } - pconnection_t *pc = as_pconnection_t(ps); - if (pc) { - pn_connection_driver_close(&pc->driver); - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); - uv_close((uv_handle_t*)&pc->timer, on_close); - } +static pconnection_t *get_pconnection(pn_connection_t* c) { + if (!c) { + return NULL; } - leader_maybe_free(ps); -} - -static pconnection_t *get_pconnection_t(pn_connection_t* c) { - if (!c) return NULL; pn_record_t *r = pn_connection_attachments(c); return (pconnection_t*) pn_record_get(r, PN_PROACTOR); } +static void leader_unwatch(psocket_t *ps); + static void leader_error(psocket_t *ps, int err, const char* what) { + assert(ps->state != ON_WORKER); if (ps->is_conn) { - pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver; + pn_connection_driver_t *driver = &as_pconnection(ps)->driver; pn_connection_driver_bind(driver); /* Bind so errors will be reported */ pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", what, fixstr(ps->host), fixstr(ps->port), @@ -398,16 +342,18 @@ static void leader_error(psocket_t *ps, int err, const char* what) { what, fixstr(ps->host), fixstr(ps->port), uv_strerror(err)); pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); + l->closing = true; } - leader_to_worker(ps); /* Worker to handle the error */ + leader_unwatch(ps); /* Worker to handle the error */ } /* uv-initialization */ static int leader_init(psocket_t *ps) { + ps->state = ON_LEADER; leader_count(ps->proactor, +1); int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp); if (!err) { - pconnection_t *pc = as_pconnection_t(ps); + pconnection_t *pc = as_pconnection(ps); if (pc) { pc->connect.data = ps; int err = uv_timer_init(&ps->proactor->loop, &pc->timer); @@ -422,40 +368,53 @@ static int leader_init(psocket_t *ps) { return err; } -/* Common logic for on_connect and on_accept */ -static void leader_connect_accept(pconnection_t *pc, int err, const char *what) { +/* Outgoing connection */ +static void on_connect(uv_connect_t *connect, int err) { + pconnection_t *pc = (pconnection_t*)connect->data; + assert(pc->psocket.state == ON_UV); if (!err) { - leader_to_worker(&pc->psocket); + leader_unwatch(&pc->psocket); } else { - leader_error(&pc->psocket, err, what); + leader_error(&pc->psocket, err, "on connect to"); } } -static void on_connect(uv_connect_t *connect, int err) { - leader_connect_accept((pconnection_t*)connect->data, err, "on connect"); -} - -static void on_accept(uv_stream_t* server, int err) { +/* Incoming connection ready to be accepted */ +static void on_connection(uv_stream_t* server, int err) { + /* Unlike most on_* functions, this one can be called by the leader thread when the + * listener is ON_WORKER, because there's no way to stop libuv from calling + * on_connection() in leader_unwatch(). Just increase a counter and deal with it in the + * worker thread.
+ */ pn_listener_t *l = (pn_listener_t*) server->data; - if (err) { - leader_error(&l->psocket, err, "on accept"); + assert(l->psocket.state == ON_UV); + if (!err) { + ++l->connections; + leader_unwatch(&l->psocket); + } else { + leader_error(&l->psocket, err, "on incoming connection from"); } - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); - leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */ } -static void leader_accept(psocket_t *ps) { - pn_listener_t * l = as_listener(ps); +static void leader_accept(pn_listener_t * l) { + assert(l->psocket.state == ON_UV); + assert(l->accepting); pconnection_t *pc = l->accepting; l->accepting = NULL; - if (pc) { - int err = leader_init(&pc->psocket); - if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); - leader_connect_accept(pc, err, "on accept"); + int err = leader_init(&pc->psocket); + if (!err) { + err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); + } + if (!err) { + leader_unwatch(&pc->psocket); + } else { + leader_error(&pc->psocket, err, "accepting from"); + leader_error(&l->psocket, err, "accepting from"); } } static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { + assert(ps->state == ON_LEADER); int err = leader_init(ps); struct addrinfo hints = { 0 }; if (server) hints.ai_flags = AI_PASSIVE; @@ -466,55 +425,75 @@ static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { } static void leader_connect(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); + assert(ps->state == ON_LEADER); + pconnection_t *pc = as_pconnection(ps); uv_getaddrinfo_t info; int err = leader_resolve(ps, &info, false); if (!err) { err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect); uv_freeaddrinfo(info.addrinfo); } - if (err) { - leader_error(ps, err, "connect to"); + if (!err) { + ps->state = ON_UV; + } else { + leader_error(ps, err, "connecting to"); } } static void leader_listen(psocket_t *ps) { + assert(ps->state == ON_LEADER); pn_listener_t *l = as_listener(ps); - uv_getaddrinfo_t info; + uv_getaddrinfo_t info; int err = leader_resolve(ps, &info, true); if (!err) { err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0); uv_freeaddrinfo(info.addrinfo); } - if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); - if (err) { - leader_error(ps, err, "listen on "); + if (!err) { + err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection); + } + if (!err) { + set_state(ps, ON_UV, NULL); + } else { + leader_error(ps, err, "listening on"); } } -static void on_tick(uv_timer_t *timer) { - pconnection_t *pc = (pconnection_t*)timer->data; +/* Generate tick events and return millis till next tick or 0 if no tick is required */ +static pn_millis_t leader_tick(pconnection_t *pc) { + assert(pc->psocket.state != ON_WORKER); pn_transport_t *t = pc->driver.transport; if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) { - uv_timer_stop(&pc->timer); uint64_t now = uv_now(pc->timer.loop); uint64_t next = pn_transport_tick(t, now); - if (next) { - uv_timer_start(&pc->timer, on_tick, next - now, 0); - } + return next ? 
next - now : 0; + } + return 0; +} + +static void on_tick(uv_timer_t *timer) { + if (!timer->data) return; /* timer closed */ + pconnection_t *pc = (pconnection_t*)timer->data; + assert(pc->psocket.state == ON_UV); + uv_timer_stop(&pc->timer); + pn_millis_t next = leader_tick(pc); + if (pn_connection_driver_has_event(&pc->driver)) { + leader_unwatch(&pc->psocket); + } else if (next) { + uv_timer_start(&pc->timer, on_tick, next, 0); } } static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { pconnection_t *pc = (pconnection_t*)stream->data; + assert(pc->psocket.state == ON_UV); if (nread >= 0) { pn_connection_driver_read_done(&pc->driver, nread); on_tick(&pc->timer); /* check for tick changes. */ - leader_to_worker(&pc->psocket); /* Reading continues automatically until stopped. */ } else if (nread == UV_EOF) { /* hangup */ pn_connection_driver_read_close(&pc->driver); - leader_maybe_free(&pc->psocket); + leader_unwatch(&pc->psocket); } else { leader_error(&pc->psocket, nread, "on read from"); } @@ -522,16 +501,17 @@ static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { static void on_write(uv_write_t* write, int err) { pconnection_t *pc = (pconnection_t*)write->data; - write->data = NULL; + assert(pc->psocket.state == ON_UV); + size_t writing = pc->writing; + pc->writing = 0; /* This write is done regardless of outcome */ if (err == 0) { - pn_connection_driver_write_done(&pc->driver, pc->writing); - leader_to_worker(&pc->psocket); + pn_connection_driver_write_done(&pc->driver, writing); + leader_unwatch(&pc->psocket); } else if (err == UV_ECANCELED) { - leader_maybe_free(&pc->psocket); + leader_unwatch(&pc->psocket); /* cancelled by leader_unwatch, complete the job */ } else { leader_error(&pc->psocket, err, "on write to"); } - pc->writing = 0; /* Need to send a new write request */ } static void on_timeout(uv_timer_t *timer) { @@ -544,47 +524,93 @@ static void on_timeout(uv_timer_t *timer) { // Read buffer allocation function for uv, just returns the transports read buffer. static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { pconnection_t *pc = (pconnection_t*)stream->data; + assert(pc->psocket.state == ON_UV); pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); *buf = uv_buf_init(rbuf.start, rbuf.size); } -static void leader_rewatch(psocket_t *ps) { +/* Monitor a socket in the UV loop */ +static void leader_watch(psocket_t *ps) { + assert(ps->state == ON_LEADER); int err = 0; + set_state(ps, ON_UV, NULL); /* Assume we are going to UV loop unless sent to worker or leader. 
*/ + if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - if (pc->timer.data) { /* uv-initialized */ - on_tick(&pc->timer); /* Re-enable ticks if required */ + pconnection_t *pc = as_pconnection(ps); + if (pn_connection_driver_finished(&pc->driver)) { + leader_finished(ps); + return; } + pn_millis_t next_tick = leader_tick(pc); pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - - /* Ticks and checking buffers can generate events, process before proceeding */ if (pn_connection_driver_has_event(&pc->driver)) { - leader_to_worker(ps); - } else { /* Re-watch for IO */ - if (wbuf.size > 0 && !pc->writing) { - pc->writing = wbuf.size; - uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); - pc->write.data = ps; - uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); - } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { - pc->shutdown.data = ps; - uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown); - } - if (rbuf.size > 0 && !pc->reading) { - pc->reading = true; - err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); - } + /* Ticks and checking buffers have generated events, send back to worker to process */ + set_state(ps, ON_WORKER, NULL); + return; + } + if (next_tick) { + uv_timer_start(&pc->timer, on_tick, next_tick, 0); + } + if (wbuf.size > 0 && !pc->writing) { + pc->writing = wbuf.size; + uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); + err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); + } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { + pc->shutdown.data = ps; + err = uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL); + } + if (rbuf.size > 0 && !pc->reading) { + pc->reading = true; + err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); } } else { pn_listener_t *l = as_listener(ps); - err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); + if (l->closing && pn_collector_peek(l->collector)) { + leader_finished(&l->psocket); + } else { + if (l->accepting) { + leader_accept(l); + } + if (l->connections) { + leader_unwatch(ps); + } + } } if (err) { - leader_error(ps, err, "rewatch"); + leader_error(ps, err, "re-watching"); } } +/* Detach a socket from IO and put it on the worker queue */ +static void leader_unwatch(psocket_t *ps) { + assert(ps->state != ON_WORKER); /* From ON_UV or ON_LEADER */ + if (ps->is_conn) { + pconnection_t *pc = as_pconnection(ps); + if (!pn_connection_driver_has_event(&pc->driver)) { + /* Don't return an empty event batch */ + if (ps->state == ON_UV) { + return; /* Just leave it in the UV loop */ + } else { + leader_watch(ps); /* Re-attach to UV loop */ + } + return; + } else { + if (pc->writing) { + uv_cancel((uv_req_t*)&pc->write); + } + if (pc->reading) { + pc->reading = false; + uv_read_stop((uv_stream_t*)&pc->psocket.tcp); + } + if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { + uv_timer_stop(&pc->timer); + } + } + } + set_state(ps, ON_WORKER, NULL); +} + /* Set the event in the proactor's batch */ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) { pn_collector_put(p->collector, pn_proactor__class(), p, t); @@ -609,23 +635,32 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { } } for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { + assert(ps->state == 
ON_WORKER); if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); + pconnection_t *pc = as_pconnection(ps); return &pc->driver.batch; } else { /* Listener */ pn_listener_t *l = as_listener(ps); + /* Generate accept events one at a time */ + if (l->connections && !pn_collector_peek(l->collector)) { + --l->connections; + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); + } return &l->batch; } - to_leader(ps); /* No event, back to leader */ + set_state_lh(ps, ON_LEADER, NULL); /* No event, back to leader */ } return 0; } -/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */ +/* Called in any thread to set a wakeup action */ static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) { uv_mutex_lock(&ps->proactor->lock); - ps->wakeup = action; - to_leader_lh(ps); + if (action && !ps->wakeup) { + ps->wakeup = action; + } + set_state_lh(ps, ON_LEADER, NULL); + uv_async_send(&ps->proactor->async); /* Wake leader */ uv_mutex_unlock(&ps->proactor->lock); } @@ -634,30 +669,36 @@ pn_listener_t *pn_event_listener(pn_event_t *e) { } pn_proactor_t *pn_event_proactor(pn_event_t *e) { - if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e); + if (pn_event_class(e) == pn_proactor__class()) { + return (pn_proactor_t*)pn_event_context(e); + } pn_listener_t *l = pn_event_listener(e); - if (l) return l->psocket.proactor; + if (l) { + return l->psocket.proactor; + } pn_connection_t *c = pn_event_connection(e); - if (c) return pn_connection_proactor(pn_event_connection(e)); + if (c) { + return pn_connection_proactor(pn_event_connection(e)); + } return NULL; } void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { pconnection_t *pc = batch_pconnection(batch); if (pc) { + assert(pc->psocket.state == ON_WORKER); if (pn_connection_driver_has_event(&pc->driver)) { - /* Process all events before going back to IO. 
*/ - worker_requeue(&pc->psocket); - } else if (pn_connection_driver_finished(&pc->driver)) { - owner_to_leader(&pc->psocket, leader_close); + /* Process all events before going back to leader */ + set_state(&pc->psocket, ON_WORKER, NULL); } else { - owner_to_leader(&pc->psocket, leader_rewatch); + set_state(&pc->psocket, ON_LEADER, leader_watch); } return; } pn_listener_t *l = batch_listener(batch); if (l) { - owner_to_leader(&l->psocket, leader_rewatch); + assert(l->psocket.state == ON_WORKER); + set_state(&l->psocket, ON_LEADER, leader_watch); return; } pn_proactor_t *bp = batch_proactor(batch); @@ -692,14 +733,16 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { } } for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { - void (*action)(psocket_t*) = ps->action; - void (*wakeup)(psocket_t*) = ps->wakeup; - ps->action = NULL; - ps->wakeup = NULL; - if (action || wakeup) { + assert(ps->state == ON_LEADER); + if (ps->wakeup) { + uv_mutex_unlock(&p->lock); + ps->wakeup(ps); + ps->wakeup = NULL; + uv_mutex_lock(&p->lock); + } else if (ps->action) { uv_mutex_unlock(&p->lock); - if (action) action(ps); - if (wakeup) wakeup(ps); + ps->action(ps); + ps->action = NULL; uv_mutex_lock(&p->lock); } } @@ -734,12 +777,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { } int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) { - pconnection_t *pc = new_pconnection_t(p, c, false, host, port); + pconnection_t *pc = new_pconnection(p, c, false, host, port); if (!pc) { return PN_OUT_OF_MEMORY; } - /* Process PN_CONNECTION_INIT before binding */ - owner_to_worker(&pc->psocket, leader_connect); + set_state(&pc->psocket, ON_LEADER, leader_connect); return 0; } @@ -747,24 +789,26 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, con { psocket_init(&l->psocket, p, false, host, port); l->backlog = backlog; - owner_to_leader(&l->psocket, leader_listen); + set_state(&l->psocket, ON_LEADER, leader_listen); return 0; } pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { - pconnection_t *pc = get_pconnection_t(c); + pconnection_t *pc = get_pconnection(c); return pc ? 
pc->psocket.proactor : NULL; } void leader_wake_connection(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); + assert(ps->state == ON_LEADER); + pconnection_t *pc = as_pconnection(ps); pn_connection_t *c = pc->driver.connection; pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE); - leader_to_worker(ps); + leader_unwatch(ps); } void pn_connection_wake(pn_connection_t* c) { - wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection); + /* May be called from any thread */ + wakeup(&get_pconnection(c)->psocket, leader_wake_connection); } pn_proactor_t *pn_proactor() { @@ -782,9 +826,11 @@ pn_proactor_t *pn_proactor() { } static void on_stopping(uv_handle_t* h, void* v) { - uv_close(h, NULL); /* Close this handle */ + if (!uv_is_closing(h)) { + uv_close(h, NULL); /* Close this handle */ + } if (!uv_loop_alive(h->loop)) /* Everything closed */ - uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */ + uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */ } void pn_proactor_free(pn_proactor_t *p) { @@ -799,10 +845,7 @@ void pn_proactor_free(pn_proactor_t *p) { static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { pn_listener_t *l = batch_listener(batch); - pn_event_t *handled = pn_collector_prev(l->collector); - if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) { - owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */ - } + assert(l->psocket.state == ON_WORKER); return pn_collector_next(l->collector); } @@ -811,6 +854,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { } static void pn_listener_free(pn_listener_t *l) { + assert(l->psocket.state == ON_WORKER); if (l) { if (!l->collector) pn_collector_free(l->collector); if (!l->condition) pn_condition_free(l->condition); @@ -834,40 +878,51 @@ pn_listener_t *pn_listener() { return l; } +void leader_listener_close(psocket_t *ps) { + assert(ps->state == ON_LEADER); + pn_listener_t *l = (pn_listener_t*)ps; + l->closing = true; + leader_watch(ps); +} + void pn_listener_close(pn_listener_t* l) { - wakeup(&l->psocket, leader_close); + /* This can be called from any thread, not just the owner of l */ + wakeup(&l->psocket, leader_listener_close); } pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { + assert(l->psocket.state == ON_WORKER); return l ? l->psocket.proactor : NULL; } pn_condition_t* pn_listener_condition(pn_listener_t* l) { + assert(l->psocket.state == ON_WORKER); return l->condition; } void *pn_listener_get_context(pn_listener_t *l) { + assert(l->psocket.state == ON_WORKER); return l->context; } void pn_listener_set_context(pn_listener_t *l, void *context) { + assert(l->psocket.state == ON_WORKER); l->context = context; } pn_record_t *pn_listener_attachments(pn_listener_t *l) { + assert(l->psocket.state == ON_WORKER); return l->attachments; } int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { + assert(l->psocket.state == ON_WORKER); if (l->accepting) { return PN_STATE_ERR; /* Only one at a time */ } - l->accepting = new_pconnection_t( - l->psocket.proactor, c, true, l->psocket.host, l->psocket.port); + l->accepting = new_pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port); if (!l->accepting) { return UV_ENOMEM; } - owner_to_leader(&l->psocket, leader_accept); return 0; } -
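The heart of the new robustness checks is the rule that a psocket is always in exactly one of the three states (ON_WORKER, ON_LEADER, ON_UV) and may only change state while it is off both queues. The reduced sketch below uses stand-in structs and queues rather than the real proton types, and omits the proactor lock that the real _lh functions hold; it shows the invariant that set_state_lh() asserts:

    #undef NDEBUG
    #include <assert.h>
    #include <stddef.h>

    /* Simplified stand-ins for the psocket/queue types in libuv.c */
    typedef enum { ON_WORKER, ON_LEADER, ON_UV } psocket_state_t;
    typedef struct psocket { struct psocket *next; psocket_state_t state; } psocket;
    typedef struct { psocket *front, *back; } queue;

    static psocket UNLISTED;          /* sentinel: not on any list */
    static queue leader_q, worker_q;

    static void push(queue *q, psocket *ps) {
      ps->next = NULL;
      if (q->back) q->back->next = ps; else q->front = ps;
      q->back = ps;
    }

    /* Mirror of set_state_lh(): changing state is only legal while unlisted. */
    static void set_state(psocket *ps, psocket_state_t state) {
      assert(ps->next == &UNLISTED || ps->state == state);
      ps->state = state;
      switch (state) {
        case ON_LEADER: push(&leader_q, ps); break;
        case ON_WORKER: push(&worker_q, ps); break;
        case ON_UV: assert(ps->next == &UNLISTED); break; /* no queue for UV */
      }
    }

    int main(void) {
      psocket ps = { &UNLISTED, ON_WORKER };
      set_state(&ps, ON_LEADER);  /* legal: unlisted, moves onto leader_q */
      /* A second set_state(&ps, ON_UV) here would abort: ps is now listed. */
      return 0;
    }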
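set_state() itself relies on libuv's one documented thread-safe entry point, uv_async_send(), to pull the leader thread out of uv_run() after another thread has queued work for it. Stripped of the queues, the handoff looks like the sketch below (on_wake and worker are hypothetical names, not functions from the patch):

    #include <stdio.h>
    #include <uv.h>

    static uv_async_t async;

    /* Runs on the loop ("leader") thread whenever another thread calls
     * uv_async_send(). In libuv.c this is where the leader drains leader_q. */
    static void on_wake(uv_async_t *a) {
      printf("leader woken, draining queues\n");
      uv_close((uv_handle_t*)a, NULL); /* let the loop finish for this demo */
    }

    static void worker(void *arg) {
      (void)arg;
      /* uv_async_send() is the only libuv call that is safe from any thread. */
      uv_async_send(&async);
    }

    int main(void) {
      uv_loop_t *loop = uv_default_loop();
      uv_async_init(loop, &async, on_wake);
      uv_thread_t t;
      uv_thread_create(&t, worker, NULL);
      uv_run(loop, UV_RUN_DEFAULT); /* blocks until on_wake closes the handle */
      uv_thread_join(&t);
      return 0;
    }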
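leader_tick()/on_tick() convert proton's transport deadline into a one-shot libuv timer that re-arms itself only while pn_transport_tick() keeps returning a next deadline. The same shape is sketched below, with a hypothetical next_deadline() standing in for pn_transport_tick() so the example is self-contained:

    #include <stdio.h>
    #include <uv.h>

    /* Stand-in for pn_transport_tick(): returns the absolute time of the
     * next deadline, or 0 when no further ticks are needed. Invented logic. */
    static uint64_t next_deadline(uint64_t now) {
      static int remaining = 3;
      return remaining-- > 0 ? now + 100 : 0;
    }

    static void on_tick(uv_timer_t *timer) {
      uv_timer_stop(timer);
      uint64_t now = uv_now(timer->loop);
      uint64_t next = next_deadline(now);
      printf("tick at %llu\n", (unsigned long long)now);
      if (next) uv_timer_start(timer, on_tick, next - now, 0); /* re-arm one-shot */
    }

    int main(void) {
      uv_loop_t *loop = uv_default_loop();
      uv_timer_t timer;
      uv_timer_init(loop, &timer);
      uv_timer_start(&timer, on_tick, 0, 0);
      uv_run(loop, UV_RUN_DEFAULT); /* exits once the timer stops re-arming */
      return 0;
    }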
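Finally, pn_proactor_wait() drains leader_q with the classic unlock-around-callback pattern: the deferred wakeup/action pointer is read and cleared under the proactor lock but invoked outside it, so the callback is free to take the lock again via set_state(). A condensed sketch with simplified job/queue types (not the real psocket machinery):

    #include <stdio.h>
    #include <uv.h>

    typedef struct job { struct job *next; void (*action)(struct job*); } job;

    static uv_mutex_t lock;
    static job *leader_q; /* simplified singly-linked leader queue */

    static void run_leader_q(void) {
      uv_mutex_lock(&lock);
      for (job *j = leader_q; j; j = leader_q) {
        leader_q = j->next;
        void (*action)(job*) = j->action;
        j->action = NULL;
        if (action) {
          uv_mutex_unlock(&lock); /* never hold the lock across a callback */
          action(j);              /* may re-lock and queue more work */
          uv_mutex_lock(&lock);
        }
      }
      uv_mutex_unlock(&lock);
    }

    static void hello(job *j) { (void)j; printf("deferred action ran\n"); }

    int main(void) {
      uv_mutex_init(&lock);
      job j = { NULL, hello };
      leader_q = &j;
      run_leader_q();
      uv_mutex_destroy(&lock);
      return 0;
    }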
