Repository: qpid-proton
Updated Branches:
  refs/heads/master a80d54e62 -> fc1df0551


Revert "PROTON-1771: [c] locking around epoll_extended_t"

This reverts commit 188ce28066df8f5e965fb63593f419f49c950760.
The fix caused hangs due to mutex deadlocks.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fc1df055
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fc1df055
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fc1df055

Branch: refs/heads/master
Commit: fc1df0551947c357e9fa0bf9da4b836c5a97c11f
Parents: a80d54e
Author: Alan Conway <acon...@redhat.com>
Authored: Mon Apr 16 13:21:08 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Mon Apr 16 13:21:08 2018 -0400

----------------------------------------------------------------------
 c/src/proactor/epoll.c | 68 +++++++++++++++++++--------------------------
 1 file changed, 28 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc1df055/c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d0db0a7..752e6e0 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -128,9 +128,20 @@ typedef struct epoll_extended_t {
   epoll_type_t type;   // io/timer/wakeup
   uint32_t wanted;     // events to poll for
   bool polling;
-  pmutex mutex;
+  pmutex barrier_mutex;
 } epoll_extended_t;
 
+/* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
+   writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
+   visible to epoll_wait() thread. This function creates a memory barrier,
+   called before epoll_ctl() and after epoll_wait()
+*/
+static void memory_barrier(epoll_extended_t *ee) {
+  // Mutex lock/unlock has the side-effect of being a memory barrier.
+  lock(&ee->barrier_mutex);
+  unlock(&ee->barrier_mutex);
+}
+
 /*
  * This timerfd logic assumes EPOLLONESHOT and there never being two
  * active timeout callbacks.  There can be multiple (or zero)
@@ -276,38 +287,28 @@ PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
 static bool start_polling(epoll_extended_t *ee, int epollfd) {
-  lock(&ee->mutex);
-  if (ee->polling) {
-    unlock(&ee->mutex);
+  if (ee->polling)
     return false;
-  }
   ee->polling = true;
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = ee->wanted | EPOLLONESHOT;
-  int fd = ee->fd;
-  unlock(&ee->mutex);
-  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0);
+  memory_barrier(ee);
+  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
 }
 
 static void stop_polling(epoll_extended_t *ee, int epollfd) {
   // TODO: check for error, return bool or just log?
-  lock(&ee->mutex);
-  if (ee->fd == -1 || !ee->polling || epollfd == -1) {
-    unlock(&ee->mutex);
+  if (ee->fd == -1 || !ee->polling || epollfd == -1)
     return;
-  }
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = 0;
-  int fd = ee->fd;
-  unlock(&ee->mutex);
-  if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1)
+  memory_barrier(ee);
+  if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1)
     EPOLL_FATAL("EPOLL_CTL_DEL", errno);
-  lock(&ee->mutex);
   ee->fd = -1;
   ee->polling = false;
-  unlock(&ee->mutex);
 }
 
 /*
@@ -681,12 +682,10 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
 
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
   struct epoll_event ev;
-  lock(&ee->mutex);
   ev.data.ptr = ee;
   ev.events = ee->wanted | EPOLLONESHOT;
-  int fd = ee->fd;
-  unlock(&ee->mutex);
-  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1)
+  memory_barrier(ee);
+  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
@@ -1230,7 +1229,6 @@ static void pconnection_start(pconnection_t *pc) {
   (void)getsockname(pc->psocket.sockfd, (struct sockaddr*)&pc->local.ss, &len);
 
   epoll_extended_t *ee = &pc->psocket.epoll_io;
-  lock(&ee->mutex);
   if (ee->polling) {     /* This is not the first attempt, stop polling and close the old FD */
     int fd = ee->fd;     /* Save fd, it will be set to -1 by stop_polling */
     stop_polling(ee, efd);
@@ -1238,7 +1236,6 @@ static void pconnection_start(pconnection_t *pc) {
   }
   ee->fd = pc->psocket.sockfd;
   pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
-  unlock(&ee->mutex);
   start_polling(ee, efd);  // TODO: check for error
 }
 
@@ -1762,13 +1759,11 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t
 
 /* Set up an epoll_extended_t to be used for wakeup or interrupts */
 static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
-  lock(&ee->mutex);
   ee->psocket = NULL;
   ee->fd = eventfd;
   ee->type = WAKE;
   ee->wanted = EPOLLIN;
   ee->polling = false;
-  unlock(&ee->mutex);
   start_polling(ee, epollfd);  // TODO: check for error
 }
 
@@ -1959,10 +1954,7 @@ static bool proactor_remove(pcontext_t *ctx) {
 }
 
 static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
-  lock(&ee->mutex);
-  int fd = ee->fd;
-  unlock(&ee->mutex);
-  if  (fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
+  if  (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
     (void)read_uint64(p->interruptfd);
     rearm(p, &p->epoll_interrupt);
     return proactor_process(p, PN_PROACTOR_INTERRUPT);
@@ -2007,29 +1999,25 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo
     }
     assert(n == 1);
     epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
+    memory_barrier(ee);
 
-    lock(&ee->mutex);
-    epoll_type_t type = ee->type;
-    struct psocket_t *psocket = ee->psocket;
-    unlock(&ee->mutex);
-
-    if (type == WAKE) {
+    if (ee->type == WAKE) {
       batch = process_inbound_wake(p, ee);
-    } else if (type == PROACTOR_TIMER) {
+    } else if (ee->type == PROACTOR_TIMER) {
       batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
     } else {
-      pconnection_t *pc = psocket_pconnection(psocket);
+      pconnection_t *pc = psocket_pconnection(ee->psocket);
       if (pc) {
-        if (type == PCONNECTION_IO) {
+        if (ee->type == PCONNECTION_IO) {
           batch = pconnection_process(pc, ev.events, false, false);
         } else {
-          assert(type == PCONNECTION_TIMER);
+          assert(ee->type == PCONNECTION_TIMER);
           batch = pconnection_process(pc, 0, true, false);
         }
       }
       else {
         // TODO: can any of the listener processing be parallelized like IOCP?
-        batch = listener_process(psocket, ev.events);
+        batch = listener_process(ee->psocket, ev.events);
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to