This adds the set-priority-class and set-priority-offset actions to
http-request and tcp-request content.
The priority values are used when connections are queued to determine
which connections should be served first. The lowest priority class is
served first. When multiple requests from the same class are found, the
earliest (according to queue_time + offset) is served first.
---
 doc/configuration.txt  |  38 ++++++
 doc/lua-api/index.rst  |  18 +++
 include/types/proxy.h  |   3 +-
 include/types/queue.h  |   2 +-
 include/types/server.h |   3 +-
 include/types/stream.h |   7 +-
 src/cfgparse.c         |  15 +++
 src/hlua.c             |  74 ++++++++---
 src/log.c              |   4 +-
 src/proto_http.c       |   4 +-
 src/proxy.c            |   2 +-
 src/queue.c            | 345 ++++++++++++++++++++++++++++++++++++++++++-------
 src/server.c           |   2 +-
 src/stream.c           |  10 +-
 14 files changed, 453 insertions(+), 74 deletions(-)


diff --git a/doc/configuration.txt b/doc/configuration.txt
index cbea3309d..7ec010811 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -3911,6 +3911,7 @@ http-request { allow | auth [realm <realm>] | redirect <rule> | reject |
               replace-value <name> <match-regex> <replace-fmt> |
               set-method <fmt> | set-path <fmt> | set-query <fmt> |
               set-uri <fmt> | set-tos <tos> | set-mark <mark> |
+              set-priority-class <expr> | set-priority-offset <expr> |
               add-acl(<file name>) <key fmt> |
               del-acl(<file name>) <key fmt> |
               del-map(<file name>) <key fmt> |
@@ -4107,6 +4108,24 @@ http-request { allow | auth [realm <realm>] | redirect <rule> | reject |
       downloads). This works on Linux kernels 2.6.32 and above and requires
       admin privileges.
 
+    - "set-priority-class" is used to set the queue priority class of the
+      current request. The value must be a sample expression which converts to
+      an integer in the range -2047..2047. Results outside this range will be
+      truncated. The priority class determines the order in which queued
+      requests are processed. Lower values have higher priority.
+
+    - "set-priority-offset" is used to set the queue priority timestamp offset
+      of the current request. The value must be a sample expression which
+      converts to an integer in the range -524287..524287. Results outside this
+      range will be truncated. When a request is queued, it is ordered first by
+      the priority class, then by the current timestamp adjusted by the given
+      offset in milliseconds. Lower values have higher priority.
+      Note that the resulting timestamp is only tracked with enough precision
+      for 524,287ms (8m44s287ms). If the request stays queued long enough for
+      the adjusted timestamp to exceed this value, it will be misidentified as
+      highest priority. It is therefore important to set "timeout queue" to a
+      value which, when combined with the offset, does not exceed this limit.
+
     - "add-acl" is used to add a new entry into an ACL. The ACL must be loaded
       from a file (even a dummy empty file). The file name of the ACL to be
       updated is passed between parentheses. It takes one argument: <key fmt>,
@@ -9446,6 +9465,7 @@ tcp-request content <action> [{if | unless} <condition>]
     - accept : the request is accepted
     - reject : the request is rejected and the connection is closed
     - capture : the specified sample expression is captured
+    - set-priority-class <expr> | set-priority-offset <expr>
     - { track-sc0 | track-sc1 | track-sc2 } <key> [table <table>]
     - sc-inc-gpc0(<sc-id>)
     - sc-inc-gpc1(<sc-id>)
@@ -9507,6 +9527,24 @@ tcp-request content <action> [{if | unless} <condition>]
   The "unset-var" is used to unset a variable. See above for details about
   <var-name>.
 
+  The "set-priority-class" is used to set the queue priority class of the
+  current request. The value must be a sample expression which converts to an
+  integer in the range -2047..2047. Results outside this range will be
+  truncated. The priority class determines the order in which queued requests
+  are processed. Lower values have higher priority.
+
+  The "set-priority-offset" is used to set the queue priority timestamp offset
+  of the current request. The value must be a sample expression which converts
+  to an integer in the range -524287..524287. Results outside this range will be
+  truncated. When a request is queued, it is ordered first by the priority
+  class, then by the current timestamp adjusted by the given offset in
+  milliseconds. Lower values have higher priority.
+  Note that the resulting timestamp is only tracked with enough precision for
+  524,287ms (8m44s287ms). If the request stays queued long enough for the
+  adjusted timestamp to exceed this value, it will be misidentified as highest
+  priority. It is therefore important to set "timeout queue" to a value which,
+  when combined with the offset, does not exceed this limit.
+
   The "send-spoe-group" is used to trigger sending of a group of SPOE
   messages. To do so, the SPOE engine used to send messages must be defined, as
   well as the SPOE group to send. Of course, the SPOE engine must refer to an
diff --git a/doc/lua-api/index.rst b/doc/lua-api/index.rst
index ee9dab55c..0c79766eb 100644
--- a/doc/lua-api/index.rst
+++ b/doc/lua-api/index.rst
@@ -1769,6 +1769,24 @@ TXN class
   :param class_txn txn: The class txn object containing the data.
   :param integer mark: The mark value.
 
+.. js:function:: TXN.set_priority_class(txn, prio)
+
+  This function adjusts the priority class of the transaction. The value should
+  be within the range -2047..2047. Values outside this range will be
+  truncated.
+
+  See the HAProxy configuration.txt file keyword "http-request" action
+  "set-priority-class" for details.
+
+.. js:function:: TXN.set_priority_offset(txn, prio)
+
+  This function adjusts the priority offset of the transaction. The value
+  should be within the range -524287..524287. Values outside this range will be
+  truncated.
+
+  See the HAProxy configuration.txt file keyword "http-request" action
+  "set-priority-offset" for details.
+
 .. _socket_class:
 
 Socket class
diff --git a/include/types/proxy.h b/include/types/proxy.h
index 16c13a1c1..157dafdd7 100644
--- a/include/types/proxy.h
+++ b/include/types/proxy.h
@@ -322,9 +322,10 @@ struct proxy {
 		int serverfin;                  /* timeout to apply to server half-closed connections */
 	} timeout;
 	char *id, *desc;			/* proxy id (name) and description */
-	struct list pendconns;			/* pending connections with no server assigned yet */
+	struct eb_root pendconns;		/* pending connections with no server assigned yet */
 	int nbpend;				/* number of pending connections with no server assigned yet */
 	int totpend;				/* total number of pending connections on this instance (for stats) */
+	unsigned int cntdepend;			/* number of pending connections which have been de-queued */
 	unsigned int feconn, beconn;		/* # of active frontend and backends streams */
 	struct freq_ctr fe_req_per_sec;		/* HTTP requests per second on the frontend */
 	struct freq_ctr fe_conn_per_sec;	/* received connections per second on the frontend */
diff --git a/include/types/queue.h b/include/types/queue.h
index 42dbbd047..03377da69 100644
--- a/include/types/queue.h
+++ b/include/types/queue.h
@@ -35,7 +35,7 @@ struct pendconn {
 	struct stream *strm;
 	struct proxy  *px;
 	struct server *srv;        /* the server we are waiting for, may be NULL */
-	struct list    list;       /* next pendconn */
+	struct eb32_node node;
 	__decl_hathreads(HA_SPINLOCK_T lock);
 };
 
diff --git a/include/types/server.h b/include/types/server.h
index 0cd20c096..c3cd6577d 100644
--- a/include/types/server.h
+++ b/include/types/server.h
@@ -209,11 +209,12 @@ struct server {
 	int cur_sess;				/* number of currently active sessions (including syn_sent) */
 	unsigned maxconn, minconn;		/* max # of active sessions (0 = unlimited), min# for dynamic limit. */
 	int nbpend;				/* number of pending connections */
+	unsigned int cntdepend;			/* count of pending connections which have been de-queued */
 	int maxqueue;				/* maximum number of pending connections allowed */
 	struct freq_ctr sess_per_sec;		/* sessions per second on this server */
 	struct be_counters counters;		/* statistics counters */
 
-	struct list pendconns;			/* pending connections */
+	struct eb_root pendconns;		/* pending connections */
 	struct list actconns;			/* active connections */
 	struct list *priv_conns;		/* private idle connections attached to stream interfaces */
 	struct list *idle_conns;		/* sharable idle connections attached or not to a stream interface */
diff --git a/include/types/stream.h b/include/types/stream.h
index 0dbc79f44..584ddab43 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -105,8 +105,8 @@ struct strm_logs {
 	long  t_connect;                /* delay before the connect() to the server succeeds, -1 if never occurs */
 	long  t_data;                   /* delay before the first data byte from the server ... */
 	unsigned long t_close;          /* total stream duration */
-	unsigned long srv_queue_size;   /* number of streams waiting for a connect slot on this server at accept() time (in direct assignment) */
-	unsigned long prx_queue_size;   /* overall number of streams waiting for a connect slot on this instance at accept() time */
+	unsigned long srv_queue_pos;    /* number of streams de-queued while waiting for a connection slot on this server */
+	unsigned long prx_queue_pos;    /* number of streams de-queued while waiting for a connection slot on this instance */
 	long long bytes_in;             /* number of bytes transferred from the client to the server */
 	long long bytes_out;            /* number of bytes transferred from the server to the client */
 };
@@ -125,6 +125,9 @@ struct stream {
 
 	struct server *srv_conn;        /* stream already has a slot on a server and is not in queue */
 	struct pendconn *pend_pos;      /* if not NULL, points to the pending position in the pending queue */
+	int16_t priority_class;         /* priority class of the stream for the pending queue */
+	int32_t priority_offset;        /* priority offset of the stream for the pending queue */
+	unsigned int cntdepend;         /* value of proxy/server cntdepend at time of enqueue */
 
 	struct http_txn *txn;           /* current HTTP transaction being processed. Should become a list. */
 
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 024502c4b..4e66096db 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -8114,6 +8114,21 @@ out_uri_auth_compat:
 			}
 		}
 
+		if ((curproxy->mode == PR_MODE_TCP || curproxy->mode == PR_MODE_HTTP) &&
+		    (curproxy->cap & PR_CAP_BE) && (curproxy->srv) &&
+		     (!curproxy->timeout.queue || curproxy->timeout.queue > (((TIMER_LOOK_BACK >> 12) & 0xfffff) / 2))) {
+			// Note that this warning isn't comprehensive.
+			// if the user specifies set-priority-offset > 'timeout queue`, wrapping
+			// may occur and get de-queued out of order. But logging this every
+			// single time might be too noisy.
+			ha_warning("config : excessive timeout queue for backend '%s'.\n"
+				   "   | The 'timeout queue' setting is either missing, or exceeds the maximum\n"
+				   "   | priority offset. If a request sits in the queue for longer than the maximum\n"
+				   "   | priority offset, it may get de-queued out of order.\n",
+				   curproxy->id);
+			err_code |= ERR_WARN;
+		}
+
 		if ((curproxy->options2 & PR_O2_CHK_ANY) == PR_O2_SSL3_CHK) {
 			curproxy->check_len = sizeof(sslv3_client_hello_pkt) - 1;
 			curproxy->check_req = malloc(curproxy->check_len);
diff --git a/src/hlua.c b/src/hlua.c
index d07e8d675..6e727648d 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -5323,6 +5323,44 @@ __LJMP static int hlua_txn_set_mark(lua_State *L)
 	return 0;
 }
 
+__LJMP static int hlua_txn_set_priority_class(lua_State *L)
+{
+	/* Lua binding TXN.set_priority_class(txn, prio): clamps <prio> to
+	 * -0x7ff..0x7ff and stores it as the stream's priority class. Returns 0. */
+	struct hlua_txn *htxn;
+	lua_Integer class; /* full Lua width, so out-of-range values clamp instead of wrapping */
+
+	MAY_LJMP(check_args(L, 2, "set_priority_class"));
+	htxn = MAY_LJMP(hlua_checktxn(L, 1));
+	class = MAY_LJMP(luaL_checkinteger(L, 2));
+	if (class < -0x7ff)
+		class = -0x7ff;
+	else if (class > 0x7ff)
+		class = 0x7ff;
+
+	htxn->s->priority_class = class;
+	return 0;
+}
+
+__LJMP static int hlua_txn_set_priority_offset(lua_State *L)
+{
+	/* Lua binding TXN.set_priority_offset(txn, prio): clamps <prio> to
+	 * -0x7ffff..0x7ffff and stores it as the stream's priority offset. Returns 0. */
+	struct hlua_txn *htxn;
+	lua_Integer offset; /* full Lua width, so out-of-range values clamp instead of wrapping */
+
+	MAY_LJMP(check_args(L, 2, "set_priority_offset"));
+	htxn = MAY_LJMP(hlua_checktxn(L, 1));
+	offset = MAY_LJMP(luaL_checkinteger(L, 2));
+	if (offset < -0x7ffff)
+		offset = -0x7ffff;
+	else if (offset > 0x7ffff)
+		offset = 0x7ffff;
+
+	htxn->s->priority_offset = offset;
+	return 0;
+}
+
 /* This function is an Lua binding that send pending data
  * to the client, and close the stream interface.
  */
@@ -7766,21 +7804,23 @@ void hlua_init(void)
 	lua_newtable(gL.T);
 
 	/* Register Lua functions. */
-	hlua_class_function(gL.T, "set_priv",    hlua_set_priv);
-	hlua_class_function(gL.T, "get_priv",    hlua_get_priv);
-	hlua_class_function(gL.T, "set_var",     hlua_set_var);
-	hlua_class_function(gL.T, "unset_var",   hlua_unset_var);
-	hlua_class_function(gL.T, "get_var",     hlua_get_var);
-	hlua_class_function(gL.T, "done",        hlua_txn_done);
-	hlua_class_function(gL.T, "set_loglevel",hlua_txn_set_loglevel);
-	hlua_class_function(gL.T, "set_tos",     hlua_txn_set_tos);
-	hlua_class_function(gL.T, "set_mark",    hlua_txn_set_mark);
-	hlua_class_function(gL.T, "deflog",      hlua_txn_deflog);
-	hlua_class_function(gL.T, "log",         hlua_txn_log);
-	hlua_class_function(gL.T, "Debug",       hlua_txn_log_debug);
-	hlua_class_function(gL.T, "Info",        hlua_txn_log_info);
-	hlua_class_function(gL.T, "Warning",     hlua_txn_log_warning);
-	hlua_class_function(gL.T, "Alert",       hlua_txn_log_alert);
+	hlua_class_function(gL.T, "set_priv",            hlua_set_priv);
+	hlua_class_function(gL.T, "get_priv",            hlua_get_priv);
+	hlua_class_function(gL.T, "set_var",             hlua_set_var);
+	hlua_class_function(gL.T, "unset_var",           hlua_unset_var);
+	hlua_class_function(gL.T, "get_var",             hlua_get_var);
+	hlua_class_function(gL.T, "done",                hlua_txn_done);
+	hlua_class_function(gL.T, "set_loglevel",        hlua_txn_set_loglevel);
+	hlua_class_function(gL.T, "set_tos",             hlua_txn_set_tos);
+	hlua_class_function(gL.T, "set_mark",            hlua_txn_set_mark);
+	hlua_class_function(gL.T, "set_priority_class",  hlua_txn_set_priority_class);
+	hlua_class_function(gL.T, "set_priority_offset", hlua_txn_set_priority_offset);
+	hlua_class_function(gL.T, "deflog",              hlua_txn_deflog);
+	hlua_class_function(gL.T, "log",                 hlua_txn_log);
+	hlua_class_function(gL.T, "Debug",               hlua_txn_log_debug);
+	hlua_class_function(gL.T, "Info",                hlua_txn_log_info);
+	hlua_class_function(gL.T, "Warning",             hlua_txn_log_warning);
+	hlua_class_function(gL.T, "Alert",               hlua_txn_log_alert);
 
 	lua_rawset(gL.T, -3);
 
@@ -7842,7 +7882,7 @@ void hlua_init(void)
 	socket_tcp.proxy = &socket_proxy;
 	socket_tcp.obj_type = OBJ_TYPE_SERVER;
 	LIST_INIT(&socket_tcp.actconns);
-	LIST_INIT(&socket_tcp.pendconns);
+	socket_tcp.pendconns = EB_ROOT;
 	socket_tcp.priv_conns = NULL;
 	socket_tcp.idle_conns = NULL;
 	socket_tcp.safe_conns = NULL;
@@ -7888,7 +7928,7 @@ void hlua_init(void)
 	socket_ssl.proxy = &socket_proxy;
 	socket_ssl.obj_type = OBJ_TYPE_SERVER;
 	LIST_INIT(&socket_ssl.actconns);
-	LIST_INIT(&socket_ssl.pendconns);
+	socket_ssl.pendconns = EB_ROOT;
 	socket_tcp.priv_conns = NULL;
 	socket_tcp.idle_conns = NULL;
 	socket_tcp.safe_conns = NULL;
diff --git a/src/log.c b/src/log.c
index b2d4367f4..6ddfbd6c3 100644
--- a/src/log.c
+++ b/src/log.c
@@ -2131,7 +2131,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
 				break;
 
 			case LOG_FMT_SRVQUEUE: // %sq
-				ret = ltoa_o(s->logs.srv_queue_size, tmplog, dst + maxsize - tmplog);
+				ret = ltoa_o(s->logs.srv_queue_pos, tmplog, dst + maxsize - tmplog);
 				if (ret == NULL)
 					goto out;
 				tmplog = ret;
@@ -2139,7 +2139,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list
 				break;
 
 			case LOG_FMT_BCKQUEUE:  // %bq
-				ret = ltoa_o(s->logs.prx_queue_size, tmplog, dst + maxsize - tmplog);
+				ret = ltoa_o(s->logs.prx_queue_pos, tmplog, dst + maxsize - tmplog);
 				if (ret == NULL)
 					goto out;
 				tmplog = ret;
diff --git a/src/proto_http.c b/src/proto_http.c
index 345e889ba..5fbc27228 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -4344,8 +4344,8 @@ void http_end_txn_clean_session(struct stream *s)
 	s->logs.t_connect = -1;
 	s->logs.t_data = -1;
 	s->logs.t_close = 0;
-	s->logs.prx_queue_size = 0;  /* we get the number of pending conns before us */
-	s->logs.srv_queue_size = 0; /* we will get this number soon */
+	s->logs.prx_queue_pos = 0;  /* we get the number of pending conns before us */
+	s->logs.srv_queue_pos = 0; /* we will get this number soon */
 
 	s->logs.bytes_in = s->req.total = s->req.buf->i;
 	s->logs.bytes_out = s->res.total = s->res.buf->i;
diff --git a/src/proxy.c b/src/proxy.c
index 31253f14d..63a076855 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -726,7 +726,7 @@ void init_new_proxy(struct proxy *p)
 {
 	memset(p, 0, sizeof(struct proxy));
 	p->obj_type = OBJ_TYPE_PROXY;
-	LIST_INIT(&p->pendconns);
+	p->pendconns = EB_ROOT;
 	LIST_INIT(&p->acl);
 	LIST_INIT(&p->http_req_rules);
 	LIST_INIT(&p->http_res_rules);
diff --git a/src/queue.c b/src/queue.c
index 1c730c75c..cf445f97d 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -14,12 +14,33 @@
 #include <common/memory.h>
 #include <common/time.h>
 #include <common/hathreads.h>
+#include <eb32tree.h>
 
+#include <proto/proto_http.h>
 #include <proto/queue.h>
+#include <proto/sample.h>
 #include <proto/server.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
+#include <proto/tcp_rules.h>
+
+
+#define NOW_OFFSET_BOUNDARY() ((now_ms - (TIMER_LOOK_BACK >> 12)) & 0xfffff) /* now_ms minus (TIMER_LOOK_BACK >> 12), kept to 20 bits */
+#define KEY_CLASS(key) ((key) & 0xfff00000) /* upper 12 bits of a queue key: the priority class part */
+#define KEY_OFFSET(key) ((key) & 0xfffff) /* lower 20 bits of a queue key: the timestamp/offset part */
+#define KEY_CLASS_BOUNDARY(key) (KEY_CLASS(key) | NOW_OFFSET_BOUNDARY()) /* class of <key> combined with the current boundary */
+
+static u32 key_incr(u32 key) { /* returns the key from which a queue scan resumes after <key> */
+	u32 key_next = key + 1;
+	/* NOTE(review): in the first branch the low 20 bits of key_next are already 0, so that assignment changes nothing; confirm KEY_CLASS(key) was not intended. */
+	if (KEY_CLASS(key_next) != KEY_CLASS(key)) /* the +1 carried into the class bits */
+		key_next = KEY_CLASS(key_next);
+	else if (key_next == KEY_CLASS_BOUNDARY(key)) /* landed exactly on the current boundary of this class */
+		key_next += 0x100000; /* same 20-bit offset, next class value */
+
+	return key_next;
+}
 
 
 struct pool_head *pool_head_pendconn;
@@ -71,13 +92,65 @@ unsigned int srv_dynamic_maxconn(const struct server *s)
  */
 static void pendconn_unlink(struct pendconn *p)
 {
-	if (p->srv)
+	if (p->srv) {
+		p->strm->logs.srv_queue_pos += p->srv->cntdepend - p->strm->cntdepend;
 		p->srv->nbpend--;
-	else
+	} else {
+		p->strm->logs.prx_queue_pos += p->px->cntdepend - p->strm->cntdepend;
 		p->px->nbpend--;
+	}
 	HA_ATOMIC_SUB(&p->px->totpend, 1);
-	LIST_DEL(&p->list);
-	LIST_INIT(&p->list);
+	eb32_delete(&p->node);
+}
+
+/* Retrieve the next pending connection from the given pendconns ebtree with
+ * key >= min. Returns the matching pendconn, or NULL when the lookups below
+ * find no candidate.
+ * See pendconn_add for an explanation of the key & queue behavior.
+ *
+ * This function handles all the cases where due to the timestamp wrapping
+ * the first node in the tree is not the highest priority.
+ */
+static struct pendconn *pendconn_next(struct eb_root *pendconns, u32 min) {
+	struct eb32_node *node, *node2 = NULL; /* node2: a node found in another class than <min>, kept as fallback */
+	u32 max;
+
+	// min is inclusive
+	// max is exclusive
+	max = KEY_CLASS_BOUNDARY(min); /* class of <min> combined with the current offset boundary */
+
+	node = eb32_lookup_ge(pendconns, min);
+
+	if (node) {
+		if (node->key < max || (max <= min && KEY_CLASS(node->key) == KEY_CLASS(min)))
+			return eb32_entry(node, struct pendconn, node); /* below max, or same class while max <= min */
+		if (KEY_CLASS(node->key) != KEY_CLASS(min))
+			node2 = node; /* remember the other-class node */
+		if (max > min)
+			goto class_next; /* no node in [min, max): look at another class */
+	}
+
+	if (max <= min)
+		node = eb32_lookup_ge(pendconns, KEY_CLASS(min)); /* retry from the first key of the class */
+	if (!node)
+		return NULL;
+	if (node->key < max && node->key < min)
+		return eb32_entry(node, struct pendconn, node);
+
+class_next:
+	if (node2) {
+		min = KEY_CLASS_BOUNDARY(node2->key); /* boundary key of node2's class */
+		if (node2->key >= min)
+			return eb32_entry(node2, struct pendconn, node);
+	} else
+		min = KEY_CLASS_BOUNDARY(min) + 0x100000; /* boundary key of the following class value */
+	node = eb32_lookup_ge(pendconns, min);
+	if (node && KEY_CLASS(node->key) == KEY_CLASS(min))
+		return eb32_entry(node, struct pendconn, node);
+	if (node2)
+		return eb32_entry(node2, struct pendconn, node); /* fall back to the node kept earlier */
+
+	return NULL;
 }
 
 /* Process the next pending connection from either a server or a proxy, and
@@ -100,8 +173,9 @@ static void pendconn_unlink(struct pendconn *p)
  */
 static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
-	struct pendconn *p = NULL;
+	struct pendconn *p = NULL, *pp = NULL;
 	struct server   *rsrv;
+	u32 pkey, ppkey;
 	int remote;
 
 	rsrv = srv->track;
@@ -109,43 +183,62 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 		rsrv = srv;
 
 	if (srv->nbpend) {
-		list_for_each_entry(p, &srv->pendconns, list) {
+		for (p = pendconn_next(&srv->pendconns, NOW_OFFSET_BOUNDARY());
+		     p;
+		     p = pendconn_next(&srv->pendconns, key_incr(p->node.key)))
 			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
-				goto ps_found;
-		}
-		p = NULL;
+				break;
 	}
-
-  ps_found:
-	if (srv_currently_usable(rsrv) && px->nbpend) {
-		struct pendconn *pp;
-
-		list_for_each_entry(pp, &px->pendconns, list) {
-			/* If the server pendconn is older than the proxy one,
-			 * we process the server one. */
-			if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request))
-				goto pendconn_found;
-
-			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) {
-				/* Let's switch from the server pendconn to the
-				 * proxy pendconn. Don't forget to unlock the
-				 * server pendconn, if any. */
-				if (p)
-					HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
-				p = pp;
-				goto pendconn_found;
-			}
-		}
+	if (srv_currently_usable(rsrv) && px->nbpend) { /* only take from the proxy queue when the server is usable, as before this change */
+		for (pp = pendconn_next(&px->pendconns, NOW_OFFSET_BOUNDARY());
+		     pp;
+		     pp = pendconn_next(&px->pendconns, key_incr(pp->node.key)))
+			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock))
+				break;
 	}
 
-	if (!p)
+	if (!p && !pp)
 		return 0;
 
+	if (p && !pp)
+		goto pendconn_found;
+	if (pp && !p) {
+		p = pp;
+		goto pendconn_found;
+	}
+	if (KEY_CLASS(p->node.key) < KEY_CLASS(pp->node.key)) {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock);
+		goto pendconn_found;
+	}
+	if (KEY_CLASS(pp->node.key) < KEY_CLASS(p->node.key)) {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+		p = pp;
+		goto pendconn_found;
+	}
+
+	pkey = KEY_OFFSET(p->node.key);
+	ppkey = KEY_OFFSET(pp->node.key);
+	if (pkey < NOW_OFFSET_BOUNDARY())
+		pkey += 0x100000;
+	if (ppkey < NOW_OFFSET_BOUNDARY())
+		ppkey += 0x100000;
+	if (pkey <= ppkey) {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock);
+	} else {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+		p = pp;
+	}
+
   pendconn_found:
 	pendconn_unlink(p);
 	p->strm_flags |= SF_ASSIGNED;
 	p->srv = srv;
 
+	if (p != pp)
+		srv->cntdepend++;
+	else
+		px->cntdepend++;
+
 	HA_ATOMIC_ADD(&srv->served, 1);
 	HA_ATOMIC_ADD(&srv->proxy->served, 1);
 	if (px->lbprm.server_take_conn)
@@ -184,12 +277,20 @@ void process_srv_queue(struct server *s)
 		THREAD_WANT_SYNC();
 }
 
-/* Adds the stream <strm> to the pending connection list of server <strm>->srv
+/* Adds the stream <strm> to the pending connection queue of server <strm>->srv
  * or to the one of <strm>->proxy if srv is NULL. All counters and back pointers
  * are updated accordingly. Returns NULL if no memory is available, otherwise the
  * pendconn itself. If the stream was already marked as served, its flag is
  * cleared. It is illegal to call this function with a non-NULL strm->srv_conn.
  *
+ * The queue is sorted by the composition of the priority_class, and the current
+ * timestamp offset by strm->priority_offset. The timestamp is in milliseconds
+ * and truncated to 20 bits, so will wrap every 17m28s575ms.
+ * The offset can be positive or negative, and an offset of 0 puts it in the
+ * middle of this range (~ 8 min). Note that this also means if the adjusted
+ * timestamp wraps around, the request will be misinterpreted as being of
+ * the highest priority for that priority class.
+ *
  * This function must be called by the stream itself, so in the context of
  * process_stream.
  */
@@ -206,6 +307,8 @@ struct pendconn *pendconn_add(struct stream *strm)
 	srv = objt_server(strm->target);
 	px  = strm->be;
 
+	p->node.key = ((u32)(strm->priority_class + 0x7ff) << 20) |
+		      ((u32)(now_ms + strm->priority_offset) & 0xfffff);
 	p->srv        = NULL;
 	p->px         = px;
 	p->strm       = strm;
@@ -216,19 +319,19 @@ struct pendconn *pendconn_add(struct stream *strm)
 		p->srv = srv;
 		HA_SPIN_LOCK(SERVER_LOCK, &srv->lock);
 		srv->nbpend++;
-		strm->logs.srv_queue_size += srv->nbpend;
+		strm->cntdepend = srv->cntdepend;
 		if (srv->nbpend > srv->counters.nbpend_max)
 			srv->counters.nbpend_max = srv->nbpend;
-		LIST_ADDQ(&srv->pendconns, &p->list);
+		eb32_insert(&srv->pendconns, &p->node);
 		HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
 	}
 	else {
 		HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
 		px->nbpend++;
-		strm->logs.prx_queue_size += px->nbpend;
+		strm->cntdepend = px->cntdepend;
 		if (px->nbpend > px->be_counters.nbpend_max)
 			px->be_counters.nbpend_max = px->nbpend;
-		LIST_ADDQ(&px->pendconns, &p->list);
+		eb32_insert(&px->pendconns, &p->node);
 		HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
 	}
 	HA_ATOMIC_ADD(&px->totpend, 1);
@@ -241,7 +344,8 @@ struct pendconn *pendconn_add(struct stream *strm)
  */
 int pendconn_redistribute(struct server *s)
 {
-	struct pendconn *p, *pback;
+	struct pendconn *p;
+	struct eb32_node *node;
 	int xferred = 0;
 	int remote = 0;
 
@@ -251,7 +355,10 @@ int pendconn_redistribute(struct server *s)
 		return 0;
 
 	HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-	list_for_each_entry_safe(p, pback, &s->pendconns, list) {
+	for (node = eb32_first(&s->pendconns);
+	     node;
+	     node = eb32_lookup_ge(&s->pendconns, key_incr(node->key))) {
+		p = eb32_entry(node, struct pendconn, node); /* pass the node pointer itself, not the address of the local variable */
 		if (p->strm_flags & SF_FORCE_PRST)
 			continue;
 
@@ -280,7 +387,8 @@ int pendconn_redistribute(struct server *s)
  */
 int pendconn_grab_from_px(struct server *s)
 {
-	struct pendconn *p, *pback;
+	struct pendconn *p;
+	struct eb32_node *node;
 	int maxconn, xferred = 0;
 	int remote = 0;
 
@@ -289,7 +397,10 @@ int pendconn_grab_from_px(struct server *s)
 
 	HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
 	maxconn = srv_dynamic_maxconn(s);
-	list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) {
+	for (node = eb32_first(&s->proxy->pendconns);
+	     node;
+	     node = eb32_lookup_ge(&s->proxy->pendconns, key_incr(node->key))) {
+		p = eb32_entry(node, struct pendconn, node); /* pass the node pointer itself, not the address of the local variable */
 		if (s->maxconn && s->served + xferred >= maxconn)
 			break;
 
@@ -337,7 +448,7 @@ int pendconn_dequeue(struct stream *strm)
 
 	/* the pendconn is still linked to the server/proxy queue, so unlock it
 	 * and go away. */
-	if (!LIST_ISEMPTY(&p->list)) {
+	if (p->node.node.leaf_p) {
 		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 		return 1;
 	}
@@ -369,19 +480,21 @@ void pendconn_free(struct pendconn *p)
 	HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock);
 
 	/* The pendconn was already unlinked, just release it. */
-	if (LIST_ISEMPTY(&p->list))
+	if (!p->node.node.leaf_p)
 		goto release;
 
 	if (p->srv) {
 		HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
+		p->strm->logs.srv_queue_pos += p->srv->cntdepend - p->strm->cntdepend;
 		p->srv->nbpend--;
-		LIST_DEL(&p->list);
+		eb32_delete(&p->node);
 		HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
 	}
 	else {
 		HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock);
+		p->strm->logs.prx_queue_pos += p->px->cntdepend - p->strm->cntdepend;
 		p->px->nbpend--;
-		LIST_DEL(&p->list);
+		eb32_delete(&p->node);
 		HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock);
 	}
 	HA_ATOMIC_SUB(&p->px->totpend, 1);
@@ -392,6 +505,150 @@ void pendconn_free(struct pendconn *p)
 	pool_free(pool_head_pendconn, p);
 }
 
+static enum act_return action_set_priority_class(struct act_rule *rule, struct proxy *px,
+                                                 struct session *sess, struct stream *s, int flags)
+{
+	/* Action handler: evaluate the rule expression as a signed integer, clamp it
+	 * to -0x7ff..0x7ff and store it in s->priority_class. Always continues. */
+	struct sample *res = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL,
+	                                          rule->arg.expr, SMP_T_SINT);
+
+	if (res) {
+		if (res->data.u.sint > 0x7ff)
+			res->data.u.sint = 0x7ff;
+		else if (res->data.u.sint < -0x7ff)
+			res->data.u.sint = -0x7ff;
+		s->priority_class = res->data.u.sint;
+	}
+	return ACT_RET_CONT;
+}
+
+static enum act_return action_set_priority_offset(struct act_rule *rule, struct proxy *px,
+                                                  struct session *sess, struct stream *s, int flags)
+{
+	/* Action handler: evaluate the rule expression as a signed integer, clamp it
+	 * to -0x7ffff..0x7ffff and store it in s->priority_offset. Always continues. */
+	struct sample *res = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL,
+	                                          rule->arg.expr, SMP_T_SINT);
+
+	if (res) {
+		if (res->data.u.sint > 0x7ffff)
+			res->data.u.sint = 0x7ffff;
+		else if (res->data.u.sint < -0x7ffff)
+			res->data.u.sint = -0x7ffff;
+		s->priority_offset = res->data.u.sint;
+	}
+
+	return ACT_RET_CONT;
+}
+
+static enum act_parse_ret parse_set_priority_class(const char **args, int *arg, struct proxy *px,
+                                                   struct act_rule *rule, char **err)
+{
+	/* Parser for "set-priority-class": parse the sample expression, check its fetch against
+	 * SMP_VAL_{FE,BE}_HRQ_HDR per the proxy's capabilities, then bind the action handler. */
+	unsigned int allowed;
+
+	rule->arg.expr = sample_parse_expr((char **)args, arg, px->conf.args.file,
+	                                   px->conf.args.line, err, &px->conf.args);
+	if (rule->arg.expr == NULL)
+		return ACT_RET_PRS_ERR;
+
+	allowed  = (px->cap & PR_CAP_FE) ? SMP_VAL_FE_HRQ_HDR : 0;
+	allowed |= (px->cap & PR_CAP_BE) ? SMP_VAL_BE_HRQ_HDR : 0;
+
+	if ((rule->arg.expr->fetch->val & allowed) == 0) {
+		memprintf(err,
+			  "fetch method '%s' extracts information from '%s', none of which is available here",
+			  args[0], sample_src_names(rule->arg.expr->fetch->use));
+		free(rule->arg.expr);
+		return ACT_RET_PRS_ERR;
+	}
+
+	rule->action_ptr = action_set_priority_class;
+	rule->action     = ACT_CUSTOM;
+	return ACT_RET_PRS_OK;
+}
+
+static enum act_parse_ret parse_set_priority_offset(const char **args, int *arg, struct proxy *px,
+                                                    struct act_rule *rule, char **err)
+{
+	/* Parser for "set-priority-offset": parse the sample expression, check its fetch against
+	 * SMP_VAL_{FE,BE}_HRQ_HDR per the proxy's capabilities, then bind the action handler. */
+	unsigned int allowed;
+
+	rule->arg.expr = sample_parse_expr((char **)args, arg, px->conf.args.file,
+	                                   px->conf.args.line, err, &px->conf.args);
+	if (rule->arg.expr == NULL)
+		return ACT_RET_PRS_ERR;
+
+	allowed  = (px->cap & PR_CAP_FE) ? SMP_VAL_FE_HRQ_HDR : 0;
+	allowed |= (px->cap & PR_CAP_BE) ? SMP_VAL_BE_HRQ_HDR : 0;
+
+	if ((rule->arg.expr->fetch->val & allowed) == 0) {
+		memprintf(err,
+			  "fetch method '%s' extracts information from '%s', none of which is available here",
+			  args[0], sample_src_names(rule->arg.expr->fetch->use));
+		free(rule->arg.expr);
+		return ACT_RET_PRS_ERR;
+	}
+
+	rule->action_ptr = action_set_priority_offset;
+	rule->action     = ACT_CUSTOM;
+	return ACT_RET_PRS_OK;
+}
+
+static struct action_kw_list tcp_cont_kws = {ILH, { /* action keywords passed to tcp_req_cont_keywords_register() */
+	{ "set-priority-class", parse_set_priority_class },   /* keyword -> its config parser */
+	{ "set-priority-offset", parse_set_priority_offset },
+	{ /* END */ }
+}};
+
+static struct action_kw_list http_req_kws = {ILH, { /* action keywords passed to http_req_keywords_register() */
+	{ "set-priority-class", parse_set_priority_class },   /* keyword -> its config parser */
+	{ "set-priority-offset", parse_set_priority_offset },
+	{ /* END */ }
+}};
+
+static int
+smp_fetch_priority_class(const struct arg *args, struct sample *smp, const char *kw, void *private)
+{
+	/* Sample fetch: yields the stream's priority_class as a signed integer.
+	 * Returns 1 with the sample filled in, or 0 when smp->strm is not set. */
+	if (smp->strm == NULL)
+		return 0;
+	smp->data.u.sint = smp->strm->priority_class;
+	smp->data.type = SMP_T_SINT;
+	return 1;
+}
+
+static int
+smp_fetch_priority_offset(const struct arg *args, struct sample *smp, const char *kw, void *private)
+{
+	/* Sample fetch: yields the stream's priority_offset as a signed integer.
+	 * Returns 1 with the sample filled in, or 0 when smp->strm is not set. */
+	if (smp->strm == NULL)
+		return 0;
+	smp->data.u.sint = smp->strm->priority_offset;
+	smp->data.type = SMP_T_SINT;
+	return 1;
+}
+
+
+static struct sample_fetch_kw_list smp_kws = {ILH, { /* sample fetch keywords passed to sample_register_fetches() */
+	{ "prio_class", smp_fetch_priority_class, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, },   /* reads strm->priority_class */
+	{ "prio_offset", smp_fetch_priority_offset, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, }, /* reads strm->priority_offset */
+	{ /* END */},
+}};
+
+__attribute__((constructor)) /* constructor attribute: invoked automatically before main() */
+static void __queue_init(void)
+{
+	tcp_req_cont_keywords_register(&tcp_cont_kws); /* set-priority-* actions for "tcp-request content" */
+	http_req_keywords_register(&http_req_kws);     /* set-priority-* actions for "http-request" */
+	sample_register_fetches(&smp_kws);             /* prio_class / prio_offset sample fetches */
+}
+
 /*
  * Local variables:
  *  c-indent-level: 8
diff --git a/src/server.c b/src/server.c
index ebac357fb..5617d8d67 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1575,7 +1575,7 @@ static struct server *new_server(struct proxy *proxy)
 	srv->obj_type = OBJ_TYPE_SERVER;
 	srv->proxy = proxy;
 	LIST_INIT(&srv->actconns);
-	LIST_INIT(&srv->pendconns);
+	srv->pendconns = EB_ROOT;
 
 	if ((srv->priv_conns = calloc(global.nbthread, sizeof(*srv->priv_conns))) == NULL)
 		goto free_srv;
diff --git a/src/stream.c b/src/stream.c
index 1d0b22ca3..39d16689b 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -125,8 +125,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 	s->logs.t_data = -1;
 	s->logs.t_close = 0;
 	s->logs.bytes_in = s->logs.bytes_out = 0;
-	s->logs.prx_queue_size = 0;  /* we get the number of pending conns before us */
-	s->logs.srv_queue_size = 0; /* we will get this number soon */
+	s->logs.prx_queue_pos = 0;  /* we get the number of pending conns before us */
+	s->logs.srv_queue_pos = 0; /* we will get this number soon */
 
 	/* default logging function */
 	s->do_log = strm_log;
@@ -220,6 +220,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 	s->target = sess->listener ? sess->listener->default_target : NULL;
 
 	s->pend_pos = NULL;
+	s->priority_class = 0;
+	s->priority_offset = 0;
 
 	/* init store persistence */
 	s->store_count = 0;
@@ -2467,6 +2469,10 @@ struct task *process_stream(struct task *t)
 		return t; /* nothing more to do */
 	}
 
+	// remove from pending queue here so we update counters
+	if (s->pend_pos)
+		pendconn_free(s->pend_pos);
+
 	if (s->flags & SF_BE_ASSIGNED)
 		HA_ATOMIC_SUB(&s->be->beconn, 1);
 

Reply via email to