This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f1e5f0  PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking
9f1e5f0 is described below

commit 9f1e5f0cd9cd1c146cee94ca4d6ff6ed4b71c139
Author: Cliff Jansen <cliffjan...@apache.org>
AuthorDate: Thu Apr 30 22:47:17 2020 -0700

    PROTON-2203: fix duplicate listener socket rearming and rationalize rearming locking
---
 c/src/proactor/epoll-internal.h |  1 -
 c/src/proactor/epoll.c          | 40 +++++++++++++---------------------------
 2 files changed, 13 insertions(+), 28 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 20b01ac..fd02817 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -276,7 +276,6 @@ struct pn_listener_t {
   size_t pending_count;              /* number of pending accepted connections 
*/
   size_t backlog;                 /* size of pending accepted array */
   bool close_dispatched;
-  pmutex rearm_mutex;             /* orders rearms/disarms, nothing else */
   uint32_t sched_io_events;
 };
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 76b5c37..6667365 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -848,15 +848,11 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
     assert(a->overflowed);
     a->overflowed = false;
     if (rearming) {
-      lock(&l->rearm_mutex);
+      rearm(p, &a->psocket.epoll_io);
       a->armed = true;
     }
     else notify = wake(&l->context);
     unlock(&l->context.mutex);
-    if (rearming) {
-      rearm(p, &a->psocket.epoll_io);
-      unlock(&l->rearm_mutex);
-    }
     if (notify) wake_notify(&l->context);
     a = acceptor_list_next(&ovflw);
   }
@@ -1642,7 +1638,6 @@ pn_listener_t *pn_listener() {
     }
     pn_proactor_t *unknown = NULL;  // won't know until pn_proactor_listen
     pcontext_init(&l->context, LISTENER, unknown);
-    pmutex_init(&l->rearm_mutex);
   }
   return l;
 }
@@ -1702,11 +1697,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t 
*l, const char *addr, in
           ps->epoll_io.fd = fd;
           ps->epoll_io.wanted = EPOLLIN;
           ps->epoll_io.polling = false;
-          lock(&l->rearm_mutex);
           start_polling(&ps->epoll_io, ps->proactor->epollfd);  // TODO: check 
for error
           l->active_count++;
           acceptor->armed = true;
-          unlock(&l->rearm_mutex);
         } else {
           close(fd);
         }
@@ -1745,7 +1738,6 @@ static inline bool listener_can_free(pn_listener_t *l) {
 
 static inline void listener_final_free(pn_listener_t *l) {
   pcontext_finalize(&l->context);
-  pmutex_finalize(&l->rearm_mutex);
   free(l->acceptors);
   free(l->pending_accepteds);
   free(l);
@@ -1780,7 +1772,6 @@ static void listener_begin_close(pn_listener_t* l) {
       acceptor_t *a = &l->acceptors[i];
       psocket_t *ps = &a->psocket;
       if (ps->epoll_io.fd >= 0) {
-        lock(&l->rearm_mutex);
         if (a->armed) {
           shutdown(ps->epoll_io.fd, SHUT_RD);  // Force epoll event and 
callback
         } else {
@@ -1789,7 +1780,6 @@ static void listener_begin_close(pn_listener_t* l) {
           ps->epoll_io.fd = -1;
           l->active_count--;
         }
-        unlock(&l->rearm_mutex);
       }
     }
     /* Close all sockets waiting for a pn_listener_accept2() */
@@ -1869,17 +1859,13 @@ static pn_event_batch_t *listener_process(pn_listener_t 
*l, int n_events, bool w
         uint32_t events = ps->working_io_events;
         ps->working_io_events = 0;
         if (l->context.closing) {
-          lock(&l->rearm_mutex);
           l->acceptors[i].armed = false;
           stop_polling(&ps->epoll_io, ps->proactor->epollfd);
-          unlock(&l->rearm_mutex);
           close(ps->epoll_io.fd);
           ps->epoll_io.fd = -1;
           l->active_count--;
         } else {
-          lock(&l->rearm_mutex);
           l->acceptors[i].armed = false;
-          unlock(&l->rearm_mutex);
           if (events & EPOLLRDHUP) {
             /* Calls listener_begin_close which closes all the listener's 
sockets */
             psocket_error(ps, errno, "listener epoll");
@@ -1929,29 +1915,29 @@ static pn_event_t *listener_batch_next(pn_event_batch_t 
*batch) {
 static void listener_done(pn_listener_t *l) {
   pn_proactor_t *p = l->context.proactor;
   tslot_t *ts = l->context.runner;
+  lock(&l->context.mutex);
 
   // Just in case the app didn't accept all the pending accepts
   // Shuffle the list back to start at 0
   memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], 
l->pending_count * sizeof(accepted_t));
   l->pending_first = 0;
 
-  for (size_t i = 0; i < l->acceptors_size; i++) {
-    acceptor_t *a = &l->acceptors[i];
-    psocket_t *ps = &a->psocket;
-
-    // Rearm acceptor when appropriate
-    if (ps->epoll_io.polling && l->pending_count==0) {
-      lock(&l->rearm_mutex);
-      if (!a->armed) {
-        rearm(ps->proactor, &ps->epoll_io);
-        a->armed = true;
+  if (!l->context.closing) {
+    for (size_t i = 0; i < l->acceptors_size; i++) {
+      acceptor_t *a = &l->acceptors[i];
+      psocket_t *ps = &a->psocket;
+
+      // Rearm acceptor when appropriate
+      if (ps->epoll_io.polling && l->pending_count==0 && !a->overflowed) {
+        if (!a->armed) {
+          rearm(ps->proactor, &ps->epoll_io);
+          a->armed = true;
+        }
       }
-      unlock(&l->rearm_mutex);
     }
   }
 
   bool notify = false;
-  lock(&l->context.mutex);
   l->context.working = false;
 
   lock(&p->sched_mutex);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to