While testing haproxy 1.9 on FreeBSD 11.1, I often found stuck threads in the
old process after a reload. The symptoms were 100% CPU usage in some threads
after soft_stop, many sockets in the CLOSED state in lsof, and, in core dumps,
stuck threads inside fd-related functions operating on FDs that had been
listening sockets before the reload (after soft_stop, those FD numbers were
reused for outgoing connections).
After some more debugging, I tracked it down to the locking optimizations
introduced in 1.9-dev4. In my testing, it was more likely to happen with more
threads, especially when the system was under elevated load.

As mentioned in the commit message, I only observed this when listening
sockets were closed, never with tasks. I suspect the task code can hit the
same race, though, so it seems better to change its locking pattern at the
same time.

Apologies in advance if the patch is mutilated by Outlook/Exchange. I can 
resend from a personal address in case that happens.

Thanks,

Richard

From 11903cdf9f9ac3236765e85b1849b0a26b2f7b51 Mon Sep 17 00:00:00 2001
From: Richard Russo <[email protected]>
Date: Wed, 20 Feb 2019 12:43:45 -0800
Subject: [PATCH] BUG/MAJOR: fd/threads, task/threads: ensure all spin locks
 are unlocked

Calculate once, before locking, whether the fd or task should be locked,
and reuse that result when deciding whether to unlock.

Fixes a race condition introduced in 87d54a9a for fds and in b20aa9ee for
tasks, both released in 1.9-dev4. atleast2(thread_mask) was evaluated
once before locking and again before unlocking. When one thread changes
thread_mask to a single thread for a task or fd while a second thread
holds, or is waiting on, the lock for that task or fd, the second
thread's re-evaluation returns false and it never releases the lock.
For FDs, this is observable when a listener is polled by multiple
threads and is closed while those threads have events pending. For
tasks, this seems possible where task_set_affinity() is called, but I
did not observe it.

This must be backported to 1.9.
---
 include/proto/fd.h   | 74 +++++++++++++++++++++++++++++++++++-----------------
 include/proto/task.h |  7 +++--
 src/fd.c             |  8 +++---
 3 files changed, 60 insertions(+), 29 deletions(-)

diff --git a/include/proto/fd.h b/include/proto/fd.h
index 81aea87b..787e4640 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -279,6 +279,7 @@ static inline int fd_active(const int fd)
 static inline void fd_stop_recv(int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -291,10 +292,11 @@ static inline void fd_stop_recv(int fd)
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -302,6 +304,7 @@ static inline void fd_stop_recv(int fd)
 static inline void fd_stop_send(int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -314,10 +317,11 @@ static inline void fd_stop_send(int fd)
        if ((old ^ new) & FD_EV_POLLED_W)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -325,6 +329,7 @@ static inline void fd_stop_send(int fd)
 static inline void fd_stop_both(int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -337,10 +342,11 @@ static inline void fd_stop_both(int fd)
        if ((old ^ new) & FD_EV_POLLED_RW)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -348,6 +354,7 @@ static inline void fd_stop_both(int fd)
 static inline void fd_cant_recv(const int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -361,23 +368,27 @@ static inline void fd_cant_recv(const int fd)
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can receive anymore without polling. */
 static inline void fd_may_recv(const int fd)
 {
+       unsigned long locked;
+
        /* marking ready never changes polled status */
        HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_R);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -389,6 +400,7 @@ static inline void fd_may_recv(const int fd)
 static inline void fd_done_recv(const int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -402,10 +414,11 @@ static inline void fd_done_recv(const int fd)
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -413,6 +426,7 @@ static inline void fd_done_recv(const int fd)
 static inline void fd_cant_send(const int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -426,23 +440,27 @@ static inline void fd_cant_send(const int fd)
        if ((old ^ new) & FD_EV_POLLED_W)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
 static inline void fd_may_send(const int fd)
 {
+       unsigned long locked;
+
        /* marking ready never changes polled status */
        HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_W);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -450,6 +468,7 @@ static inline void fd_may_send(const int fd)
 static inline void fd_want_recv(int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -463,10 +482,11 @@ static inline void fd_want_recv(int fd)
        if ((old ^ new) & FD_EV_POLLED_R)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -474,6 +494,7 @@ static inline void fd_want_recv(int fd)
 static inline void fd_want_send(int fd)
 {
        unsigned char old, new;
+       unsigned long locked;
 
        old = fdtab[fd].state;
        do {
@@ -487,10 +508,11 @@ static inline void fd_want_send(int fd)
        if ((old ^ new) & FD_EV_POLLED_W)
                updt_fd_polling(fd);
 
-       if (atleast2(fdtab[fd].thread_mask))
+       locked = atleast2(fdtab[fd].thread_mask);
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fd_update_cache(fd); /* need an update entry to change the state */
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
@@ -498,11 +520,13 @@ static inline void fd_want_send(int fd)
  * by the poller to set FD_POLL_* flags. */
 static inline void fd_update_events(int fd, int evts)
 {
-       if (atleast2(fdtab[fd].thread_mask))
+       unsigned long locked = atleast2(fdtab[fd].thread_mask);
+
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fdtab[fd].ev &= FD_POLL_STICKY;
        fdtab[fd].ev |= evts;
-       if (atleast2(fdtab[fd].thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
        if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
@@ -515,7 +539,9 @@ static inline void fd_update_events(int fd, int evts)
 /* Prepares <fd> for being polled */
 static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
 {
-       if (atleast2(thread_mask))
+       unsigned long locked = atleast2(thread_mask);
+
+       if (locked)
                HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
        fdtab[fd].owner = owner;
        fdtab[fd].iocb = iocb;
@@ -526,7 +552,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
        /* note: do not reset polled_mask here as it indicates which poller
         * still knows this FD from a possible previous round.
         */
-       if (atleast2(thread_mask))
+       if (locked)
                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
diff --git a/include/proto/task.h b/include/proto/task.h
index 0177c52b..eddbf925 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -184,11 +184,14 @@ static inline struct task *__task_unlink_wq(struct task *t)
  */
 static inline struct task *task_unlink_wq(struct task *t)
 {
+       unsigned long locked;
+
        if (likely(task_in_wq(t))) {
-               if (atleast2(t->thread_mask))
+               locked = atleast2(t->thread_mask);
+               if (locked)
                        HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
                __task_unlink_wq(t);
-               if (atleast2(t->thread_mask))
+               if (locked)
                        HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
        }
        return t;
diff --git a/src/fd.c b/src/fd.c
index 9434c630..e4d8afa2 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -405,6 +405,7 @@ void fd_remove(int fd)
 static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 {
        int fd, old_fd, e;
+       unsigned long locked;
 
        for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
                if (fd == -2) {
@@ -421,7 +422,8 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
                        continue;
 
                HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
-               if (atleast2(fdtab[fd].thread_mask) && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
+               locked = atleast2(fdtab[fd].thread_mask);
+               if (locked && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
                        activity[tid].fd_lock++;
                        continue;
                }
@@ -436,13 +438,13 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
                        fdtab[fd].ev |= FD_POLL_OUT;
 
                if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
-                       if (atleast2(fdtab[fd].thread_mask))
+                       if (locked)
                                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
                        fdtab[fd].iocb(fd);
                }
                else {
                        fd_release_cache_entry(fd);
-                       if (atleast2(fdtab[fd].thread_mask))
+                       if (locked)
                                HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
                }
        }
-- 
2.13.5


Reply via email to