Olivier, I've been testing from master yesterday and today (reloading in a loop every 60 seconds). As with any multithreaded testing, it's much easier to conclude that something crashes than that it doesn't, but I didn't see any crashes or deadlocks in my test runs. So, I think it's good.
Richard

PS: Sorry for messing up the threading; I'm not receiving Olivier's messages to the list in my corporate mail, so I'll move to my personal mail in the future.

On 2/20/19, 1:00 PM, "Richard Russo" <rus...@whatsapp.com> wrote:

While testing haproxy 1.9 on FreeBSD 11.1, I would often find stuck threads in the old process after a reload. Symptoms were 100% CPU in some threads after soft_stop, many sockets in CLOSED state in lsof, and, in core dumps, the stuck threads were in fd-related processing on fds that had been listen sockets before the reload (after soft_stop, the FD numbers got reused for outgoing connections).

After some more debugging, I was able to track it down to the locking optimizations from 1.9-dev4. From my testing, this was more likely to happen with more threads, and especially when the system was under elevated load.

As mentioned in the commit message, I didn't observe this happening in tasks, only when listen sockets are closed, but I suspect it's possible, so it seemed better to change that locking pattern at the same time.

Apologies in advance if the patch is mutilated by Outlook/Exchange. I can resend from a personal address if that happens.

Thanks,
Richard

From 11903cdf9f9ac3236765e85b1849b0a26b2f7b51 Mon Sep 17 00:00:00 2001
From: Richard Russo <rus...@whatsapp.com>
Date: Wed, 20 Feb 2019 12:43:45 -0800
Subject: [PATCH] BUG/MAJOR: fd/threads, task/threads: ensure all spin locks are unlocked

Calculate whether the fd or task should be locked once, before locking, and reuse the calculation when determining whether to unlock.

This fixes a race condition added in 87d54a9a for fds, and in b20aa9ee for tasks, released in 1.9-dev4. When one thread modifies thread_mask to be a single thread for a task or fd while a second thread has locked or is waiting on the lock for that task or fd, the second thread will not unlock it. For FDs, this is observable when a listener is polled by multiple threads and is closed while those threads have events pending. For tasks, this seems possible where task_set_affinity is called, but I did not observe it.

This must be backported to 1.9.
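To make the race easier to see outside the HAProxy sources, here is a minimal stand-alone sketch of the old and new patterns (the struct and function names below are placeholders invented for illustration; only the atleast2() helper mirrors the real macro):

#include <pthread.h>
#include <stdatomic.h>

/* true when more than one bit is set in the mask (same idea as HAProxy's helper) */
#define atleast2(x) ((x) & ((x) - 1))

struct shared_obj {
	_Atomic unsigned long thread_mask;   /* bitmask of threads allowed to use this object */
	pthread_spinlock_t lock;
};

/* Racy pattern (the old code): thread_mask is re-read to decide whether to
 * unlock.  If another thread shrinks thread_mask to a single bit while we
 * hold the lock, the second test is false and the spin lock is never
 * released; the next thread to contend for it then spins forever.
 */
static void racy_update(struct shared_obj *o)
{
	if (atleast2(o->thread_mask))
		pthread_spin_lock(&o->lock);
	/* ... critical section ... */
	if (atleast2(o->thread_mask))        /* may have become false: lock leaked */
		pthread_spin_unlock(&o->lock);
}

/* Fixed pattern (what the patch does): decide once, before locking, and
 * reuse that decision for the unlock.
 */
static void fixed_update(struct shared_obj *o)
{
	unsigned long locked = atleast2(o->thread_mask);

	if (locked)
		pthread_spin_lock(&o->lock);
	/* ... critical section ... */
	if (locked)
		pthread_spin_unlock(&o->lock);
}

Since locked is captured before taking the lock, the unlock decision can no longer diverge from the lock decision, whatever other threads do to thread_mask in between.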
---
 include/proto/fd.h   | 74 +++++++++++++++++++++++++++++++++++-----------------
 include/proto/task.h |  7 +++--
 src/fd.c             |  8 +++---
 3 files changed, 60 insertions(+), 29 deletions(-)

diff --git a/include/proto/fd.h b/include/proto/fd.h
index 81aea87b..787e4640 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -279,6 +279,7 @@ static inline int fd_active(const int fd)
 static inline void fd_stop_recv(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -291,10 +292,11 @@ static inline void fd_stop_recv(int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

@@ -302,6 +304,7 @@
 static inline void fd_stop_send(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -314,10 +317,11 @@ static inline void fd_stop_send(int fd)
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

@@ -325,6 +329,7 @@
 static inline void fd_stop_both(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -337,10 +342,11 @@ static inline void fd_stop_both(int fd)
 	if ((old ^ new) & FD_EV_POLLED_RW)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

@@ -348,6 +354,7 @@
 static inline void fd_cant_recv(const int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -361,23 +368,27 @@ static inline void fd_cant_recv(const int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

 /* Report that FD <fd> can receive anymore without polling.
  */
 static inline void fd_may_recv(const int fd)
 {
+	unsigned long locked;
+
 	/* marking ready never changes polled status */
 	HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_R);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
@@ -389,6 +400,7 @@ static inline void fd_may_recv(const int fd)
 static inline void fd_done_recv(const int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -402,10 +414,11 @@ static inline void fd_done_recv(const int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

@@ -413,6 +426,7 @@
 static inline void fd_cant_send(const int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -426,23 +440,27 @@ static inline void fd_cant_send(const int fd)
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

 /* Report that FD <fd> can send anymore without polling (EAGAIN detected).
  */
 static inline void fd_may_send(const int fd)
 {
+	unsigned long locked;
+
 	/* marking ready never changes polled status */
 	HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_W);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
@@ -450,6 +468,7 @@ static inline void fd_may_send(const int fd)
 static inline void fd_want_recv(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -463,10 +482,11 @@ static inline void fd_want_recv(int fd)
 	if ((old ^ new) & FD_EV_POLLED_R)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

@@ -474,6 +494,7 @@
 static inline void fd_want_send(int fd)
 {
 	unsigned char old, new;
+	unsigned long locked;

 	old = fdtab[fd].state;
 	do {
@@ -487,10 +508,11 @@ static inline void fd_want_send(int fd)
 	if ((old ^ new) & FD_EV_POLLED_W)
 		updt_fd_polling(fd);

-	if (atleast2(fdtab[fd].thread_mask))
+	locked = atleast2(fdtab[fd].thread_mask);
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fd_update_cache(fd); /* need an update entry to change the state */
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

@@ -498,11 +520,13 @@ static inline void fd_want_send(int fd)
  * by the poller to set FD_POLL_* flags. */
 static inline void fd_update_events(int fd, int evts)
 {
-	if (atleast2(fdtab[fd].thread_mask))
+	unsigned long locked = atleast2(fdtab[fd].thread_mask);
+
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].ev &= FD_POLL_STICKY;
 	fdtab[fd].ev |= evts;
-	if (atleast2(fdtab[fd].thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);

 	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
@@ -515,7 +539,9 @@ static inline void fd_update_events(int fd, int evts)
 /* Prepares <fd> for being polled */
 static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
 {
-	if (atleast2(thread_mask))
+	unsigned long locked = atleast2(thread_mask);
+
+	if (locked)
 		HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].owner = owner;
 	fdtab[fd].iocb = iocb;
@@ -526,7 +552,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
 	/* note: do not reset polled_mask here as it indicates which poller
 	 * still knows this FD from a possible previous round.
 	 */
-	if (atleast2(thread_mask))
+	if (locked)
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }

diff --git a/include/proto/task.h b/include/proto/task.h
index 0177c52b..eddbf925 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -184,11 +184,14 @@ static inline struct task *__task_unlink_wq(struct task *t)
  */
 static inline struct task *task_unlink_wq(struct task *t)
 {
+	unsigned long locked;
+
 	if (likely(task_in_wq(t))) {
-		if (atleast2(t->thread_mask))
+		locked = atleast2(t->thread_mask);
+		if (locked)
 			HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 		__task_unlink_wq(t);
-		if (atleast2(t->thread_mask))
+		if (locked)
 			HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 	}
 	return t;
diff --git a/src/fd.c b/src/fd.c
index 9434c630..e4d8afa2 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -405,6 +405,7 @@ void fd_remove(int fd)
 static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 {
 	int fd, old_fd, e;
+	unsigned long locked;

 	for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
 		if (fd == -2) {
@@ -421,7 +422,8 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 			continue;

 		HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
-		if (atleast2(fdtab[fd].thread_mask) && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
+		locked = atleast2(fdtab[fd].thread_mask);
+		if (locked && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
 			activity[tid].fd_lock++;
 			continue;
 		}
@@ -436,13 +438,13 @@ static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
 			fdtab[fd].ev |= FD_POLL_OUT;

 		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
-			if (atleast2(fdtab[fd].thread_mask))
+			if (locked)
 				HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 			fdtab[fd].iocb(fd);
 		}
 		else {
 			fd_release_cache_entry(fd);
-			if (atleast2(fdtab[fd].thread_mask))
+			if (locked)
 				HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 		}
 	}
-- 
2.13.5