This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push: new 9f1e5f0 PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking 9f1e5f0 is described below commit 9f1e5f0cd9cd1c146cee94ca4d6ff6ed4b71c139 Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Thu Apr 30 22:47:17 2020 -0700 PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking --- c/src/proactor/epoll-internal.h | 1 - c/src/proactor/epoll.c | 40 +++++++++++++--------------------------- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 20b01ac..fd02817 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -276,7 +276,6 @@ struct pn_listener_t { size_t pending_count; /* number of pending accepted connections */ size_t backlog; /* size of pending accepted array */ bool close_dispatched; - pmutex rearm_mutex; /* orders rearms/disarms, nothing else */ uint32_t sched_io_events; }; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 76b5c37..6667365 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -848,15 +848,11 @@ static void proactor_rearm_overflow(pn_proactor_t *p) { assert(a->overflowed); a->overflowed = false; if (rearming) { - lock(&l->rearm_mutex); + rearm(p, &a->psocket.epoll_io); a->armed = true; } else notify = wake(&l->context); unlock(&l->context.mutex); - if (rearming) { - rearm(p, &a->psocket.epoll_io); - unlock(&l->rearm_mutex); - } if (notify) wake_notify(&l->context); a = acceptor_list_next(&ovflw); } @@ -1642,7 +1638,6 @@ pn_listener_t *pn_listener() { } pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen pcontext_init(&l->context, LISTENER, unknown); - pmutex_init(&l->rearm_mutex); } return l; } @@ -1702,11 +1697,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in ps->epoll_io.fd = fd; ps->epoll_io.wanted = EPOLLIN; ps->epoll_io.polling = false; - lock(&l->rearm_mutex); start_polling(&ps->epoll_io, ps->proactor->epollfd); // TODO: check for error l->active_count++; acceptor->armed = true; - unlock(&l->rearm_mutex); } else { close(fd); } @@ -1745,7 +1738,6 @@ static inline bool listener_can_free(pn_listener_t *l) { static inline void listener_final_free(pn_listener_t *l) { pcontext_finalize(&l->context); - pmutex_finalize(&l->rearm_mutex); free(l->acceptors); free(l->pending_accepteds); free(l); @@ -1780,7 +1772,6 @@ static void listener_begin_close(pn_listener_t* l) { acceptor_t *a = &l->acceptors[i]; psocket_t *ps = &a->psocket; if (ps->epoll_io.fd >= 0) { - lock(&l->rearm_mutex); if (a->armed) { shutdown(ps->epoll_io.fd, SHUT_RD); // Force epoll event and callback } else { @@ -1789,7 +1780,6 @@ static void listener_begin_close(pn_listener_t* l) { ps->epoll_io.fd = -1; l->active_count--; } - unlock(&l->rearm_mutex); } } /* Close all sockets waiting for a pn_listener_accept2() */ @@ -1869,17 +1859,13 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool w uint32_t events = ps->working_io_events; ps->working_io_events = 0; if (l->context.closing) { - lock(&l->rearm_mutex); l->acceptors[i].armed = false; stop_polling(&ps->epoll_io, ps->proactor->epollfd); - unlock(&l->rearm_mutex); close(ps->epoll_io.fd); ps->epoll_io.fd = -1; l->active_count--; } else { - lock(&l->rearm_mutex); l->acceptors[i].armed = false; - unlock(&l->rearm_mutex); if (events & EPOLLRDHUP) { /* Calls listener_begin_close which closes all the listener's sockets */ psocket_error(ps, errno, "listener epoll"); @@ -1929,29 +1915,29 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { static void listener_done(pn_listener_t *l) { pn_proactor_t *p = l->context.proactor; tslot_t *ts = l->context.runner; + lock(&l->context.mutex); // Just in case the app didn't accept all the pending accepts // Shuffle the list back to start at 0 memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t)); l->pending_first = 0; - for (size_t i = 0; i < l->acceptors_size; i++) { - acceptor_t *a = &l->acceptors[i]; - psocket_t *ps = &a->psocket; - - // Rearm acceptor when appropriate - if (ps->epoll_io.polling && l->pending_count==0) { - lock(&l->rearm_mutex); - if (!a->armed) { - rearm(ps->proactor, &ps->epoll_io); - a->armed = true; + if (!l->context.closing) { + for (size_t i = 0; i < l->acceptors_size; i++) { + acceptor_t *a = &l->acceptors[i]; + psocket_t *ps = &a->psocket; + + // Rearm acceptor when appropriate + if (ps->epoll_io.polling && l->pending_count==0 && !a->overflowed) { + if (!a->armed) { + rearm(ps->proactor, &ps->epoll_io); + a->armed = true; + } } - unlock(&l->rearm_mutex); } } bool notify = false; - lock(&l->context.mutex); l->context.working = false; lock(&p->sched_mutex); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org