On 28/03/2018 at 14:16, Максим Куприянов wrote:
Hi!

I'm sorry, but the configuration is too large to share (over 100 different proxy sections). This is also why I can't determine exactly which section is failing. Is there a way to get this data from a core file?

2018-03-28 11:18 GMT+03:00 Christopher Faulet <[email protected]>:

    On 28/03/2018 at 09:36, Максим Куприянов wrote:

        Hi!

        Yesterday one of our haproxies (1.8.5), with nbthread=8 set in
        its config, got stuck at 800% CPU usage. Some responses were served
        successfully, but many of them just timed out. perf top showed this:
           59.19%  [.] thread_enter_sync
           32.68%  [.] fwrr_get_next_server


    Hi,

    Could you share your configuration, please? It will help to diagnose
    the problem. In your logs, what are the values of the srv_queue and
    backend_queue fields?


Hi,

OK, I partly reproduced your problem using a backend with a hundred servers and maxconn set to 2 on each one. In this case, I observe the same CPU consumption. I get no timeouts (that probably depends on your values), but performance is quite low.

I think you're hitting a limitation of the current design. We have no mechanism to migrate entities between threads, so to force threads to wake up, we use the sync point. It was not designed to be called very often. In your case, it eats all the CPU.

I attached 3 patches. They add a mechanism to wake up threads selectively, without any lock or loop. They must be applied on HAProxy 1.8 (they will not work on upstream). You can then check whether they fix your problem. That will help to confirm that this is a design limitation and not a bug.

This is just an experiment. I hope it works well, but I haven't done much testing. If it does, I'll discuss with Willy whether it is pertinent to wake up threads this way. In any case, it will probably not be backported to HAProxy 1.8.
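
To illustrate the idea outside of HAProxy, here is a minimal standalone sketch of a per-thread wakeup pipe guarded by a "pending" bitfield. It is not the code from the patches: the names wakeup_init, wakeup_threads, wakeup_drain and MAX_THREADS are hypothetical, it uses C11 atomics instead of the HA_ATOMIC_* macros, and it leaves out the registration of the read ends in the poller.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdatomic.h>
#include <unistd.h>

#define MAX_THREADS 8                    /* hypothetical limit for this sketch */

static int wake_pipes[MAX_THREADS][2];   /* [i][0] = read end, [i][1] = write end */
static unsigned int wake_nbthread;       /* number of pipes actually created */
static atomic_ulong pending_wakeups;     /* bit i set => thread i already notified */

/* Creates one pipe per thread, with a non-blocking read end.
 * Returns 0 on success, -1 on error (nothing is left open on error). */
int wakeup_init(unsigned int nbthread)
{
	unsigned int i;

	if (nbthread > MAX_THREADS)
		return -1;
	atomic_store(&pending_wakeups, 0);
	for (i = 0; i < nbthread; i++) {
		if (pipe(wake_pipes[i]) < 0)
			goto fail;
		if (fcntl(wake_pipes[i][0], F_SETFL, O_NONBLOCK) < 0) {
			close(wake_pipes[i][0]);
			close(wake_pipes[i][1]);
			goto fail;
		}
	}
	wake_nbthread = nbthread;
	return 0;
fail:
	while (i-- > 0) {
		close(wake_pipes[i][0]);
		close(wake_pipes[i][1]);
	}
	return -1;
}

/* Notifies every thread set in <mask> except <self>.
 * Returns the number of bytes written, i.e. the number of new wakeups. */
int wakeup_threads(unsigned long mask, unsigned int self)
{
	unsigned int i;
	int sent = 0;

	for (i = 0; i < wake_nbthread; i++) {
		unsigned long bit = 1UL << i;

		if (!(mask & bit) || i == self)
			continue;
		/* atomic_fetch_or returns the previous value, so only the caller
		 * that flips the bit from 0 to 1 writes to the pipe. */
		if (atomic_fetch_or(&pending_wakeups, bit) & bit)
			continue;
		if (write(wake_pipes[i][1], "W", 1) == 1)
			sent++;
		else
			atomic_fetch_and(&pending_wakeups, ~bit); /* allow a retry later */
	}
	return sent;
}

/* Called by thread <id> once its read end is readable: consumes the wakeup
 * byte, then clears the pending bit so that the thread can be notified again. */
void wakeup_drain(unsigned int id)
{
	char c;
	ssize_t r;

	assert(id < wake_nbthread);
	r = read(wake_pipes[id][0], &c, 1);
	(void)r;
	atomic_fetch_and(&pending_wakeups, ~(1UL << id));
}
```

In this sketch, two wakeups sent to the same thread before it drains its pipe result in a single byte written, so a thread is never notified more than once per pending wakeup and the other threads are left alone.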

--
Christopher Faulet
From 4131ccaf7b62b6d9888d80dfef002dc4df03ea0f Mon Sep 17 00:00:00 2001
From: Christopher Faulet <[email protected]>
Date: Thu, 29 Mar 2018 09:41:09 +0200
Subject: [PATCH 1/3] WIP: threads: Add a mechanism to wake up threads on
 demand

Until this patch, the sync point was the only way to wake up some threads, to be
sure to trigger processing, even if these threads are blocked in the poller.
This works if these wakeups are not too frequent. But the sync point was not
designed to be called very often. When a thread is waiting in the sync point, it
consumes all the CPU. And when we need to wake up only one thread, all others
are also woken up.

So to solve these issues, we add a pipe per thread (instead of a pipe for all
threads). This pipe can be used when we need to wake up a thread. We write on it
to wake up the corresponding thread. When the thread is actually woken up, the
pipe is drained. A bitfield is used to know if a wakeup is already pending for a
thread.

Because this is just a notification mechanism, there is no lock or loop. So it
is pretty light.
---
 include/common/hathreads.h | 12 +++++++
 src/hathreads.c            | 84 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 96 insertions(+)

diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 19299db7..e15d0c4e 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -74,6 +74,10 @@ extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the threa
 #define THREAD_NO_SYNC()     ({ 0; })
 #define THREAD_NEED_SYNC()   ({ 1; })
 
+#define THREADS_WAKEUP_INIT(n)   do { /* do nothing */ } while(0)
+#define THREADS_WAKEUP_ENABLE(n) do { /* do nothing */ } while(0)
+#define THREADS_WAKEUP(m)        do { /* do nothing */ } while(0)
+
 #define HA_SPIN_INIT(l)         do { /* do nothing */ } while(0)
 #define HA_SPIN_DESTROY(l)      do { /* do nothing */ } while(0)
 #define HA_SPIN_LOCK(lbl, l)    do { /* do nothing */ } while(0)
@@ -191,6 +195,10 @@ extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the threa
 #define THREAD_NO_SYNC()      thread_no_sync()
 #define THREAD_NEED_SYNC()    thread_need_sync()
 
+#define THREADS_WAKEUP_INIT(n)   threads_wakeup_init(n)
+#define THREADS_WAKEUP_ENABLE(n) threads_wakeup_enable(n)
+#define THREADS_WAKEUP(m)        threads_wakeup(m)
+
 int  thread_sync_init(unsigned long mask);
 void thread_sync_enable(void);
 void thread_want_sync(void);
@@ -199,6 +207,10 @@ void thread_exit_sync(void);
 int  thread_no_sync(void);
 int  thread_need_sync(void);
 
+int  threads_wakeup_init(unsigned int nbthread);
+void threads_wakeup_enable(unsigned int nbthread);
+void threads_wakeup(unsigned long mask);
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
diff --git a/src/hathreads.c b/src/hathreads.c
index fea4ffeb..1547298b 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -28,6 +28,11 @@ static int           threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
 static unsigned long all_threads_mask  = 0;
 
+
+static int           (*threads_wakeup_pipes)[2] = NULL;
+static unsigned long threads_to_wakeup = 0;
+
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];
 #endif
@@ -150,6 +155,79 @@ void thread_exit_sync()
 }
 
 
+
+/* I/O handler used by threads to consume the "wakeup char". */
+static void threads_wake_io_handler(int fd)
+{
+	int c;
+
+	shut_your_big_mouth_gcc(read(fd, &c, 1));
+	fd_done_recv(fd);
+	HA_ATOMIC_AND(&threads_to_wakeup, ~tid_bit);
+}
+
+
+/* Initializes wakeup pipes for threads. It creates a pipe by thread. All other
+ * threads can use it to wake it up. It returns 0 on success and -1 if an error
+ * occurred.
+ */
+int threads_wakeup_init(unsigned int nbthread)
+{
+	int i, rfd;
+
+	threads_wakeup_pipes = calloc(nbthread, sizeof(*threads_wakeup_pipes));
+	if (!threads_wakeup_pipes)
+		return -1;
+
+	for (i = 0; i < nbthread; i++) {
+		if (pipe(threads_wakeup_pipes[i]) < 0) {
+			free(threads_wakeup_pipes);
+			return -1;
+		}
+		rfd = threads_wakeup_pipes[i][0];
+		fcntl(rfd, F_SETFL, O_NONBLOCK);
+
+		fdtab[rfd].owner = threads_wake_io_handler;
+		fdtab[rfd].iocb = threads_wake_io_handler;
+		fd_insert(rfd, 1UL << i);
+	}
+	return 0;
+}
+
+/* Enables the wakeup pipes. */
+void threads_wakeup_enable(unsigned int nbthread)
+{
+	int i;
+
+	for (i = 0; i < nbthread; i++)
+		fd_want_recv(threads_wakeup_pipes[i][0]);
+}
+
+/* Wakes up some threads using <mask>. For a thread, we try to wake it up only
+ * if it was not already done. */
+void threads_wakeup(unsigned long mask)
+{
+	unsigned long bit, old, new;
+	int i;
+
+	for (bit = 0; (i = my_ffsl(mask)) > 0; mask &= ~bit) {
+		i--;
+		bit = (1UL << i);
+		if (i == tid)
+			continue;
+
+		old = threads_to_wakeup;
+		do {
+			new = (old | bit);
+			if (old & bit)
+				goto next;
+		} while (!HA_ATOMIC_CAS(&threads_to_wakeup, &old, new));
+		shut_your_big_mouth_gcc(write(threads_wakeup_pipes[i][1], "W", 1));
+	  next:
+		continue;
+	}
+}
+
 __attribute__((constructor))
 static void __hathreads_init(void)
 {
@@ -160,4 +238,10 @@ static void __hathreads_init(void)
 	hap_register_build_opts("Built with multi-threading support.", 0);
 }
 
+
+__attribute__((destructor))
+static void __hathreads_deinit(void)
+{
+	free(threads_wakeup_pipes);
+}
 #endif
-- 
2.14.3

From 50c30e71619788c67379ce8e8426fb027ee7b6f9 Mon Sep 17 00:00:00 2001
From: Christopher Faulet <[email protected]>
Date: Thu, 29 Mar 2018 09:46:27 +0200
Subject: [PATCH 2/3] WIP: threads: Initialize the threads wakeup mechanism

---
 src/haproxy.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/haproxy.c b/src/haproxy.c
index 4628d829..004b37fe 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2458,6 +2458,7 @@ static void *run_thread_poll_loop(void *data)
 
 	protocol_enable_all();
 	THREAD_SYNC_ENABLE();
+	THREADS_WAKEUP_ENABLE(global.nbthread);
 	run_poll_loop();
 
 	list_for_each_entry(ptdf, &per_thread_deinit_list, list)
@@ -3008,6 +3009,7 @@ int main(int argc, char **argv)
 		int          i;
 
 		THREAD_SYNC_INIT((1UL << global.nbthread) - 1);
+		THREADS_WAKEUP_INIT(global.nbthread);
 
 		/* Init tids array */
 		for (i = 0; i < global.nbthread; i++)
-- 
2.14.3

From c9a97c190ba798cad82aee9abaa8c4a0d2037b34 Mon Sep 17 00:00:00 2001
From: Christopher Faulet <[email protected]>
Date: Thu, 29 Mar 2018 09:46:39 +0200
Subject: [PATCH 3/3] WIP: threads/queue: Wakeup threads selectively instead of
 using the sync point

---
 src/queue.c | 44 ++++++++++++++++++++------------------------
 1 file changed, 20 insertions(+), 24 deletions(-)

diff --git a/src/queue.c b/src/queue.c
index 7cad813b..bbb07128 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -82,7 +82,7 @@ static void pendconn_unlink(struct pendconn *p)
 
 /* Process the next pending connection from either a server or a proxy, and
  * returns a strictly positive value on success (see below). If no pending
- * connection is found, 0 is returned.  Note that neither <srv> nor <px> may be
+ * connection is found, NULL is returned.  Note that neither <srv> nor <px> may be
  * NULL.  Priority is given to the oldest request in the queue if both <srv> and
  * <px> have pending requests. This ensures that no request will be left
  * unserved.  The <px> queue is not considered if the server (or a tracked
@@ -95,14 +95,12 @@ static void pendconn_unlink(struct pendconn *p)
  *
  * This function must only be called if the server queue _AND_ the proxy queue
  * are locked. Today it is only called by process_srv_queue. When a pending
- * connection is dequeued, this function returns 1 if the pending connection can
- * be handled by the current thread, else it returns 2.
+ * connection is dequeued, this function returns the woken up stream.
  */
-static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
+static struct stream *pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
 	struct pendconn *p = NULL;
 	struct server   *rsrv;
-	int remote;
 
 	rsrv = srv->track;
 	if (!rsrv)
@@ -139,7 +137,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 	}
 
 	if (!p)
-		return 0;
+		return NULL;
 
   pendconn_found:
 	pendconn_unlink(p);
@@ -151,13 +149,10 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 	if (px->lbprm.server_take_conn)
 		px->lbprm.server_take_conn(srv);
 	__stream_add_srv_conn(p->strm, srv);
-
-	remote = !(p->strm->task->thread_mask & tid_bit);
 	task_wakeup(p->strm->task, TASK_WOKEN_RES);
 	HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 
-	/* Returns 1 if the current thread can process the stream, otherwise returns 2. */
-	return remote ? 2 : 1;
+	return p->strm;
 }
 
 /* Manages a server's connection queue. This function will try to dequeue as
@@ -166,22 +161,23 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 void process_srv_queue(struct server *s)
 {
 	struct proxy  *p = s->proxy;
-	int maxconn, remote = 0;
+	unsigned long tmask = 0;
+	int maxconn;
 
 	HA_SPIN_LOCK(PROXY_LOCK,  &p->lock);
 	HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
 	maxconn = srv_dynamic_maxconn(s);
 	while (s->served < maxconn) {
-		int ret = pendconn_process_next_strm(s, p);
-		if (!ret)
+		struct stream *strm = pendconn_process_next_strm(s, p);
+		if (!strm)
 			break;
-		remote |= (ret == 2);
+		tmask |= strm->task->thread_mask;
 	}
 	HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
 	HA_SPIN_UNLOCK(PROXY_LOCK,  &p->lock);
 
-	if (remote)
-		thread_want_sync();
+	if (tmask != tid_bit)
+		THREADS_WAKEUP(tmask);
 }
 
 /* Adds the stream <strm> to the pending connection list of server <strm>->srv
@@ -242,8 +238,8 @@ struct pendconn *pendconn_add(struct stream *strm)
 int pendconn_redistribute(struct server *s)
 {
 	struct pendconn *p, *pback;
+	unsigned long tmask = 0;
 	int xferred = 0;
-	int remote = 0;
 
 	/* The REDISP option was specified. We will ignore cookie and force to
 	 * balance or use the dispatcher. */
@@ -262,14 +258,14 @@ int pendconn_redistribute(struct server *s)
 		pendconn_unlink(p);
 		p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
 
-		remote |= !(p->strm->task->thread_mask & tid_bit);
+		tmask |= p->strm->task->thread_mask;
 		task_wakeup(p->strm->task, TASK_WOKEN_RES);
 		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 	}
 	HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
 
-	if (remote)
-		thread_want_sync();
+	if (tmask != tid_bit)
+		THREADS_WAKEUP(tmask);
 
 	return xferred;
 }
@@ -282,8 +278,8 @@ int pendconn_redistribute(struct server *s)
 int pendconn_grab_from_px(struct server *s)
 {
 	struct pendconn *p, *pback;
+	unsigned long tmask = 0;
 	int maxconn, xferred = 0;
-	int remote = 0;
 
 	if (!srv_currently_usable(s))
 		return 0;
@@ -300,15 +296,15 @@ int pendconn_grab_from_px(struct server *s)
 		pendconn_unlink(p);
 		p->srv = s;
 
-		remote |= !(p->strm->task->thread_mask & tid_bit);
+		tmask |= p->strm->task->thread_mask;
 		task_wakeup(p->strm->task, TASK_WOKEN_RES);
 		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 		xferred++;
 	}
 	HA_SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
 
-	if (remote)
-		thread_want_sync();
+	if (tmask != tid_bit)
+		THREADS_WAKEUP(tmask);
 
 	return xferred;
 }
-- 
2.14.3
