Repository: qpid-proton Updated Branches: refs/heads/master 48e75e304 -> e504ce12f
PROTON-1504: epoll proactor: no PN_LISTENER_ACCEPT events if no FDs Epoll proactor now generates PN_LISTENER_ACCEPT events only if a socket accept() succeeds. Simplifies the code and makes it consistent with the libuv proactor. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e504ce12 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e504ce12 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e504ce12 Branch: refs/heads/master Commit: e504ce12f148842e43c8584ff96baf2e984e4d10 Parents: 48e75e3 Author: Alan Conway <[email protected]> Authored: Thu Jun 15 17:28:32 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Thu Jun 15 17:38:31 2017 -0400 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 72 ++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e504ce12/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index c4d0c73..a173086 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -528,7 +528,8 @@ struct pn_listener_t { pn_record_t *attachments; void *listener_context; size_t backlog; - psocket_t *acceptable, *accepted; + int accepted_fd; /* fd accepted but not yet handled by pn_listener_accept() */ + psocket_t *accepted; /* psocket from which we accepted accepted_fd */ bool close_dispatched; bool armed; pn_listener_t *overflow; /* Next overflowed listener */ @@ -710,7 +711,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo pc->read_blocked = true; pc->write_blocked = true; pc->disconnected = false; - pc->hog_count = 0;; + pc->hog_count = 0; 
pc->batch.next_event = pconnection_batch_next; if (server) { @@ -1269,6 +1270,8 @@ pn_listener_t *pn_event_listener(pn_event_t *e) { pn_listener_t *pn_listener() { pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t)); if (l) { + l->accepted_fd = -1; + l->accepted = NULL; l->batch.next_event = listener_batch_next; l->collector = pn_collector(); l->condition = pn_condition(); @@ -1394,7 +1397,6 @@ static void listener_begin_close(pn_listener_t* l) { } } pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); - l->acceptable = l->accepted = NULL; } } @@ -1419,6 +1421,24 @@ static void listener_forced_shutdown(pn_listener_t *l) { pn_listener_free(l); } +/* Accept a connection as part of listener_process(). Called with listener context lock held. */ +static void listener_accept_lh(psocket_t *ps) { + pn_listener_t *l = psocket_listener(ps); + assert(l->accepted_fd < 0); /* Shouldn't already have an accepted_fd */ + l->accepted_fd = accept(ps->sockfd, NULL, 0); + l->accepted = ps; + if (l->accepted_fd >= 0) { + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); + } else { + int err = errno; + if (err == ENFILE || err == EMFILE) { + listener_set_overflow(l); + } else { + psocket_error(ps, err, "accept"); + } + } +} + /* Process a listening socket */ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) { // TODO: some parallelization of the accept mechanism. 
@@ -1430,8 +1450,7 @@ static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) { /* Calls listener_begin_close which closes all the listener's sockets */ psocket_error(ps, errno, "listener epoll"); } else if (!l->context.closing && events & EPOLLIN) { - l->acceptable = ps; - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); + listener_accept_lh(ps); } } else { wake_done(&l->context); // callback accounting @@ -1472,7 +1491,7 @@ static void listener_done(pn_listener_t *l) { } else if (listener_has_event(l)) { notify = wake(&l->context); } else if (l->overflow == NO_OVERFLOW && - !l->context.closing && !l->armed && !l->acceptable && l->accepted) + !l->context.closing && !l->armed && l->accepted_fd < 0 && l->accepted) { /* Don't rearm until the current socket is accepted */ rearm(l->accepted->proactor, &l->accepted->epoll_io); @@ -1507,42 +1526,19 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { // TODO: fuller sanity check on input args pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, ""); assert(pc); // TODO: memory safety - int err = 0; - int newfd = -1; - bool need_done = false; lock(&l->context.mutex); + int fd = l->accepted_fd; + l->accepted_fd = -1; proactor_add(&pc->context); - if (l->context.closing) { - err = EBADF; - } else if (l->acceptable == 0) { - err = EAGAIN; - } else { - l->accepted = l->acceptable; - l->acceptable = 0; - newfd = accept(l->accepted->sockfd, NULL, 0); - if (newfd < 0) err = errno; - } - if (err) { - lock(&pc->context.mutex); - psocket_error(&pc->psocket, err, "accepting from"); /* Always signal error on the connection */ - pconnection_begin_close(pc); - need_done = true; - unlock(&pc->context.mutex); - if (err == EMFILE || err == ENFILE) { /* Out of FDs does not close the listener */ - listener_set_overflow(l); - } else { - psocket_error(l->accepted, err, "accepting from"); - } - } else { /* No errors */ - lock(&pc->context.mutex); - 
configure_socket(newfd); - pc->psocket.sockfd = newfd; - pconnection_start(pc); - unlock(&pc->context.mutex); - } + + lock(&pc->context.mutex); + configure_socket(fd); + pc->psocket.sockfd = fd; + pconnection_start(pc); + unlock(&pc->context.mutex); + unlock(&l->context.mutex); - if (need_done) pconnection_done(pc); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
