Hi,

When the code was changed to use one poller per thread, we overlooked the
fact that some fds can be shared between multiple threads, and when one
event occurred, that required the fd to be added or removed from the poller,
the information would be sent only to the current thread, the other threads
may totally miss it, and thus either miss events, or report spurious events.
The attached patches are an attempt at fixing this, by adding a new global
update list, in addition to the local thread update list.

This can't be applied to 1.8, as it uses code that was not, and probably won't
be, backported, so a different patch, similar in spirit, will be developed.

Regards,

Olivier
>From 7ae6ae7215984deb4487391201e3b0f99a072c4b Mon Sep 17 00:00:00 2001
From: Olivier Houchard <ohouch...@haproxy.com>
Date: Wed, 25 Apr 2018 15:10:30 +0200
Subject: [PATCH 1/4] MINOR: fd: Make the lockless fd list work with multiple
 lists.

Modify fd_add_to_fd_list() and fd_rm_from_fd_list() so that they take an
offset in the fdtab to the list entry, instead of hardcoding the fd cache,
so we can use them with other lists.
---
 include/proto/fd.h | 12 ++++++------
 src/fd.c           | 55 +++++++++++++++++++++++++++++-------------------------
 2 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/include/proto/fd.h b/include/proto/fd.h
index 6c9cfe701..543a42007 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -93,8 +93,8 @@ void run_poller();
  */
 void fd_process_cached_events();
 
-void fd_add_to_fd_list(volatile struct fdlist *list, int fd);
-void fd_rm_from_fd_list(volatile struct fdlist *list, int fd);
+void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
+void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
 
 /* Mark fd <fd> as updated for polling and allocate an entry in the update list
  * for this if it was not already there. This can be done at any time.
@@ -119,9 +119,9 @@ static inline void fd_alloc_cache_entry(const int fd)
 {
        HA_ATOMIC_OR(&fd_cache_mask, fdtab[fd].thread_mask);
        if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
-               
fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+               
fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd,  
offsetof(struct fdtab, cache));
        else
-               fd_add_to_fd_list(&fd_cache, fd);
+               fd_add_to_fd_list(&fd_cache, fd,  offsetof(struct fdtab, 
cache));
 }
 
 /* Removes entry used by fd <fd> from the FD cache and replaces it with the
@@ -131,9 +131,9 @@ static inline void fd_alloc_cache_entry(const int fd)
 static inline void fd_release_cache_entry(const int fd)
 {
        if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
-               
fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+               
fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd, 
offsetof(struct fdtab, cache));
        else
-               fd_rm_from_fd_list(&fd_cache, fd);
+               fd_rm_from_fd_list(&fd_cache, fd, offsetof(struct fdtab, 
cache));
 }
 
 /* This function automatically enables/disables caching for an entry depending
diff --git a/src/fd.c b/src/fd.c
index 1af64e543..cbf22bd22 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -175,8 +175,10 @@ unsigned long fd_cache_mask = 0; // Mask of threads with 
events in the cache
 THREAD_LOCAL int *fd_updt  = NULL;  // FD updates list
 THREAD_LOCAL int  fd_nbupdt = 0;   // number of updates in the list
 
+#define _FD_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char 
*)(&fdtab[fd]) + off))->next
+#define _FD_PREV(fd, off) ((struct fdlist_entry *)(void *)((char 
*)(&fdtab[fd]) + off))->prev
 /* adds fd <fd> to fd list <list> if it was not yet in it */
-void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
+void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off)
 {
        int next;
        int new;
@@ -184,11 +186,11 @@ void fd_add_to_fd_list(volatile struct fdlist *list, int 
fd)
        int last;
 
 redo_next:
-       next = fdtab[fd].cache.next;
+       next = _FD_NEXT(fd, off);
        /* Check that we're not already in the cache, and if not, lock us. */
        if (next >= -2)
                goto done;
-       if (!HA_ATOMIC_CAS(&fdtab[fd].cache.next, &next, -2))
+       if (!HA_ATOMIC_CAS(&_FD_NEXT(fd, off), &next, -2))
                goto redo_next;
        __ha_barrier_store();
 
@@ -198,7 +200,7 @@ redo_last:
        last = list->last;
        old = -1;
 
-       fdtab[fd].cache.prev = -2;
+       _FD_PREV(fd, off) = -2;
        /* Make sure the "prev" store is visible before we update the last 
entry */
        __ha_barrier_store();
 
@@ -214,7 +216,7 @@ redo_last:
                 * The CAS will only succeed if its next is -1,
                 * which means it's in the cache, and the last element.
                 */
-               if (unlikely(!HA_ATOMIC_CAS(&fdtab[last].cache.next, &old, 
new)))
+               if (unlikely(!HA_ATOMIC_CAS(&_FD_NEXT(last, off), &old, new)))
                        goto redo_last;
 
                /* Then, update the last entry */
@@ -224,15 +226,15 @@ redo_last:
        /* since we're alone at the end of the list and still locked(-2),
         * we know noone tried to add past us. Mark the end of list.
         */
-       fdtab[fd].cache.prev = last;
-       fdtab[fd].cache.next = -1;
+       _FD_PREV(fd, off) = last;
+       _FD_NEXT(fd, off) = -1;
        __ha_barrier_store();
 done:
        return;
 }
 
 /* removes fd <fd> from fd list <list> */
-void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
+void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off)
 {
 #if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B)
        volatile struct fdlist_entry cur_list, next_list;
@@ -246,7 +248,7 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int 
fd)
 lock_self:
 #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
        next_list.next = next_list.prev = -2;
-       cur_list = fdtab[fd].cache;
+       cur_list = *(volatile struct fdlist_entry *)(((char *)&fdtab[fd]) + 
off);
        /* First, attempt to lock our own entries */
        do {
                /* The FD is not in the FD cache, give up */
@@ -256,9 +258,9 @@ lock_self:
                        goto lock_self;
        } while (
 #ifdef HA_CAS_IS_8B
-           unlikely(!HA_ATOMIC_CAS(((void **)(void *)&fdtab[fd].cache.next), 
((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
+           unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_FD_NEXT(fd, off)), 
((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
 #else
-           unlikely(!__ha_cas_dw((void *)&fdtab[fd].cache.next, (void 
*)&cur_list, (void *)&next_list)))
+           unlikely(!__ha_cas_dw((void *)&_FD_NEXT(fd, off), (void 
*)&cur_list, (void *)&next_list)))
 #endif
            ;
        next = cur_list.next;
@@ -266,18 +268,18 @@ lock_self:
 
 #else
 lock_self_next:
-       next = ({ volatile int *next = &fdtab[fd].cache.next; *next; });
+       next = ({ volatile int *next = &_FD_NEXT(fd, off); *next; });
        if (next == -2)
                goto lock_self_next;
        if (next <= -3)
                goto done;
-       if (unlikely(!HA_ATOMIC_CAS(&fdtab[fd].cache.next, &next, -2)))
+       if (unlikely(!HA_ATOMIC_CAS(&_FD_NEXT(fd, off), &next, -2)))
                goto lock_self_next;
 lock_self_prev:
-       prev = ({ volatile int *prev = &fdtab[fd].cache.prev; *prev; });
+       prev = ({ volatile int *prev = &_FD_PREV(fd, off); *prev; });
        if (prev == -2)
                goto lock_self_prev;
-       if (unlikely(!HA_ATOMIC_CAS(&fdtab[fd].cache.prev, &prev, -2)))
+       if (unlikely(!HA_ATOMIC_CAS(&_FD_PREV(fd, off), &prev, -2)))
                goto lock_self_prev;
 #endif
        __ha_barrier_store();
@@ -287,14 +289,14 @@ lock_self_prev:
 redo_prev:
                old = fd;
 
-               if (unlikely(!HA_ATOMIC_CAS(&fdtab[prev].cache.next, &old, 
new))) {
+               if (unlikely(!HA_ATOMIC_CAS(&_FD_NEXT(prev, off), &old, new))) {
                        if (unlikely(old == -2)) {
                                /* Neighbour already locked, give up and
                                 * retry again once he's done
                                 */
-                               fdtab[fd].cache.prev = prev;
+                               _FD_PREV(fd, off) = prev;
                                __ha_barrier_store();
-                               fdtab[fd].cache.next = next;
+                               _FD_NEXT(fd, off) = next;
                                __ha_barrier_store();
                                goto lock_self;
                        }
@@ -304,18 +306,18 @@ redo_prev:
        if (likely(next != -1)) {
 redo_next:
                old = fd;
-               if (unlikely(!HA_ATOMIC_CAS(&fdtab[next].cache.prev, &old, 
new))) {
+               if (unlikely(!HA_ATOMIC_CAS(&_FD_PREV(next, off), &old, new))) {
                        if (unlikely(old == -2)) {
                                /* Neighbour already locked, give up and
                                 * retry again once he's done
                                 */
                                if (prev != -1) {
-                                       fdtab[prev].cache.next = fd;
+                                       _FD_NEXT(prev, off) = fd;
                                        __ha_barrier_store();
                                }
-                               fdtab[fd].cache.prev = prev;
+                               _FD_PREV(fd, off) = prev;
                                __ha_barrier_store();
-                               fdtab[fd].cache.next = next;
+                               _FD_NEXT(fd, off) = next;
                                __ha_barrier_store();
                                goto lock_self;
                        }
@@ -333,18 +335,21 @@ redo_next:
         */
        __ha_barrier_store();
        if (likely(prev != -1))
-               fdtab[prev].cache.next = next;
+               _FD_NEXT(prev, off) = next;
        __ha_barrier_store();
        if (likely(next != -1))
-               fdtab[next].cache.prev = prev;
+               _FD_PREV(next, off) = prev;
        __ha_barrier_store();
        /* Ok, now we're out of the fd cache */
-       fdtab[fd].cache.next = -(next + 4);
+       _FD_NEXT(fd, off) = -(next + 4);
        __ha_barrier_store();
 done:
        return;
 }
 
+#undef _FD_NEXT
+#undef _FD_PREV
+
 /* Deletes an FD from the fdsets.
  * The file descriptor is also closed.
  */
-- 
2.14.3

>From bc11330fd8ef0fb6edccc58810da02c5fac16ee2 Mon Sep 17 00:00:00 2001
From: Olivier Houchard <ohouch...@haproxy.com>
Date: Wed, 25 Apr 2018 16:58:25 +0200
Subject: [PATCH 2/4] BUG/MEDIUM: pollers: Use a global list for fd shared
 between threads.

With the old model, any fd shared by multiple threads, such as listeners
or dns sockets, would only be updated on one thread, so that could lead
to missed events, or spurious wakeups.
To avoid this, add a global list for fd that are shared, using the same
implementation as the fd cache, and only remove entries from this list
when every thread has updated its poller.
---
 include/common/hathreads.h |  2 +
 include/proto/fd.h         | 54 +++++++++++++++++++++++---
 include/types/fd.h         |  1 +
 src/ev_epoll.c             | 92 +++++++++++++++++++++++++++----------------
 src/ev_kqueue.c            | 84 ++++++++++++++++++++++++++-------------
 src/ev_poll.c              | 95 ++++++++++++++++++++++++++++++---------------
 src/ev_select.c            | 97 ++++++++++++++++++++++++++++++----------------
 src/fd.c                   |  5 ++-
 src/hathreads.c            |  2 +-
 9 files changed, 297 insertions(+), 135 deletions(-)

diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 0f10b48ca..e27ecc63f 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -256,6 +256,8 @@ void thread_exit_sync(void);
 int  thread_no_sync(void);
 int  thread_need_sync(void);
 
+extern unsigned long all_threads_mask;
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
 /* WARNING!!! if you update this enum, please also keep lock_label() up to 
date below */
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 543a42007..a88f7ae64 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -36,6 +36,8 @@
 extern volatile struct fdlist fd_cache;
 extern volatile struct fdlist fd_cache_local[MAX_THREADS];
 
+extern volatile struct fdlist update_list;
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
@@ -101,15 +103,55 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int 
fd, int off);
  */
 static inline void updt_fd_polling(const int fd)
 {
-       unsigned int oldupdt;
+       if (fdtab[fd].thread_mask == tid_bit) {
+               unsigned int oldupdt;
+
+               /* note: we don't have a test-and-set yet in hathreads */
+
+               if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+                       return;
+
+               oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
+               fd_updt[oldupdt] = fd;
+       } else {
+               unsigned long update_mask = fdtab[fd].update_mask;
+               do {
+                       if (update_mask == fdtab[fd].thread_mask)
+                               return;
+               } while (!HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+                   fdtab[fd].thread_mask));
+               fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, 
update));
+       }
 
-       /* note: we don't have a test-and-set yet in hathreads */
+}
 
-       if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-               return;
+/* Called from the poller to acknowledge we read an entry from the global
+ * update list, to remove our bit from the update_mask, and remove it from
+ * the list if we were the last one.
+ */
+static inline void done_update_polling(int fd)
+{
+       unsigned long update_mask;
+
+       update_mask = HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+       while ((update_mask & all_threads_mask) == 0) {
+               /* If we were the last one that had to update that entry, 
remove it from the list */
+               fd_rm_from_fd_list(&update_list, fd, offsetof(struct fdtab, 
update));
+               update_mask = (volatile unsigned long)fdtab[fd].update_mask;
+               if ((update_mask & all_threads_mask) != 0) {
+                       /* Maybe it's been re-updated in the meanwhile, and we
+                        * wrongly removed it from the list, if so, re-add it
+                        */
+                       fd_add_to_fd_list(&update_list, fd, offsetof(struct 
fdtab, update));
+                       update_mask = (volatile unsigned 
long)(fdtab[fd].update_mask);
+                       /* And then check again, just in case after all it
+                        * should be removed, even if it's very unlikely, given
+                        * the current thread wouldn't have been able to take
+                        * care of it yet */
+               } else
+                       break;
 
-       oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
-       fd_updt[oldupdt] = fd;
+       }
 }
 
 /* Allocates a cache entry for a file descriptor if it does not yet have one.
diff --git a/include/types/fd.h b/include/types/fd.h
index 0902e7fc4..aa18ebefc 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -117,6 +117,7 @@ struct fdtab {
        unsigned long polled_mask;           /* mask of thread IDs currently 
polling this fd */
        unsigned long update_mask;           /* mask of thread IDs having an 
update for fd */
        struct fdlist_entry cache;           /* Entry in the fdcache */
+       struct fdlist_entry update;          /* Entry in the global update list 
*/
        void (*iocb)(int fd);                /* I/O handler */
        void *owner;                         /* the connection or listener 
associated with this fd, NULL if closed */
        unsigned char state;                 /* FD state for read and write 
directions (2*3 bits) */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index a8e57973f..2fec9070d 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -59,16 +59,55 @@ REGPRM1 static void __fd_clo(int fd)
        }
 }
 
+static inline void _update_fd(int fd)
+{
+       int en, opcode;
+
+       en = fdtab[fd].state;
+
+       if (fdtab[fd].polled_mask & tid_bit) {
+               if (!(fdtab[fd].thread_mask & tid_bit) || !(en & 
FD_EV_POLLED_RW)) {
+                       /* fd removed from poll list */
+                       opcode = EPOLL_CTL_DEL;
+                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+               }
+               else {
+                       /* fd status changed */
+                       opcode = EPOLL_CTL_MOD;
+               }
+       }
+       else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+               /* new fd in the poll list */
+               opcode = EPOLL_CTL_ADD;
+               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+       }
+       else {
+               return;
+       }
+
+       /* construct the epoll events based on new state */
+       ev.events = 0;
+       if (en & FD_EV_POLLED_R)
+               ev.events |= EPOLLIN | EPOLLRDHUP;
+
+       if (en & FD_EV_POLLED_W)
+               ev.events |= EPOLLOUT;
+
+       ev.data.fd = fd;
+       epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
 /*
  * Linux epoll() poller
  */
 REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
-       int status, en;
-       int fd, opcode;
+       int status;
+       int fd;
        int count;
        int updt_idx;
        int wait_time;
+       int old_fd;
 
        /* first, scan the update list to find polling changes */
        for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
@@ -80,40 +119,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        continue;
                }
 
-               en = fdtab[fd].state;
-
-               if (fdtab[fd].polled_mask & tid_bit) {
-                       if (!(fdtab[fd].thread_mask & tid_bit) || !(en & 
FD_EV_POLLED_RW)) {
-                               /* fd removed from poll list */
-                               opcode = EPOLL_CTL_DEL;
-                               HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-                       }
-                       else {
-                               /* fd status changed */
-                               opcode = EPOLL_CTL_MOD;
-                       }
-               }
-               else if ((fdtab[fd].thread_mask & tid_bit) && (en & 
FD_EV_POLLED_RW)) {
-                       /* new fd in the poll list */
-                       opcode = EPOLL_CTL_ADD;
-                       HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-               }
-               else {
+               _update_fd(fd);
+       }
+       fd_nbupdt = 0;
+       /* Scan the global update list */
+       for (old_fd = fd = update_list.first; fd != -1; fd = 
fdtab[fd].update.next) {
+               if (fd == -2) {
+                       fd = old_fd;
                        continue;
                }
-
-               /* construct the epoll events based on new state */
-               ev.events = 0;
-               if (en & FD_EV_POLLED_R)
-                       ev.events |= EPOLLIN | EPOLLRDHUP;
-
-               if (en & FD_EV_POLLED_W)
-                       ev.events |= EPOLLOUT;
-
-               ev.data.fd = fd;
-               epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+               else if (fd <= -3)
+                       fd = -fd -4;
+               if (fd == -1)
+                       break;
+               if (fdtab[fd].update_mask & tid_bit)
+                       done_update_polling(fd);
+               else
+                       continue;
+               if (!fdtab[fd].owner)
+                       continue;
+               _update_fd(fd);
        }
-       fd_nbupdt = 0;
 
        /* compute the epoll_wait() timeout */
        if (!exp)
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index ebfd5d210..6392dbfdd 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -33,6 +33,41 @@ static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
 static THREAD_LOCAL struct kevent *kev = NULL;
 static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write 
the eventlist in
 
+static inline int _update_fd(int fd)
+{
+       int en;
+       int changes = 0;
+
+       en = fdtab[fd].state;
+
+       if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+               if (!(fdtab[fd].polled_mask & tid_bit)) {
+                       /* fd was not watched, it's still not */
+                       return 0;
+               }
+               /* fd totally removed from poll list */
+               EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+               EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, 
NULL);
+               HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+       }
+       else {
+               /* OK fd has to be monitored, it was either added or changed */
+
+               if (en & FD_EV_POLLED_R)
+                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, 
NULL);
+               else if (fdtab[fd].polled_mask & tid_bit)
+                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 
0, NULL);
+
+               if (en & FD_EV_POLLED_W)
+                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, 
NULL);
+               else if (fdtab[fd].polled_mask & tid_bit)
+                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 
0, NULL);
+
+               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+       }
+       return changes;
+}
+
 /*
  * kqueue() poller
  */
@@ -41,8 +76,9 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        int status;
        int count, fd, delta_ms;
        struct timespec timeout;
-       int updt_idx, en;
+       int updt_idx;
        int changes = 0;
+       int old_fd;
 
        timeout.tv_sec  = 0;
        timeout.tv_nsec = 0;
@@ -55,35 +91,27 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        activity[tid].poll_drop++;
                        continue;
                }
-
-               en = fdtab[fd].state;
-
-               if (!(fdtab[fd].thread_mask & tid_bit) || !(en & 
FD_EV_POLLED_RW)) {
-                       if (!(fdtab[fd].polled_mask & tid_bit)) {
-                               /* fd was not watched, it's still not */
-                               continue;
-                       }
-                       /* fd totally removed from poll list */
-                       EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 
0, NULL);
-                       EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 
0, NULL);
-                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-               }
-               else {
-                       /* OK fd has to be monitored, it was either added or 
changed */
-
-                       if (en & FD_EV_POLLED_R)
-                               EV_SET(&kev[changes++], fd, EVFILT_READ, 
EV_ADD, 0, 0, NULL);
-                       else if (fdtab[fd].polled_mask & tid_bit)
-                               EV_SET(&kev[changes++], fd, EVFILT_READ, 
EV_DELETE, 0, 0, NULL);
-
-                       if (en & FD_EV_POLLED_W)
-                               EV_SET(&kev[changes++], fd, EVFILT_WRITE, 
EV_ADD, 0, 0, NULL);
-                       else if (fdtab[fd].polled_mask & tid_bit)
-                               EV_SET(&kev[changes++], fd, EVFILT_WRITE, 
EV_DELETE, 0, 0, NULL);
-
-                       HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+               changes += _update_fd(fd);
+       }
+       /* Scan the global update list */
+       for (old_fd = fd = update_list.first; fd != -1; fd = 
fdtab[fd].update.next) {
+               if (fd == -2) {
+                       fd = old_fd;
+                       continue;
                }
+               else if (fd <= -3)
+                       fd = -fd -4;
+               if (fd == -1)
+                       break;
+               if (fdtab[fd].update_mask & tid_bit)
+                       done_update_polling(fd);
+               else
+                       continue;
+               if (!fdtab[fd].owner)
+                       continue;
+               changes += _update_fd(fd);
        }
+
        if (changes) {
 #ifdef EV_RECEIPT
                kev[0].flags |= EV_RECEIPT;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 6093b652b..2f10fe9ae 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -45,6 +45,44 @@ REGPRM1 static void __fd_clo(int fd)
        hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static inline void _update_fd(int fd, int *max_add_fd)
+{
+       int en;
+
+       en = fdtab[fd].state;
+
+       /* we have a single state for all threads, which is why we
+        * don't check the tid_bit. First thread to see the update
+        * takes it for every other one.
+        */
+       if (!(en & FD_EV_POLLED_RW)) {
+               if (!fdtab[fd].polled_mask) {
+                       /* fd was not watched, it's still not */
+                       return;
+               }
+               /* fd totally removed from poll list */
+               hap_fd_clr(fd, fd_evts[DIR_RD]);
+               hap_fd_clr(fd, fd_evts[DIR_WR]);
+               HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+       }
+       else {
+               /* OK fd has to be monitored, it was either added or changed */
+               if (!(en & FD_EV_POLLED_R))
+                       hap_fd_clr(fd, fd_evts[DIR_RD]);
+               else
+                       hap_fd_set(fd, fd_evts[DIR_RD]);
+
+               if (!(en & FD_EV_POLLED_W))
+                       hap_fd_clr(fd, fd_evts[DIR_WR]);
+               else
+                       hap_fd_set(fd, fd_evts[DIR_WR]);
+
+               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+               if (fd > *max_add_fd)
+                       *max_add_fd = fd;
+       }
+}
+
 /*
  * Poll() poller
  */
@@ -53,11 +91,12 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        int status;
        int fd;
        int wait_time;
-       int updt_idx, en;
+       int updt_idx;
        int fds, count;
        int sr, sw;
        int old_maxfd, new_maxfd, max_add_fd;
        unsigned rn, wn; /* read new, write new */
+       int old_fd;
 
        max_add_fd = -1;
 
@@ -70,39 +109,31 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        activity[tid].poll_drop++;
                        continue;
                }
+               _update_fd(fd, &max_add_fd);
+       }
 
-               en = fdtab[fd].state;
-
-               /* we have a single state for all threads, which is why we
-                * don't check the tid_bit. First thread to see the update
-                * takes it for every other one.
-                */
-               if (!(en & FD_EV_POLLED_RW)) {
-                       if (!fdtab[fd].polled_mask) {
-                               /* fd was not watched, it's still not */
-                               continue;
-                       }
-                       /* fd totally removed from poll list */
-                       hap_fd_clr(fd, fd_evts[DIR_RD]);
-                       hap_fd_clr(fd, fd_evts[DIR_WR]);
-                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-               }
-               else {
-                       /* OK fd has to be monitored, it was either added or 
changed */
-                       if (!(en & FD_EV_POLLED_R))
-                               hap_fd_clr(fd, fd_evts[DIR_RD]);
-                       else
-                               hap_fd_set(fd, fd_evts[DIR_RD]);
-
-                       if (!(en & FD_EV_POLLED_W))
-                               hap_fd_clr(fd, fd_evts[DIR_WR]);
-                       else
-                               hap_fd_set(fd, fd_evts[DIR_WR]);
-
-                       HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-                       if (fd > max_add_fd)
-                               max_add_fd = fd;
+       /* Now scan the global update list */
+       for (old_fd = fd = update_list.first; fd != -1; fd = 
fdtab[fd].update.next) {
+               if (fd == -2) {
+                       fd = old_fd;
+                       continue;
                }
+               else if (fd <= -3)
+                       fd = -fd -4;
+               if (fd == -1)
+                       break;
+               if (fdtab[fd].update_mask & tid_bit) {
+                       /* Cheat a bit, as the state is global to all pollers
+                        * we don't need every thread to take care of the
+                        * update.
+                        */
+                       HA_ATOMIC_AND(&fdtab[fd].update_mask, 
~all_threads_mask);
+                       done_update_polling(fd);
+               } else
+                       continue;
+               if (!fdtab[fd].owner)
+                       continue;
+               _update_fd(fd, &max_add_fd);
        }
 
        /* maybe we added at least one fd larger than maxfd */
diff --git a/src/ev_select.c b/src/ev_select.c
index 163a45839..67775ca7e 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -36,6 +36,44 @@ REGPRM1 static void __fd_clo(int fd)
        hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static inline void _update_fd(int fd, int *max_add_fd)
+{
+       int en;
+
+       en = fdtab[fd].state;
+
+       /* we have a single state for all threads, which is why we
+        * don't check the tid_bit. First thread to see the update
+        * takes it for every other one.
+        */
+       if (!(en & FD_EV_POLLED_RW)) {
+               if (!fdtab[fd].polled_mask) {
+                       /* fd was not watched, it's still not */
+                       return;
+               }
+               /* fd totally removed from poll list */
+               hap_fd_clr(fd, fd_evts[DIR_RD]);
+               hap_fd_clr(fd, fd_evts[DIR_WR]);
+               HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+       }
+       else {
+               /* OK fd has to be monitored, it was either added or changed */
+               if (!(en & FD_EV_POLLED_R))
+                       hap_fd_clr(fd, fd_evts[DIR_RD]);
+               else
+                       hap_fd_set(fd, fd_evts[DIR_RD]);
+
+               if (!(en & FD_EV_POLLED_W))
+                       hap_fd_clr(fd, fd_evts[DIR_WR]);
+               else
+                       hap_fd_set(fd, fd_evts[DIR_WR]);
+
+               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+               if (fd > *max_add_fd)
+                       *max_add_fd = fd;
+       }
+}
+
 /*
  * Select() poller
  */
@@ -46,10 +84,11 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
        struct timeval delta;
        int delta_ms;
        int fds;
-       int updt_idx, en;
+       int updt_idx;
        char count;
        int readnotnull, writenotnull;
        int old_maxfd, new_maxfd, max_add_fd;
+       int old_fd;
 
        max_add_fd = -1;
 
@@ -62,41 +101,33 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        activity[tid].poll_drop++;
                        continue;
                }
-
-               en = fdtab[fd].state;
-
-               /* we have a single state for all threads, which is why we
-                * don't check the tid_bit. First thread to see the update
-                * takes it for every other one.
-                */
-               if (!(en & FD_EV_POLLED_RW)) {
-                       if (!fdtab[fd].polled_mask) {
-                               /* fd was not watched, it's still not */
-                               continue;
-                       }
-                       /* fd totally removed from poll list */
-                       hap_fd_clr(fd, fd_evts[DIR_RD]);
-                       hap_fd_clr(fd, fd_evts[DIR_WR]);
-                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-               }
-               else {
-                       /* OK fd has to be monitored, it was either added or changed */
-                       if (!(en & FD_EV_POLLED_R))
-                               hap_fd_clr(fd, fd_evts[DIR_RD]);
-                       else
-                               hap_fd_set(fd, fd_evts[DIR_RD]);
-
-                       if (!(en & FD_EV_POLLED_W))
-                               hap_fd_clr(fd, fd_evts[DIR_WR]);
-                       else
-                               hap_fd_set(fd, fd_evts[DIR_WR]);
-
-                       HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-                       if (fd > max_add_fd)
-                               max_add_fd = fd;
+               _update_fd(fd, &max_add_fd);
+       }
+       /* Now scan the global update list */
+       for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+               if (fd == -2) {
+                       fd = old_fd;
+                       continue;
                }
+               else if (fd <= -3)
+                       fd = -fd -4;
+               if (fd == -1)
+                       break;
+               if (fdtab[fd].update_mask & tid_bit) {
+                       /* Cheat a bit, as the state is global to all pollers
+                        * we don't need every thread to take care of the
+                        * update.
+                        */
+                       HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+                       done_update_polling(fd);
+               } else
+                       continue;
+               if (!fdtab[fd].owner)
+                       continue;
+               _update_fd(fd, &max_add_fd);
        }
 
+
        /* maybe we added at least one fd larger than maxfd */
        for (old_maxfd = maxfd; old_maxfd <= max_add_fd; ) {
                if (HA_ATOMIC_CAS(&maxfd, &old_maxfd, max_add_fd + 1))
diff --git a/src/fd.c b/src/fd.c
index cbf22bd22..508ee098f 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -169,6 +169,7 @@ int nbpollers = 0;
 
 volatile struct fdlist fd_cache ; // FD events cache
 volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
+volatile struct fdlist update_list; // Global update list
 
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 
@@ -244,7 +245,6 @@ void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off)
        int prev;
        int next;
        int last;
-
 lock_self:
 #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
        next_list.next = next_list.prev = -2;
@@ -492,6 +492,7 @@ int init_pollers()
                goto fail_info;
 
        fd_cache.first = fd_cache.last = -1;
+       update_list.first = update_list.last = -1;
        hap_register_per_thread_init(init_pollers_per_thread);
        hap_register_per_thread_deinit(deinit_pollers_per_thread);
 
@@ -499,7 +500,7 @@ int init_pollers()
                HA_SPIN_INIT(&fdtab[p].lock);
                /* Mark the fd as out of the fd cache */
                fdtab[p].cache.next = -3;
-               fdtab[p].cache.next = -3;
+               fdtab[p].update.next = -3;
        }
        for (p = 0; p < global.nbthread; p++)
                fd_cache_local[p].first = fd_cache_local[p].last = -1;
diff --git a/src/hathreads.c b/src/hathreads.c
index 0d690f383..5db3c2197 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -31,7 +31,7 @@ void thread_sync_io_handler(int fd)
 static HA_SPINLOCK_T sync_lock;
 static int           threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask  = 0;
+unsigned long all_threads_mask  = 0;
 
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];
-- 
2.14.3

>From 036d5b1c96e70b38764cc69e58741011f5786159 Mon Sep 17 00:00:00 2001
From: Olivier Houchard <ohouch...@haproxy.com>
Date: Thu, 26 Apr 2018 14:23:07 +0200
Subject: [PATCH 3/4] MINOR: pollers: move polled_mask outside of struct fdtab.

The polled_mask is only used in the pollers, and removing it from the
struct fdtab makes it fit in one 64B cacheline again, on a 64bits machine,
so make it a separate array.
---
 include/proto/fd.h |  2 ++
 include/types/fd.h |  1 -
 src/ev_epoll.c     | 10 +++++-----
 src/ev_kqueue.c    | 10 +++++-----
 src/ev_poll.c      |  6 +++---
 src/ev_select.c    |  6 +++---
 src/fd.c           |  8 +++++++-
 7 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/include/proto/fd.h b/include/proto/fd.h
index a88f7ae64..98a58f55d 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -38,6 +38,8 @@ extern volatile struct fdlist fd_cache_local[MAX_THREADS];
 
 extern volatile struct fdlist update_list;
 
+extern unsigned long *polled_mask;
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
diff --git a/include/types/fd.h b/include/types/fd.h
index aa18ebefc..5947bafc3 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -114,7 +114,6 @@ struct fdlist {
 struct fdtab {
        __decl_hathreads(HA_SPINLOCK_T lock);
        unsigned long thread_mask;           /* mask of thread IDs authorized to process the task */
-       unsigned long polled_mask;           /* mask of thread IDs currently polling this fd */
        unsigned long update_mask;           /* mask of thread IDs having an update for fd */
        struct fdlist_entry cache;           /* Entry in the fdcache */
        struct fdlist_entry update;          /* Entry in the global update list */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 2fec9070d..05f1b5ec8 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -50,7 +50,7 @@ static THREAD_LOCAL struct epoll_event ev;
 REGPRM1 static void __fd_clo(int fd)
 {
        if (unlikely(fdtab[fd].cloned)) {
-               unsigned long m = fdtab[fd].polled_mask;
+               unsigned long m = polled_mask[fd];
                int i;
 
                for (i = global.nbthread - 1; i >= 0; i--)
@@ -65,11 +65,11 @@ static inline void _update_fd(int fd)
 
        en = fdtab[fd].state;
 
-       if (fdtab[fd].polled_mask & tid_bit) {
+       if (polled_mask[fd] & tid_bit) {
                if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
                        /* fd removed from poll list */
                        opcode = EPOLL_CTL_DEL;
-                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+                       HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
                }
                else {
                        /* fd status changed */
@@ -79,7 +79,7 @@ static inline void _update_fd(int fd)
        else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
                /* new fd in the poll list */
                opcode = EPOLL_CTL_ADD;
-               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+               HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
        }
        else {
                return;
@@ -177,7 +177,7 @@ REGPRM2 static void _do_poll(struct poller *p, int exp)
                        /* FD has been migrated */
                        activity[tid].poll_skip++;
                        epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
-                       HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+                       HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
                        continue;
                }
 
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 6392dbfdd..0fa4e1651 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -41,29 +41,29 @@ static inline int _update_fd(int fd)
        en = fdtab[fd].state;
 
        if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-               if (!(fdtab[fd].polled_mask & tid_bit)) {
+               if (!(polled_mask[fd] & tid_bit)) {
                        /* fd was not watched, it's still not */
                        return 0;
                }
                /* fd totally removed from poll list */
                EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-               HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+               HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
        }
        else {
                /* OK fd has to be monitored, it was either added or changed */
 
                if (en & FD_EV_POLLED_R)
                        EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
-               else if (fdtab[fd].polled_mask & tid_bit)
+               else if (polled_mask[fd] & tid_bit)
                        EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
 
                if (en & FD_EV_POLLED_W)
                        EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
-               else if (fdtab[fd].polled_mask & tid_bit)
+               else if (polled_mask[fd] & tid_bit)
                        EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
 
-               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+               HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
        }
        return changes;
 }
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 2f10fe9ae..65c92d992 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -56,14 +56,14 @@ static inline void _update_fd(int fd, int *max_add_fd)
         * takes it for every other one.
         */
        if (!(en & FD_EV_POLLED_RW)) {
-               if (!fdtab[fd].polled_mask) {
+               if (!polled_mask[fd]) {
                        /* fd was not watched, it's still not */
                        return;
                }
                /* fd totally removed from poll list */
                hap_fd_clr(fd, fd_evts[DIR_RD]);
                hap_fd_clr(fd, fd_evts[DIR_WR]);
-               HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+               HA_ATOMIC_AND(&polled_mask[fd], 0);
        }
        else {
                /* OK fd has to be monitored, it was either added or changed */
@@ -77,7 +77,7 @@ static inline void _update_fd(int fd, int *max_add_fd)
                else
                        hap_fd_set(fd, fd_evts[DIR_WR]);
 
-               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+               HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
                if (fd > *max_add_fd)
                        *max_add_fd = fd;
        }
diff --git a/src/ev_select.c b/src/ev_select.c
index 67775ca7e..d83db6c74 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -47,14 +47,14 @@ static inline void _update_fd(int fd, int *max_add_fd)
         * takes it for every other one.
         */
        if (!(en & FD_EV_POLLED_RW)) {
-               if (!fdtab[fd].polled_mask) {
+               if (!polled_mask[fd]) {
                        /* fd was not watched, it's still not */
                        return;
                }
                /* fd totally removed from poll list */
                hap_fd_clr(fd, fd_evts[DIR_RD]);
                hap_fd_clr(fd, fd_evts[DIR_WR]);
-               HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+               HA_ATOMIC_AND(&polled_mask[fd], 0);
        }
        else {
                /* OK fd has to be monitored, it was either added or changed */
@@ -68,7 +68,7 @@ static inline void _update_fd(int fd, int *max_add_fd)
                else
                        hap_fd_set(fd, fd_evts[DIR_WR]);
 
-               HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+               HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
                if (fd > *max_add_fd)
                        *max_add_fd = fd;
        }
diff --git a/src/fd.c b/src/fd.c
index 508ee098f..bb6fc4283 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -159,6 +159,7 @@
 #include <proto/port_range.h>
 
 struct fdtab *fdtab = NULL;     /* array of all the file descriptors */
+unsigned long *polled_mask = NULL; /* Array for the polled_mask of each fd */
 struct fdinfo *fdinfo = NULL;   /* less-often used infos for file descriptors */
 int totalconn;                  /* total # of terminated sessions */
 int actconn;                    /* # of active sessions */
@@ -373,7 +374,7 @@ static void fd_dodelete(int fd, int do_close)
        fdtab[fd].update_mask &= ~tid_bit;
        fdtab[fd].thread_mask = 0;
        if (do_close) {
-               fdtab[fd].polled_mask = 0;
+               polled_mask[fd] = 0;
                close(fd);
        }
        HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
@@ -488,6 +489,8 @@ int init_pollers()
        if ((fdtab = calloc(global.maxsock, sizeof(struct fdtab))) == NULL)
                goto fail_tab;
 
+       if ((polled_mask = calloc(global.maxsock, sizeof(unsigned long))) == NULL)
+               goto fail_polledmask;
        if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
                goto fail_info;
 
@@ -526,6 +529,8 @@ int init_pollers()
  fail_info:
        free(fdtab);
  fail_tab:
+       free(polled_mask);
+ fail_polledmask:
        return 0;
 }
 
@@ -549,6 +554,7 @@ void deinit_pollers() {
 
        free(fdinfo);   fdinfo   = NULL;
        free(fdtab);    fdtab    = NULL;
+       free(polled_mask); polled_mask = NULL;
 }
 
 /*
-- 
2.14.3

Reply via email to