PROTON-1766: [c] Remove use of pn_record_t in libuv proactor.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/391397f5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/391397f5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/391397f5 Branch: refs/heads/master Commit: 391397f59d53e4f6ed72e0bf97b114bc96e2f695 Parents: 103cdd5 Author: Alan Conway <[email protected]> Authored: Wed Feb 21 16:49:28 2018 -0500 Committer: Alan Conway <[email protected]> Committed: Wed Feb 21 17:02:00 2018 -0500 ---------------------------------------------------------------------- proton-c/src/proactor/libuv.c | 52 +++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/391397f5/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index edbe214..3d6c9a4 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -32,7 +32,6 @@ #include <proton/engine.h> #include <proton/listener.h> #include <proton/message.h> -#include <proton/object.h> #include <proton/proactor.h> #include <proton/transport.h> @@ -73,9 +72,6 @@ const char *AMQP_PORT = "5672"; const char *AMQP_PORT_NAME = "amqp"; -/* Record id for proactor attachments to pn_connection_t */ -PN_HANDLE(PN_PROACTOR) - /* pn_proactor_t and pn_listener_t are plain C structs with normal memory management. CLASSDEF is for identification when used as a pn_event_t context. 
*/ @@ -298,6 +294,36 @@ static void parse_addr(addr_t *addr, const char *str) { pni_parse_addr(str, addr->addr_buf, sizeof(addr->addr_buf), &addr->host, &addr->port); } +/* Protect read/update of pn_connection_t pointer to its pconnection_t + * + * Global because pn_connection_wake()/pn_connection_proactor() navigate from + * the pn_connection_t before we know the proactor or driver. Critical sections + * are small: only get/set of the pn_connection_t driver pointer. + * + * TODO: replace mutex with atomic load/store + */ +static pthread_mutex_t driver_ptr_mutex; + +static uv_once_t global_init_once = UV_ONCE_INIT; +static void global_init_fn() { /* Call via uv_once(&global_init_once, global_init_fn) */ + uv_mutex_init(&driver_ptr_mutex); +} + +static pconnection_t *get_pconnection(pn_connection_t* c) { + if (!c) return NULL; + uv_mutex_lock(&driver_ptr_mutex); + pn_connection_driver_t *d = *pn_connection_driver_ptr(c); + uv_mutex_unlock(&driver_ptr_mutex); + if (!d) return NULL; + return (pconnection_t*)((char*)d-offsetof(pconnection_t, driver)); +} + +static void set_pconnection(pn_connection_t* c, pconnection_t *pc) { + uv_mutex_lock(&driver_ptr_mutex); + *pn_connection_driver_ptr(c) = pc ? 
&pc->driver : NULL; + uv_mutex_unlock(&driver_ptr_mutex); +} + static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server) { pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc)); if (!pc || pn_connection_driver_init(&pc->driver, c, t) != 0) { @@ -310,15 +336,13 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, pn_trans if (server) { pn_transport_set_server(pc->driver.transport); } - pn_record_t *r = pn_connection_attachments(pc->driver.connection); - pn_record_def(r, PN_PROACTOR, PN_VOID); - pn_record_set(r, PN_PROACTOR, pc); + set_pconnection(pc->driver.connection, pc); return pc; } static void pconnection_free(pconnection_t *pc) { pn_connection_t *c = pc->driver.connection; - if (c) pn_record_set(pn_connection_attachments(c), PN_PROACTOR, NULL); + if (c) set_pconnection(c, NULL); pn_connection_driver_destroy(&pc->driver); if (pc->addr.getaddrinfo.addrinfo) { uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Interrupted after resolve */ @@ -420,14 +444,6 @@ static void on_close_lsocket(uv_handle_t *h) { free(ls); } -static pconnection_t *get_pconnection(pn_connection_t* c) { - if (!c) { - return NULL; - } - pn_record_t *r = pn_connection_attachments(c); - return (pconnection_t*) pn_record_get(r, PN_PROACTOR); -} - /* Remember the first error code from a bad connect attempt. * This is not yet a full-blown error as we might succeed connecting * to a different address if there are several. 
@@ -1175,6 +1191,7 @@ static void work_free(work_t *w) { } pn_proactor_t *pn_proactor() { + uv_once(&global_init_once, global_init_fn); pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(pn_proactor_t)); p->collector = pn_collector(); if (!p->collector) { @@ -1240,8 +1257,7 @@ void pn_connection_wake(pn_connection_t* c) { void pn_proactor_release_connection(pn_connection_t *c) { pconnection_t *pc = get_pconnection(c); if (pc) { - pn_record_t *r = pn_connection_attachments(c); - pn_record_set(r, PN_PROACTOR, NULL); /* Clear the reference from the connection */ + set_pconnection(c, NULL); pn_connection_driver_release_connection(&pc->driver); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
