PROTON-1460: C epoll proactor, deterministic socket IO callbacks on close
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d25089bf Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d25089bf Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d25089bf Branch: refs/heads/PROTON-1488 Commit: d25089bf2834dc4f650697c39d09dfb95c5509a2 Parents: ec1d1a3 Author: Clifford Jansen <[email protected]> Authored: Wed May 24 23:48:34 2017 -0700 Committer: Clifford Jansen <[email protected]> Committed: Wed May 24 23:59:41 2017 -0700 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 55 +++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d25089bf/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index 2f99cd9..c65fa44 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -682,6 +682,7 @@ static void pconnection_final_free(pconnection_t *pc) { // call without lock, but only if pconnection_is_final() is true static void pconnection_cleanup(pconnection_t *pc) { + stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd); if (pc->psocket.sockfd != -1) close(pc->psocket.sockfd); stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd); @@ -698,8 +699,16 @@ static void pconnection_cleanup(pconnection_t *pc) { static void pconnection_begin_close(pconnection_t *pc) { if (!pc->context.closing) { pc->context.closing = true; - stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd); - pc->current_arm = 0; + if (pc->current_arm != 0 && !pc->new_events) { + // Force io callback via hangup + if (pc->current_arm != (EPOLLIN | EPOLLOUT)) { + pc->current_arm = (EPOLLIN | EPOLLOUT); + pc->psocket.epoll_io.wanted = pc->current_arm;; + rearm(pc->psocket.proactor, &pc->psocket.epoll_io); + } + shutdown(pc->psocket.sockfd, SHUT_RDWR); + } + pn_connection_driver_close(&pc->driver); if (ptimer_shutdown(&pc->timer, pc->timer_armed)) pc->timer_armed = false; // disarmed in the sense that the timer will never fire again @@ -713,6 +722,8 @@ static void pconnection_begin_close(pconnection_t *pc) { static void pconnection_forced_shutdown(pconnection_t *pc) { // Called by proactor_free, no competing threads, no epoll activity. + pc->current_arm = 0; + pc->new_events = 0; pconnection_begin_close(pc); // pconnection_process will never be called again. Zero everything. pc->timer_armed = false; @@ -886,12 +897,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, // Confirmed as working thread. Review state and unlock ASAP. - if (pc->context.closing && pconnection_is_final(pc)) { - unlock(&pc->context.mutex); - pconnection_cleanup(pc); - return NULL; - } - retry: if (pc->queued_disconnect) { // From pn_proactor_disconnect() @@ -919,28 +924,36 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } if (pc->new_events) { - if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc)) - pconnection_maybe_connect_lh(pc); - else - pconnection_connected_lh(pc); /* Non error event means we are connected */ - if (pc->new_events & EPOLLOUT) - pc->write_blocked = false; - if (pc->new_events & EPOLLIN) - pc->read_blocked = false; + if (!pc->context.closing) { + if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc)) + pconnection_maybe_connect_lh(pc); + else + pconnection_connected_lh(pc); /* Non error event means we are connected */ + if (pc->new_events & EPOLLOUT) + pc->write_blocked = false; + if (pc->new_events & EPOLLIN) + pc->read_blocked = false; + } pc->current_arm = 0; pc->new_events = 0; } bool unarmed = (pc->current_arm == 0); - if (!pc->timer_armed) { + + if (pc->context.closing && pconnection_is_final(pc)) { + unlock(&pc->context.mutex); + pconnection_cleanup(pc); + return NULL; + } + + if (!pc->timer_armed && !pc->timer.shutting_down) { pc->timer_armed = true; // about to rearm outside the lock timer_unarmed = true; // so we remember } - bool timer_shutting_down = pc->timer.shutting_down; unlock(&pc->context.mutex); pc->hog_count++; // working context doing work - if (timer_unarmed && !timer_shutting_down) { + if (timer_unarmed) { rearm(pc->psocket.proactor, &pc->timer.epoll_io); timer_unarmed = false; } @@ -1279,7 +1292,6 @@ void pn_listener_free(pn_listener_t *l) { /* Note at this point either the listener has never been used (freed by user) or it has been closed, so all its sockets are closed. */ - // TODO: do we need a QPID DeletionManager equivalent to be safe from inbound connection (accept) epoll events? if (l) { bool can_free = true; if (l->collector) pn_collector_free(l->collector); @@ -1296,6 +1308,7 @@ void pn_listener_free(pn_listener_t *l) { } static void listener_begin_close(pn_listener_t* l) { + // TODO: switch to shutdown(sock, SHUT_RD) and wait for HUP callback per listener socket (analogous to pconnection) if (!l->context.closing) { l->context.closing = true; /* Close all listening sockets */ @@ -1338,7 +1351,7 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) { pn_listener_t *l = psocket_listener(ps); lock(&l->context.mutex); if (events) { - l->armed = false; + l->armed = false; // TODO: armed logic should be per socket not per aggregate listener if (events & EPOLLRDHUP) { /* Calls listener_begin_close which closes all the listener's sockets */ psocket_error(ps, errno, "listener epoll"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
