This adds the set-priority-class and set-priority-offset actions to
http-request and tcp-request content.
The priority values are used when connections are queued to determine
which connections should be served first. The lowest priority class is
served first. When multiple requests from the same class are found, the
earliest (according to queue_time + offset) is served first.
---
 include/types/proxy.h  |   2 +-
 include/types/queue.h  |   2 +-
 include/types/server.h |   2 +-
 include/types/stream.h |   2 +
 src/proxy.c            |   2 +-
 src/queue.c            | 280
++++++++++++++++++++++++++++++++++++++++++-------
 src/server.c           |   2 +-
 src/stream.c           |   2 +
 8 files changed, 250 insertions(+), 44 deletions(-)


diff --git a/include/types/proxy.h b/include/types/proxy.h
index 16c13a1c1..2cc1dfd9e 100644
--- a/include/types/proxy.h
+++ b/include/types/proxy.h
@@ -322,7 +322,7 @@ struct proxy {
 		int serverfin;                  /* timeout to apply to server half-closed connections */
 	} timeout;
 	char *id, *desc;			/* proxy id (name) and description */
-	struct list pendconns;			/* pending connections with no server assigned yet */
+	struct eb_root pendconns;			/* pending connections with no server assigned yet */
 	int nbpend;				/* number of pending connections with no server assigned yet */
 	int totpend;				/* total number of pending connections on this instance (for stats) */
 	unsigned int feconn, beconn;		/* # of active frontend and backends streams */
diff --git a/include/types/queue.h b/include/types/queue.h
index 42dbbd047..03377da69 100644
--- a/include/types/queue.h
+++ b/include/types/queue.h
@@ -35,7 +35,7 @@ struct pendconn {
 	struct stream *strm;
 	struct proxy  *px;
 	struct server *srv;        /* the server we are waiting for, may be NULL */
-	struct list    list;       /* next pendconn */
+	struct eb32_node node;
 	__decl_hathreads(HA_SPINLOCK_T lock);
 };
 
diff --git a/include/types/server.h b/include/types/server.h
index 0cd20c096..5339c911e 100644
--- a/include/types/server.h
+++ b/include/types/server.h
@@ -213,7 +213,7 @@ struct server {
 	struct freq_ctr sess_per_sec;		/* sessions per second on this server */
 	struct be_counters counters;		/* statistics counters */
 
-	struct list pendconns;			/* pending connections */
+	struct eb_root pendconns;			/* pending connections */
 	struct list actconns;			/* active connections */
 	struct list *priv_conns;		/* private idle connections attached to stream interfaces */
 	struct list *idle_conns;		/* sharable idle connections attached or not to a stream interface */
diff --git a/include/types/stream.h b/include/types/stream.h
index 0dbc79f44..489a0b648 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -125,6 +125,8 @@ struct stream {
 
 	struct server *srv_conn;        /* stream already has a slot on a server and is not in queue */
 	struct pendconn *pend_pos;      /* if not NULL, points to the pending position in the pending queue */
+	int16_t priority_class;         /* priority class of the stream for the pending queue */
+	int32_t priority_offset;        /* priority offset of the stream for the pending queue */
 
 	struct http_txn *txn;           /* current HTTP transaction being processed. Should become a list. */
 
diff --git a/src/proxy.c b/src/proxy.c
index 31253f14d..88e274b6b 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -726,7 +726,7 @@ void init_new_proxy(struct proxy *p)
 {
 	memset(p, 0, sizeof(struct proxy));
 	p->obj_type = OBJ_TYPE_PROXY;
-	LIST_INIT(&p->pendconns);
+	memset(&p->pendconns, 0, sizeof(p->pendconns));
 	LIST_INIT(&p->acl);
 	LIST_INIT(&p->http_req_rules);
 	LIST_INIT(&p->http_res_rules);
diff --git a/src/queue.c b/src/queue.c
index 1c730c75c..28d36548d 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -14,12 +14,16 @@
 #include <common/memory.h>
 #include <common/time.h>
 #include <common/hathreads.h>
+#include <eb32tree.h>
 
+#include <proto/proto_http.h>
 #include <proto/queue.h>
+#include <proto/sample.h>
 #include <proto/server.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
+#include <proto/tcp_rules.h>
 
 
 struct pool_head *pool_head_pendconn;
@@ -76,8 +80,69 @@ static void pendconn_unlink(struct pendconn *p)
 	else
 		p->px->nbpend--;
 	HA_ATOMIC_SUB(&p->px->totpend, 1);
-	LIST_DEL(&p->list);
-	LIST_INIT(&p->list);
+	eb32_delete(&p->node);
+}
+
+#define offset_boundary() (now_ms - (TIMER_LOOK_BACK >> 12) & 0xfffff)
+#define key_class(key) (key & 0xfff00000)
+#define key_offset(key) (key & 0xfffff)
+#define key_class_boundary(key) (key_class(key) | offset_boundary())
+static u32 key_incr(u32 key) {
+	u32 key_next;
+
+	key_next = key + 1;
+	if (key_class(key_next) != key_class(key))
+		key_next = key_class(key_next);
+	else if (key_next == key_class_boundary(key))
+		key_next += 0x100000;
+
+	return key_next;
+}
+
+static struct pendconn *next_pendconn(struct eb_root *pendconns, u32 min) {
+	struct eb32_node *node, *node2 = NULL;
+	u32 max;
+
+	// min is inclusive
+	// max is exclusive
+	max = key_class_boundary(min);
+
+	node = eb32_lookup_ge(pendconns, min);
+
+	if (node) {
+		if (node->key < max || (max <= min && key_class(node->key) == key_class(min)))
+			return eb32_entry(node, struct pendconn, node);
+		if (key_class(node->key) != key_class(min))
+			node2 = node;
+		if (max > min)
+			goto class_next;
+	}
+
+	if (max <= min)
+		node = eb32_lookup_ge(pendconns, key_class(min));
+	if (!node)
+		return NULL;
+	if (node->key < max && node->key < min)
+		return eb32_entry(node, struct pendconn, node);
+
+class_next:
+	if (node2) {
+		min = key_class_boundary(node2->key);
+		if (node2->key >= min)
+			return eb32_entry(node2, struct pendconn, node);
+	} else
+		min = key_class_boundary(min) + 0x100000;
+	node = eb32_lookup_ge(pendconns, min);
+	if (node && key_class(node->key) == key_class(min))
+		return eb32_entry(node, struct pendconn, node);
+	if (node2)
+		return eb32_entry(node2, struct pendconn, node);
+	// theoretically this should never find anything we haven't already found
+	//node = eb32_lookup_ge(pendconns, key_class(min));
+	//if (node)
+	//	return eb32_entry(node, struct pendconn, node);
+
+	return NULL;
 }
 
 /* Process the next pending connection from either a server or a proxy, and
@@ -100,8 +165,9 @@ static void pendconn_unlink(struct pendconn *p)
  */
 static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
-	struct pendconn *p = NULL;
+	struct pendconn *p = NULL, *pp = NULL;
 	struct server   *rsrv;
+	u32 pkey, ppkey;
 	int remote;
 
 	rsrv = srv->track;
@@ -109,38 +175,53 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 		rsrv = srv;
 
 	if (srv->nbpend) {
-		list_for_each_entry(p, &srv->pendconns, list) {
+		for (p = next_pendconn(&srv->pendconns, offset_boundary());
+		     p;
+		     p = next_pendconn(&srv->pendconns, key_incr(p->node.key)))
 			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
-				goto ps_found;
-		}
-		p = NULL;
+				break;
 	}
-
-  ps_found:
-	if (srv_currently_usable(rsrv) && px->nbpend) {
-		struct pendconn *pp;
-
-		list_for_each_entry(pp, &px->pendconns, list) {
-			/* If the server pendconn is older than the proxy one,
-			 * we process the server one. */
-			if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request))
-				goto pendconn_found;
-
-			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) {
-				/* Let's switch from the server pendconn to the
-				 * proxy pendconn. Don't forget to unlock the
-				 * server pendconn, if any. */
-				if (p)
-					HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
-				p = pp;
-				goto pendconn_found;
-			}
-		}
+	if (px->nbpend) {
+		for (pp = next_pendconn(&px->pendconns, offset_boundary());
+		     pp;
+		     pp = next_pendconn(&px->pendconns, key_incr(pp->node.key)))
+			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock))
+				break;
 	}
 
-	if (!p)
+	if (!p && !pp)
 		return 0;
 
+	if (p && !pp)
+		goto pendconn_found;
+	if (pp && !p) {
+		p = pp;
+		goto pendconn_found;
+	}
+	if (key_class(p->node.key) < key_class(pp->node.key)) {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock);
+		goto pendconn_found;
+	}
+	if (key_class(pp->node.key) < key_class(p->node.key)) {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+		p = pp;
+		goto pendconn_found;
+	}
+
+	pkey = key_offset(p->node.key);
+	ppkey = key_offset(pp->node.key);
+	if (pkey < offset_boundary())
+		pkey += (TIMER_LOOK_BACK >> 12);
+	if (ppkey < offset_boundary())
+		ppkey += (TIMER_LOOK_BACK >> 12);
+	if (pkey <= ppkey) {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock);
+		goto pendconn_found;
+	}
+	HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+	p = pp;
+	goto pendconn_found;
+
   pendconn_found:
 	pendconn_unlink(p);
 	p->strm_flags |= SF_ASSIGNED;
@@ -206,6 +287,7 @@ struct pendconn *pendconn_add(struct stream *strm)
 	srv = objt_server(strm->target);
 	px  = strm->be;
 
+	p->node.key = ((u32)(strm->priority_class + 0x7ff) << 20) | ((u32)(now_ms + strm->priority_offset) & 0xfffff);
 	p->srv        = NULL;
 	p->px         = px;
 	p->strm       = strm;
@@ -219,7 +301,7 @@ struct pendconn *pendconn_add(struct stream *strm)
 		strm->logs.srv_queue_size += srv->nbpend;
 		if (srv->nbpend > srv->counters.nbpend_max)
 			srv->counters.nbpend_max = srv->nbpend;
-		LIST_ADDQ(&srv->pendconns, &p->list);
+		eb32_insert(&srv->pendconns, &p->node);
 		HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
 	}
 	else {
@@ -228,7 +310,7 @@ struct pendconn *pendconn_add(struct stream *strm)
 		strm->logs.prx_queue_size += px->nbpend;
 		if (px->nbpend > px->be_counters.nbpend_max)
 			px->be_counters.nbpend_max = px->nbpend;
-		LIST_ADDQ(&px->pendconns, &p->list);
+		eb32_insert(&px->pendconns, &p->node);
 		HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
 	}
 	HA_ATOMIC_ADD(&px->totpend, 1);
@@ -241,9 +323,10 @@ struct pendconn *pendconn_add(struct stream *strm)
  */
 int pendconn_redistribute(struct server *s)
 {
-	struct pendconn *p, *pback;
+	struct pendconn *p;
 	int xferred = 0;
 	int remote = 0;
+	struct eb32_node *node;
 
 	/* The REDISP option was specified. We will ignore cookie and force to
 	 * balance or use the dispatcher. */
@@ -251,7 +334,10 @@ int pendconn_redistribute(struct server *s)
 		return 0;
 
 	HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-	list_for_each_entry_safe(p, pback, &s->pendconns, list) {
+	for (node = eb32_first(&s->pendconns);
+	     node;
+	     node = eb32_lookup_ge(&s->pendconns, node->key + 1)) {
+		p = eb32_entry(&node, struct pendconn, node);
 		if (p->strm_flags & SF_FORCE_PRST)
 			continue;
 
@@ -280,7 +366,8 @@ int pendconn_redistribute(struct server *s)
  */
 int pendconn_grab_from_px(struct server *s)
 {
-	struct pendconn *p, *pback;
+	struct pendconn *p;
+	struct eb32_node *node;
 	int maxconn, xferred = 0;
 	int remote = 0;
 
@@ -289,7 +376,10 @@ int pendconn_grab_from_px(struct server *s)
 
 	HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
 	maxconn = srv_dynamic_maxconn(s);
-	list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) {
+	for (node = eb32_first(&s->proxy->pendconns);
+	     node;
+	     node = eb32_lookup_ge(&s->proxy->pendconns, node->key + 1)) {
+		p = eb32_entry(&node, struct pendconn, node);
 		if (s->maxconn && s->served + xferred >= maxconn)
 			break;
 
@@ -337,7 +427,7 @@ int pendconn_dequeue(struct stream *strm)
 
 	/* the pendconn is still linked to the server/proxy queue, so unlock it
 	 * and go away. */
-	if (!LIST_ISEMPTY(&p->list)) {
+	if (p->node.node.leaf_p) {
 		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 		return 1;
 	}
@@ -369,19 +459,19 @@ void pendconn_free(struct pendconn *p)
 	HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock);
 
 	/* The pendconn was already unlinked, just release it. */
-	if (LIST_ISEMPTY(&p->list))
+	if (!p->node.node.leaf_p)
 		goto release;
 
 	if (p->srv) {
 		HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
 		p->srv->nbpend--;
-		LIST_DEL(&p->list);
+		eb32_delete(&p->node);
 		HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
 	}
 	else {
 		HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock);
 		p->px->nbpend--;
-		LIST_DEL(&p->list);
+		eb32_delete(&p->node);
 		HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock);
 	}
 	HA_ATOMIC_SUB(&p->px->totpend, 1);
@@ -392,6 +482,118 @@ void pendconn_free(struct pendconn *p)
 	pool_free(pool_head_pendconn, p);
 }
 
+static enum act_return action_set_priority_class(struct act_rule *rule, struct proxy *px,
+                                                 struct session *sess, struct stream *s, int flags)
+{
+	struct sample *smp;
+
+	smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, rule->arg.expr, SMP_T_SINT);
+	if (!smp)
+		return ACT_RET_CONT;
+
+	if (smp->data.u.sint < -0x7ff)
+		smp->data.u.sint = -0x7ff;
+	else if (smp->data.u.sint > 0x7ff)
+		smp->data.u.sint = 0x7ff;
+
+	s->priority_class = smp->data.u.sint;
+	return ACT_RET_CONT;
+}
+
+static enum act_return action_set_priority_offset(struct act_rule *rule, struct proxy *px,
+                                                  struct session *sess, struct stream *s, int flags)
+{
+	struct sample *smp;
+
+	smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, rule->arg.expr, SMP_T_SINT);
+	if (!smp)
+		return ACT_RET_CONT;
+
+	if (smp->data.u.sint < -0x7ffff)
+		smp->data.u.sint = -0x7ffff;
+	else if (smp->data.u.sint > 0x7ffff)
+		smp->data.u.sint = 0x7ffff;
+
+	s->priority_offset = smp->data.u.sint;
+
+	return ACT_RET_CONT;
+}
+
+static enum act_parse_ret parse_set_priority_class(const char **args, int *arg, struct proxy *px,
+                                                   struct act_rule *rule, char **err)
+{
+	unsigned int where = 0;
+
+	rule->arg.expr = sample_parse_expr((char **)args, arg, px->conf.args.file,
+	                                   px->conf.args.line, err, &px->conf.args);
+	if (!rule->arg.expr)
+		return ACT_RET_PRS_ERR;
+
+	if (px->cap & PR_CAP_FE)
+		where |= SMP_VAL_FE_HRQ_HDR;
+	if (px->cap & PR_CAP_BE)
+		where |= SMP_VAL_BE_HRQ_HDR;
+
+	if (!(rule->arg.expr->fetch->val & where)) {
+		memprintf(err,
+			  "fetch method '%s' extracts information from '%s', none of which is available here",
+			  args[0], sample_src_names(rule->arg.expr->fetch->use));
+		free(rule->arg.expr);
+		return ACT_RET_PRS_ERR;
+	}
+
+	rule->action     = ACT_CUSTOM;
+	rule->action_ptr = action_set_priority_class;
+	return ACT_RET_PRS_OK;
+}
+
+static enum act_parse_ret parse_set_priority_offset(const char **args, int *arg, struct proxy *px,
+                                                    struct act_rule *rule, char **err)
+{
+	unsigned int where = 0;
+
+	rule->arg.expr = sample_parse_expr((char **)args, arg, px->conf.args.file,
+	                                        px->conf.args.line, err, &px->conf.args);
+	if (!rule->arg.expr)
+		return ACT_RET_PRS_ERR;
+
+	if (px->cap & PR_CAP_FE)
+		where |= SMP_VAL_FE_HRQ_HDR;
+	if (px->cap & PR_CAP_BE)
+		where |= SMP_VAL_BE_HRQ_HDR;
+
+	if (!(rule->arg.expr->fetch->val & where)) {
+		memprintf(err,
+			  "fetch method '%s' extracts information from '%s', none of which is available here",
+			  args[0], sample_src_names(rule->arg.expr->fetch->use));
+		free(rule->arg.expr);
+		return ACT_RET_PRS_ERR;
+	}
+
+	rule->action     = ACT_CUSTOM;
+	rule->action_ptr = action_set_priority_offset;
+	return ACT_RET_PRS_OK;
+}
+
+static struct action_kw_list tcp_cont_kws = {ILH, {
+	{ "set-priority-class", parse_set_priority_class },
+	{ "set-priority-offset", parse_set_priority_offset },
+	{ /* END */ }
+}};
+
+static struct action_kw_list http_req_kws = {ILH, {
+	{ "set-priority-class", parse_set_priority_class },
+	{ "set-priority-offset", parse_set_priority_offset },
+	{ /* END */ }
+}};
+
+__attribute__((constructor))
+static void __queue_init(void)
+{
+	tcp_req_cont_keywords_register(&tcp_cont_kws);
+	http_req_keywords_register(&http_req_kws);
+}
+
 /*
  * Local variables:
  *  c-indent-level: 8
diff --git a/src/server.c b/src/server.c
index ebac357fb..743b8c6cd 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1575,7 +1575,7 @@ static struct server *new_server(struct proxy *proxy)
 	srv->obj_type = OBJ_TYPE_SERVER;
 	srv->proxy = proxy;
 	LIST_INIT(&srv->actconns);
-	LIST_INIT(&srv->pendconns);
+	memset(&srv->pendconns, 0, sizeof(srv->pendconns));
 
 	if ((srv->priv_conns = calloc(global.nbthread, sizeof(*srv->priv_conns))) == NULL)
 		goto free_srv;
diff --git a/src/stream.c b/src/stream.c
index 1d0b22ca3..9b22becc9 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -220,6 +220,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 	s->target = sess->listener ? sess->listener->default_target : NULL;
 
 	s->pend_pos = NULL;
+	s->priority_class = 0;
+	s->priority_offset = 0;
 
 	/* init store persistence */
 	s->store_count = 0;

Reply via email to