Repository: qpid-proton Updated Branches: refs/heads/master e2be27cef -> 6df8ad351
PROTON-800: proton-c reactor fixes for Windows, destructor ordering and selectables handling Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6df8ad35 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6df8ad35 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6df8ad35 Branch: refs/heads/master Commit: 6df8ad3511e81b3b674e857632af35dd2e3e5883 Parents: e2be27c Author: Clifford Jansen <[email protected]> Authored: Wed Jan 21 01:23:47 2015 -0800 Committer: Clifford Jansen <[email protected]> Committed: Wed Jan 21 01:23:47 2015 -0800 ---------------------------------------------------------------------- proton-c/src/reactor/reactor.c | 4 +- proton-c/src/windows/iocp.c | 11 +++++- proton-c/src/windows/iocp.h | 2 + proton-c/src/windows/selector.c | 72 ++++++++++++++++++++---------------- 4 files changed, 55 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c index bdee2de..b9346d2 100644 --- a/proton-c/src/reactor/reactor.c +++ b/proton-c/src/reactor/reactor.c @@ -76,12 +76,12 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) { static void pn_reactor_finalize(pn_reactor_t *reactor) { pn_decref(reactor->attachments); - pn_decref(reactor->selector); - pn_decref(reactor->io); pn_decref(reactor->collector); pn_decref(reactor->handler); pn_decref(reactor->children); pn_decref(reactor->timer); + pn_decref(reactor->selector); + pn_decref(reactor->io); } #define pn_reactor_hashcode NULL http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/iocp.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/iocp.c b/proton-c/src/windows/iocp.c index 6acff41..3feec01 100644 --- a/proton-c/src/windows/iocp.c +++ b/proton-c/src/windows/iocp.c @@ -733,7 +733,6 @@ static iocpdesc_t *pni_iocpdesc(pn_socket_t s) { static const pn_cid_t CID_pni_iocpdesc = CID_pn_void; static pn_class_t clazz = PN_CLASS(pni_iocpdesc); - assert (s != INVALID_SOCKET); iocpdesc_t *iocpd = (iocpdesc_t *) pn_class_new(&clazz, sizeof(iocpdesc_t)); assert(iocpd); iocpd->socket = s; @@ -749,6 +748,7 @@ static bool is_listener_socket(pn_socket_t s) } iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) { + assert (s != INVALID_SOCKET); assert(!pni_iocpdesc_map_get(iocp, s)); bool listening = is_listener_socket(s); iocpdesc_t *iocpd = pni_iocpdesc(s); @@ -767,6 +767,15 @@ iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s, bool external) { return iocpd; } +iocpdesc_t *pni_deadline_desc(iocp_t *iocp) { + // Non IO descriptor for selector deadlines. Do not add to iocpdesc map or + // zombie list. Selector responsible to free/decref object. + iocpdesc_t *iocpd = pni_iocpdesc(PN_INVALID_SOCKET); + iocpd->iocp = iocp; + iocpd->deadline_desc = true; + return iocpd; +} + // === Fast lookup of a socket's iocpdesc_t iocpdesc_t *pni_iocpdesc_map_get(iocp_t *iocp, pn_socket_t s) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/iocp.h ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/iocp.h b/proton-c/src/windows/iocp.h index 91ded50..adccaab 100644 --- a/proton-c/src/windows/iocp.h +++ b/proton-c/src/windows/iocp.h @@ -74,6 +74,7 @@ struct iocpdesc_t { bool closing; // pn_close called bool read_closed; // EOF or read error bool write_closed; // shutdown sent or write error + bool deadline_desc; // Socket-less deadline descriptor for selectors pn_selector_t *selector; pn_selectable_t *selectable; int events; @@ -104,6 +105,7 @@ struct write_result_t { iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s, bool external); iocpdesc_t *pni_iocpdesc_map_get(iocp_t *, pn_socket_t s); +iocpdesc_t *pni_deadline_desc(iocp_t *); void pni_iocpdesc_map_del(iocp_t *, pn_socket_t s); void pni_iocpdesc_map_push(iocpdesc_t *iocpd); void pni_iocpdesc_start(iocpdesc_t *iocpd); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6df8ad35/proton-c/src/windows/selector.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/selector.c b/proton-c/src/windows/selector.c index a59dc9c..6bd8d8c 100644 --- a/proton-c/src/windows/selector.c +++ b/proton-c/src/windows/selector.c @@ -44,8 +44,6 @@ static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t); struct pn_selector_t { iocp_t *iocp; - pn_timestamp_t *deadlines; - size_t capacity; pn_list_t *selectables; pn_list_t *iocp_descriptors; size_t current; @@ -62,8 +60,6 @@ void pn_selector_initialize(void *obj) { pn_selector_t *selector = (pn_selector_t *) obj; selector->iocp = NULL; - selector->deadlines = NULL; - selector->capacity = 0; selector->selectables = pn_list(PN_WEAKREF, 0); selector->iocp_descriptors = pn_list(PN_OBJECT, 0); selector->current = 0; @@ -79,7 +75,6 @@ void pn_selector_initialize(void *obj) void pn_selector_finalize(void *obj) { pn_selector_t *selector = (pn_selector_t *) obj; - free(selector->deadlines); pn_free(selector->selectables); pn_free(selector->iocp_descriptors); pn_error_free(selector->error); @@ -110,34 +105,13 @@ void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable) assert(selectable); assert(pni_selectable_get_index(selectable) < 0); pn_socket_t sock = pn_selectable_get_fd(selectable); - iocpdesc_t *iocpd = NULL; - if (sock != INVALID_SOCKET) { - iocpd = pni_iocpdesc_map_get(selector->iocp, sock); - if (!iocpd) { - // Socket created outside proton. Hook it up to iocp. - iocpd = pni_iocpdesc_create(selector->iocp, sock, true); - pni_iocpdesc_start(iocpd); - } else { - assert(iocpd->iocp == selector->iocp); - } - } if (pni_selectable_get_index(selectable) < 0) { pn_list_add(selector->selectables, selectable); - pn_list_add(selector->iocp_descriptors, iocpd); + pn_list_add(selector->iocp_descriptors, NULL); size_t size = pn_list_size(selector->selectables); - - if (selector->capacity < size) { - selector->deadlines = (pn_timestamp_t *) realloc(selector->deadlines, size*sizeof(pn_timestamp_t)); - selector->capacity = size; - } - pni_selectable_set_index(selectable, size - 1); - if (iocpd) { - iocpd->selector = selector; - iocpd->selectable = selectable; - } } pn_selector_update(selector, selectable); @@ -145,12 +119,48 @@ void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable) void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable) { + // A selectable's fd may switch from PN_INVALID_SCOKET to a working socket between + // update calls. If a selectable without a valid socket has a deadline, we need + // a dummy iocpdesc_t to participate in the deadlines list. int idx = pni_selectable_get_index(selectable); assert(idx >= 0); - selector->deadlines[idx] = pn_selectable_get_deadline(selectable); - + pn_timestamp_t deadline = pn_selectable_get_deadline(selectable); pn_socket_t sock = pn_selectable_get_fd(selectable); iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); + + if (!iocpd && deadline && sock == PN_INVALID_SOCKET) { + iocpd = pni_deadline_desc(selector->iocp); + assert(iocpd); + pn_list_set(selector->iocp_descriptors, idx, iocpd); + pn_decref(iocpd); // life is solely tied to iocp_descriptors list + iocpd->selector = selector; + iocpd->selectable = selectable; + } + else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) { + // Switching to a real socket. Stop using a deadline descriptor. + deadlines_update(iocpd, 0); + // decref descriptor in list and pick up a real iocpd below + pn_list_set(selector->iocp_descriptors, idx, NULL); + iocpd = NULL; + } + + // The selectables socket may be set long after it has been added + if (!iocpd && sock != PN_INVALID_SOCKET) { + iocpd = pni_iocpdesc_map_get(selector->iocp, sock); + if (!iocpd) { + // Socket created outside proton. Hook it up to iocp. + iocpd = pni_iocpdesc_create(selector->iocp, sock, true); + assert(iocpd); + if (iocpd) + pni_iocpdesc_start(iocpd); + } + if (iocpd) { + pn_list_set(selector->iocp_descriptors, idx, iocpd); + iocpd->selector = selector; + iocpd->selectable = selectable; + } + } + if (iocpd) { assert(sock == iocpd->socket || iocpd->closing); int interests = 0; @@ -160,11 +170,11 @@ void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable) if (pn_selectable_is_writing(selectable)) { interests |= PN_WRITABLE; } - if (selector->deadlines[idx]) { + if (deadline) { interests |= PN_EXPIRED; } interests_update(iocpd, interests); - deadlines_update(iocpd, selector->deadlines[idx]); + deadlines_update(iocpd, deadline); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
