Hi,

On Fri, May 04, 2018 at 05:32:24PM +0200, Olivier Houchard wrote:
> Hi,
> 
> When the code was changed to use one poller per thread, we overlooked the
> fact that some fds can be shared between multiple threads. When an event
> occurred that required the fd to be added to or removed from the poller,
> the information was sent only to the current thread; the other threads
> could miss it entirely, and thus either miss events or report spurious events.
> The attached patches are an attempt at fixing this, by adding a new global
> update list, in addition to the local thread update list.
> 
> This can't be applied to 1.8, as it uses code that was not, and probably won't
> be, backported, so a different patch, similar in spirit, will be developed.
> 

Here is a patch that should do the same for 1.8.

Regards,

Olivier
>From 3f8fe65b4433f4f8c543ff9b11c48554fe862f45 Mon Sep 17 00:00:00 2001
From: Olivier Houchard <ohouch...@haproxy.com>
Date: Thu, 17 May 2018 18:34:02 +0200
Subject: [PATCH] BUG/MEDIUM: pollers: Use a global list for fd shared between
 threads.

With the old model, any fd shared by multiple threads, such as listeners
or dns sockets, would only be updated on one thread, so that could lead
to missed events, or spurious wakeups.
To avoid this, add a global list for fds that are shared, and only remove
entries from this list when every thread has updated its poller.
This subtly changes the semantics of updt_fd_polling(), as it now unlocks
the FD_LOCK on exit.

This is similar in spirit to commit 6b96f7289c2f401deef4bdc6e20792360807dde4
(with the bugfix from c55b88ece616afe0b28dc81eb39bad37b5f9c33f) applied,
but had to be rewritten, because of the differences between 1.8 and master.

This should only be applied to 1.8.
---
 include/common/hathreads.h |   4 ++
 include/proto/fd.h         | 130 ++++++++++++++++++++++++++++++++++-----------
 include/types/fd.h         |  13 +++++
 src/ev_epoll.c             |  90 +++++++++++++++++++++----------
 src/ev_kqueue.c            |  83 +++++++++++++++++++++--------
 src/ev_poll.c              |  45 ++++++++++++++++
 src/ev_select.c            |  37 +++++++++++++
 src/fd.c                   |   6 +++
 src/hathreads.c            |   2 +-
 9 files changed, 327 insertions(+), 83 deletions(-)

diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 325a869a..86db4d5c 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -201,6 +201,8 @@ void thread_exit_sync(void);
 int  thread_no_sync(void);
 int  thread_need_sync(void);
 
+extern unsigned long all_threads_mask;
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
 /* WARNING!!! if you update this enum, please also keep lock_label() up to 
date below */
@@ -209,6 +211,7 @@ enum lock_label {
        FDTAB_LOCK,
        FDCACHE_LOCK,
        FD_LOCK,
+       FD_UPDATE_LOCK,
        POLL_LOCK,
        TASK_RQ_LOCK,
        TASK_WQ_LOCK,
@@ -330,6 +333,7 @@ static inline const char *lock_label(enum lock_label label)
        case FDCACHE_LOCK:         return "FDCACHE";
        case FD_LOCK:              return "FD";
        case FDTAB_LOCK:           return "FDTAB";
+       case FD_UPDATE_LOCK:       return "FD_UPDATE";
        case POLL_LOCK:            return "POLL";
        case TASK_RQ_LOCK:         return "TASK_RQ";
        case TASK_WQ_LOCK:         return "TASK_WQ";
diff --git a/include/proto/fd.h b/include/proto/fd.h
index bb91bb2c..b6199ccf 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -43,6 +43,9 @@ extern THREAD_LOCAL int fd_nbupdt; // number of updates in 
the list
 __decl_hathreads(extern HA_SPINLOCK_T __attribute__((aligned(64))) 
fdtab_lock);      /* global lock to protect fdtab array */
 __decl_hathreads(extern HA_RWLOCK_T   __attribute__((aligned(64))) 
fdcache_lock);    /* global lock to protect fd_cache array */
 __decl_hathreads(extern HA_SPINLOCK_T __attribute__((aligned(64))) poll_lock); 
      /* global lock to protect poll info */
+__decl_hathreads(extern HA_SPINLOCK_T __attribute__((aligned(64))) 
fd_updt_lock); /* global lock to protect the update list */
+
+extern struct fdlist update_list; // Global update list
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
  * The file descriptor is also closed.
@@ -96,14 +99,70 @@ void fd_process_cached_events();
 
 /* Mark fd <fd> as updated for polling and allocate an entry in the update list
  * for this if it was not already there. This can be done at any time.
+ * This function expects the FD lock to be locked, and returns with the
+ * FD lock unlocked.
  */
 static inline void updt_fd_polling(const int fd)
 {
-       if (fdtab[fd].update_mask & tid_bit)
+       if ((fdtab[fd].update_mask & fdtab[fd].thread_mask) ==
+           fdtab[fd].thread_mask) {
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
                /* already scheduled for update */
                return;
-       fdtab[fd].update_mask |= tid_bit;
-       fd_updt[fd_nbupdt++] = fd;
+       }
+       if (fdtab[fd].thread_mask == tid_bit) {
+               fdtab[fd].update_mask |= tid_bit;
+               fd_updt[fd_nbupdt++] = fd;
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+       } else {
+               /* This is ugly, but we can afford to unlock the FD lock
+                * before we acquire the fd_updt_lock, to prevent a
+                * lock order reversal, because this function is only called
+                * from fd_update_cache(), and all users of fd_update_cache()
+                * used to just unlock the fd lock just after, anyway.
+                */
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+               /* If update_mask is non-zero, then it's already in the list
+                * so we don't have to add it.
+                */
+               if (fdtab[fd].update_mask == 0) {
+                       if (update_list.first == -1) {
+                               update_list.first = update_list.last = fd;
+                               fdtab[fd].update.next = fdtab[fd].update.prev = 
-1;
+                       } else {
+                               fdtab[update_list.last].update.next = fd;
+                               fdtab[fd].update.prev = update_list.last;
+                               fdtab[fd].update.next = -1;
+                               update_list.last = fd;
+                       }
+               }
+               fdtab[fd].update_mask |= fdtab[fd].thread_mask;
+               HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+
+       }
+}
+
+/* Called from the poller to acknowledge we read an entry from the global
+ * update list, to remove our bit from the update_mask, and remove it from
+ * the list if we were the last one.
+ */
+/* Expects to be called with the FD lock and the FD update lock held */
+static inline void done_update_polling(int fd)
+{
+       fdtab[fd].update_mask &= ~tid_bit;
+       if ((fdtab[fd].update_mask & all_threads_mask) == 0) {
+               if (fdtab[fd].update.prev != -1)
+                       fdtab[fdtab[fd].update.prev].update.next =
+                           fdtab[fd].update.next;
+               else
+                       update_list.first = fdtab[fd].update.next;
+               if (fdtab[fd].update.next != -1)
+                       fdtab[fdtab[fd].update.next].update.prev =
+                           fdtab[fd].update.prev;
+               else
+                       update_list.last = fdtab[fd].update.prev;
+       }
 }
 
 
@@ -175,13 +234,6 @@ static inline int fd_compute_new_polled_status(int state)
  */
 static inline void fd_update_cache(int fd)
 {
-       /* 3 states for each direction require a polling update */
-       if ((fdtab[fd].state & (FD_EV_POLLED_R |                 
FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
-           (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | 
FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
-           (fdtab[fd].state & (FD_EV_POLLED_W |                 
FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
-           (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | 
FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
-               updt_fd_polling(fd);
-
        /* only READY and ACTIVE states (the two with both flags set) require a 
cache entry */
        if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == 
(FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
            ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == 
(FD_EV_READY_W | FD_EV_ACTIVE_W))) {
@@ -190,6 +242,14 @@ static inline void fd_update_cache(int fd)
        else {
                fd_release_cache_entry(fd);
        }
+       /* 3 states for each direction require a polling update */
+       if ((fdtab[fd].state & (FD_EV_POLLED_R |                 
FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
+           (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | 
FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
+           (fdtab[fd].state & (FD_EV_POLLED_W |                 
FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
+           (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | 
FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
+               updt_fd_polling(fd);
+       else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /*
@@ -271,8 +331,9 @@ static inline void fd_stop_recv(int fd)
        if (fd_recv_active(fd)) {
                fdtab[fd].state &= ~FD_EV_ACTIVE_R;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing send events on fd <fd> */
@@ -282,8 +343,9 @@ static inline void fd_stop_send(int fd)
        if (fd_send_active(fd)) {
                fdtab[fd].state &= ~FD_EV_ACTIVE_W;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing of events on fd <fd> for both directions. */
@@ -293,8 +355,9 @@ static inline void fd_stop_both(int fd)
        if (fd_active(fd)) {
                fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> cannot receive anymore without polling (EAGAIN 
detected). */
@@ -304,8 +367,9 @@ static inline void fd_cant_recv(const int fd)
        if (fd_recv_ready(fd)) {
                fdtab[fd].state &= ~FD_EV_READY_R;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can receive anymore without polling. */
@@ -315,8 +379,9 @@ static inline void fd_may_recv(const int fd)
        if (!fd_recv_ready(fd)) {
                fdtab[fd].state |= FD_EV_READY_R;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable readiness when polled. This is useful to interrupt reading when it
@@ -330,8 +395,9 @@ static inline void fd_done_recv(const int fd)
        if (fd_recv_polled(fd) && fd_recv_ready(fd)) {
                fdtab[fd].state &= ~FD_EV_READY_R;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). 
*/
@@ -341,8 +407,9 @@ static inline void fd_cant_send(const int fd)
        if (fd_send_ready(fd)) {
                fdtab[fd].state &= ~FD_EV_READY_W;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
@@ -352,8 +419,9 @@ static inline void fd_may_send(const int fd)
        if (!fd_send_ready(fd)) {
                fdtab[fd].state |= FD_EV_READY_W;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD <fd> to try to receive */
@@ -363,8 +431,9 @@ static inline void fd_want_recv(int fd)
        if (!fd_recv_active(fd)) {
                fdtab[fd].state |= FD_EV_ACTIVE_R;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD <fd> to try to send */
@@ -374,8 +443,9 @@ static inline void fd_want_send(int fd)
        if (!fd_send_active(fd)) {
                fdtab[fd].state |= FD_EV_ACTIVE_W;
                fd_update_cache(fd); /* need an update entry to change the 
state */
-       }
-       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               /* the FD lock is unlocked by fd_update_cache() */
+       } else
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Update events seen for FD <fd> and its state if needed. This should be 
called
diff --git a/include/types/fd.h b/include/types/fd.h
index 9f2c5fee..8e34c624 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -90,11 +90,24 @@ enum fd_states {
  */
 #define DEAD_FD_MAGIC 0xFDDEADFD
 
+struct fdlist_entry {
+       int next;
+       int prev;
+} __attribute__ ((aligned(8)));
+
+/* head of the fd list */
+struct fdlist {
+       int first;
+       int last;
+} __attribute__ ((aligned(8)));
+
+
 /* info about one given fd */
 struct fdtab {
        __decl_hathreads(HA_SPINLOCK_T lock);
        unsigned long thread_mask;           /* mask of thread IDs authorized 
to process the task */
        unsigned long polled_mask;           /* mask of thread IDs currently 
polling this fd */
+       struct fdlist_entry update;          /* Entry in the global update list 
*/
        unsigned long update_mask;           /* mask of thread IDs having an 
update for fd */
        void (*iocb)(int fd);                /* I/O handler */
        void *owner;                         /* the connection or listener 
associated with this fd, NULL if closed */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 124b8163..adc15acd 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -59,13 +59,51 @@ REGPRM1 static void __fd_clo(int fd)
        }
 }
 
+static void _update_fd(int fd)
+{
+       int en, opcode;
+
+       en = fdtab[fd].state;
+
+       if (fdtab[fd].polled_mask & tid_bit) {
+               if (!(fdtab[fd].thread_mask & tid_bit) || !(en & 
FD_EV_POLLED_RW)) {
+                       /* fd removed from poll list */
+                       opcode = EPOLL_CTL_DEL;
+                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+               }
+               else {
+                       /* fd status changed */
+                       opcode = EPOLL_CTL_MOD;
+               }
+       }
+       else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+               /* new fd in the poll list */
+               opcode = EPOLL_CTL_ADD;
+               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+       }
+       else {
+               return;
+       }
+
+       /* construct the epoll events based on new state */
+       ev.events = 0;
+       if (en & FD_EV_POLLED_R)
+               ev.events |= EPOLLIN | EPOLLRDHUP;
+
+       if (en & FD_EV_POLLED_W)
+               ev.events |= EPOLLOUT;
+
+       ev.data.fd = fd;
+       epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
 /*
  * Linux epoll() poller
  */
 REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
        int status, eo, en;
-       int fd, opcode;
+       int fd;
        int count;
        int updt_idx;
        int wait_time;
@@ -89,39 +127,31 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                en = fd_compute_new_polled_status(eo);
                fdtab[fd].state = en;
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
-
-               if (fdtab[fd].polled_mask & tid_bit) {
-                       if (!(fdtab[fd].thread_mask & tid_bit) || !(en & 
FD_EV_POLLED_RW)) {
-                               /* fd removed from poll list */
-                               opcode = EPOLL_CTL_DEL;
-                               HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-                       }
-                       else {
-                               /* fd status changed */
-                               opcode = EPOLL_CTL_MOD;
-                       }
-               }
-               else if ((fdtab[fd].thread_mask & tid_bit) && (en & 
FD_EV_POLLED_RW)) {
-                       /* new fd in the poll list */
-                       opcode = EPOLL_CTL_ADD;
-                       HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-               }
+               _update_fd(fd);
+       }
+       fd_nbupdt = 0;
+       /* Scan the global update list */
+       HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+       for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+               if (fdtab[fd].update_mask & tid_bit)
+                       done_update_polling(fd);
                else {
+                       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
                        continue;
                }
+               fdtab[fd].new = 0;
 
-               /* construct the epoll events based on new state */
-               ev.events = 0;
-               if (en & FD_EV_POLLED_R)
-                       ev.events |= EPOLLIN | EPOLLRDHUP;
-
-               if (en & FD_EV_POLLED_W)
-                       ev.events |= EPOLLOUT;
+               eo = fdtab[fd].state;
+               en = fd_compute_new_polled_status(eo);
+               fdtab[fd].state = en;
 
-               ev.data.fd = fd;
-               epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               if (!fdtab[fd].owner)
+                       continue;
+               _update_fd(fd);
        }
-       fd_nbupdt = 0;
+       HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
 
        /* compute the epoll_wait() timeout */
        if (!exp)
@@ -208,8 +238,10 @@ static int init_epoll_per_thread()
         * fd for this thread. Let's just mark them as updated, the poller will
         * do the rest.
         */
-       for (fd = 0; fd < maxfd; fd++)
+       for (fd = 0; fd < maxfd; fd++) {
+               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
                updt_fd_polling(fd);
+       }
 
        return 1;
  fail_fd:
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 8cd6dd84..642de8b3 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -33,6 +33,41 @@ static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
 static THREAD_LOCAL struct kevent *kev = NULL;
 static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write 
the eventlist in
 
+static int _update_fd(int fd, int start)
+{
+       int en;
+       int changes = start;
+
+       en = fdtab[fd].state;
+
+       if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+               if (!(fdtab[fd].polled_mask & tid_bit)) {
+                       /* fd was not watched, it's still not */
+                       return 0;
+               }
+               /* fd totally removed from poll list */
+               EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+               EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 
NULL);
+               HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+       }
+       else {
+               /* OK fd has to be monitored, it was either added or changed */
+
+               if (en & FD_EV_POLLED_R)
+                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, 
NULL);
+               else if (fdtab[fd].polled_mask & tid_bit)
+                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 
0, NULL);
+
+               if (en & FD_EV_POLLED_W)
+                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, 
NULL);
+               else if (fdtab[fd].polled_mask & tid_bit)
+                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 
0, NULL);
+
+               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+       }
+       return changes;
+}
+
 /*
  * kqueue() poller
  */
@@ -66,32 +101,32 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                fdtab[fd].state = en;
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
-               if (!(fdtab[fd].thread_mask & tid_bit) || !(en & 
FD_EV_POLLED_RW)) {
-                       if (!(fdtab[fd].polled_mask & tid_bit)) {
-                               /* fd was not watched, it's still not */
-                               continue;
-                       }
-                       /* fd totally removed from poll list */
-                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 
0, NULL);
-                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 
0, NULL);
-                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-               }
-               else {
-                       /* OK fd has to be monitored, it was either added or 
changed */
+               changes = _update_fd(fd, changes);
+       }
 
-                       if (en & FD_EV_POLLED_R)
-                               EV_SET(&kev[changes++], fd, EVFILT_READ, 
EV_ADD, 0, 0, NULL);
-                       else if (fdtab[fd].polled_mask & tid_bit)
-                               EV_SET(&kev[changes++], fd, EVFILT_READ, 
EV_DELETE, 0, 0, NULL);
+       /* Scan the global update list */
+       HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+       for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+               if (fdtab[fd].update_mask & tid_bit)
+                       done_update_polling(fd);
+               else {
+                       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+                       continue;
+               }
+               fdtab[fd].new = 0;
 
-                       if (en & FD_EV_POLLED_W)
-                               EV_SET(&kev[changes++], fd, EVFILT_WRITE, 
EV_ADD, 0, 0, NULL);
-                       else if (fdtab[fd].polled_mask & tid_bit)
-                               EV_SET(&kev[changes++], fd, EVFILT_WRITE, 
EV_DELETE, 0, 0, NULL);
+               eo = fdtab[fd].state;
+               en = fd_compute_new_polled_status(eo);
+               fdtab[fd].state = en;
 
-                       HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-               }
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               if (!fdtab[fd].owner)
+                       continue;
+               changes = _update_fd(fd, changes);
        }
+       HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+
        if (changes) {
 #ifdef EV_RECEIPT
                kev[0].flags |= EV_RECEIPT;
@@ -189,8 +224,10 @@ static int init_kqueue_per_thread()
         * fd for this thread. Let's just mark them as updated, the poller will
         * do the rest.
         */
-       for (fd = 0; fd < maxfd; fd++)
+       for (fd = 0; fd < maxfd; fd++) {
+               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
                updt_fd_polling(fd);
+       }
 
        return 1;
  fail_fd:
diff --git a/src/ev_poll.c b/src/ev_poll.c
index b7cc0bb3..c913ced2 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -104,6 +104,51 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
                }
        }
+       HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+       for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+               if (fdtab[fd].update_mask & tid_bit) {
+                       /* Cheat a bit, as the state is global to all pollers
+                        * we don't need every thread to take care of the
+                        * update.
+                        */
+                       fdtab[fd].update_mask &= ~all_threads_mask;
+                       done_update_polling(fd);
+               } else {
+                       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+                       continue;
+               }
+
+               if (!fdtab[fd].owner) {
+                       activity[tid].poll_drop++;
+                       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+                       continue;
+               }
+
+               fdtab[fd].new = 0;
+
+               eo = fdtab[fd].state;
+               en = fd_compute_new_polled_status(eo);
+               fdtab[fd].state = en;
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+
+               if ((eo ^ en) & FD_EV_POLLED_RW) {
+                       /* poll status changed, update the lists */
+                       HA_SPIN_LOCK(POLL_LOCK, &poll_lock);
+                       if ((eo & ~en) & FD_EV_POLLED_R)
+                               hap_fd_clr(fd, fd_evts[DIR_RD]);
+                       else if ((en & ~eo) & FD_EV_POLLED_R)
+                               hap_fd_set(fd, fd_evts[DIR_RD]);
+
+                       if ((eo & ~en) & FD_EV_POLLED_W)
+                               hap_fd_clr(fd, fd_evts[DIR_WR]);
+                       else if ((en & ~eo) & FD_EV_POLLED_W)
+                               hap_fd_set(fd, fd_evts[DIR_WR]);
+                       HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
+               }
+
+       }
+       HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
        fd_nbupdt = 0;
 
        nbfd = 0;
diff --git a/src/ev_select.c b/src/ev_select.c
index 5f3486ed..bde923ea 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -70,7 +70,42 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                en = fd_compute_new_polled_status(eo);
                fdtab[fd].state = en;
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+               if ((eo ^ en) & FD_EV_POLLED_RW) {
+                       /* poll status changed, update the lists */
+                       HA_SPIN_LOCK(POLL_LOCK, &poll_lock);
+                       if ((eo & ~en) & FD_EV_POLLED_R)
+                               FD_CLR(fd, fd_evts[DIR_RD]);
+                       else if ((en & ~eo) & FD_EV_POLLED_R)
+                               FD_SET(fd, fd_evts[DIR_RD]);
+
+                       if ((eo & ~en) & FD_EV_POLLED_W)
+                               FD_CLR(fd, fd_evts[DIR_WR]);
+                       else if ((en & ~eo) & FD_EV_POLLED_W)
+                               FD_SET(fd, fd_evts[DIR_WR]);
+                       HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
+               }
+       }
+       HA_SPIN_LOCK(FD_UPDATE_LOCK, &fd_updt_lock);
+       for (fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+               HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+               if (fdtab[fd].update_mask & tid_bit) {
+                       /* Cheat a bit, as the state is global to all pollers
+                        * we don't need every thread to take care of the
+                        * update.
+                        */
+                       fdtab[fd].update_mask &= ~all_threads_mask;
+                       done_update_polling(fd);
+               } else {
+                       HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+                       continue;
+               }
 
+               fdtab[fd].new = 0;
+
+               eo = fdtab[fd].state;
+               en = fd_compute_new_polled_status(eo);
+               fdtab[fd].state = en;
+               HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
                if ((eo ^ en) & FD_EV_POLLED_RW) {
                        /* poll status changed, update the lists */
                        HA_SPIN_LOCK(POLL_LOCK, &poll_lock);
@@ -85,7 +120,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                                FD_SET(fd, fd_evts[DIR_WR]);
                        HA_SPIN_UNLOCK(POLL_LOCK, &poll_lock);
                }
+
        }
+       HA_SPIN_UNLOCK(FD_UPDATE_LOCK, &fd_updt_lock);
        fd_nbupdt = 0;
 
        /* let's restore fdset state */
diff --git a/src/fd.c b/src/fd.c
index b64130ed..a134e93e 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -175,9 +175,12 @@ unsigned long fd_cache_mask = 0; // Mask of threads with 
events in the cache
 THREAD_LOCAL int *fd_updt  = NULL;  // FD updates list
 THREAD_LOCAL int  fd_nbupdt = 0;   // number of updates in the list
 
+struct fdlist update_list; // Global update list
 __decl_hathreads(HA_SPINLOCK_T fdtab_lock);       /* global lock to protect 
fdtab array */
 __decl_hathreads(HA_RWLOCK_T   fdcache_lock);     /* global lock to protect 
fd_cache array */
 __decl_hathreads(HA_SPINLOCK_T poll_lock);        /* global lock to protect 
poll info */
+__decl_hathreads(HA_SPINLOCK_T) fd_updt_lock;     /* global lock to protect 
the update list */
+
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
  * The file descriptor is also closed.
@@ -341,6 +344,9 @@ int init_pollers()
        HA_SPIN_INIT(&fdtab_lock);
        HA_RWLOCK_INIT(&fdcache_lock);
        HA_SPIN_INIT(&poll_lock);
+       HA_SPIN_INIT(&fd_updt_lock);
+       update_list.first = update_list.last = -1;
+
        do {
                bp = NULL;
                for (p = 0; p < nbpollers; p++)
diff --git a/src/hathreads.c b/src/hathreads.c
index daf226ce..5d90402b 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -31,7 +31,7 @@ void thread_sync_io_handler(int fd)
 static HA_SPINLOCK_T sync_lock;
 static int           threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask  = 0;
+unsigned long all_threads_mask  = 0;
 
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];
-- 
2.14.3

Reply via email to