On 2018/5/10 01:35, Patrick Hemmer wrote:
> This adds the set-priority-class and set-priority-offset actions to
> http-request and tcp-request content.
> The priority values are used when connections are queued to determine
> which connections should be served first. The lowest priority class is
> served first. When multiple requests from the same class are found, the
> earliest (according to queue_time + offset) is served first.
> ---
>  include/types/proxy.h  |   2 +-
>  include/types/queue.h  |   2 +-
>  include/types/server.h |   2 +-
>  include/types/stream.h |   2 +
>  src/proxy.c            |   2 +-
>  src/queue.c            | 280
> ++++++++++++++++++++++++++++++++++++++++++-------
>  src/server.c           |   2 +-
>  src/stream.c           |   2 +
>  8 files changed, 250 insertions(+), 44 deletions(-)
>
>
I noticed I had some uncommitted changes. They are minor, but the previous
version of the patch might not work correctly without them.

-Patrick
From 240a54488c01c4b0a15a561b8e7533922a487492 Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <hapr...@stormcloud9.net>
Date: Fri, 4 May 2018 16:31:16 -0400
Subject: [PATCH] MEDIUM: add set-priority-class and set-priority-offset

This adds the set-priority-class and set-priority-offset actions to 
http-request and tcp-request content.
The priority values are used when connections are queued to determine which 
connections should be served first. The lowest priority class is served first. 
When multiple requests from the same class are found, the earliest (according 
to queue_time + offset) is served first.
---
 include/types/proxy.h  |   2 +-
 include/types/queue.h  |   2 +-
 include/types/server.h |   2 +-
 include/types/stream.h |   2 +
 src/hlua.c             |   4 +-
 src/proxy.c            |   2 +-
 src/queue.c            | 280 ++++++++++++++++++++++++++++++++++++++++++-------
 src/server.c           |   2 +-
 src/stream.c           |   2 +
 9 files changed, 252 insertions(+), 46 deletions(-)

diff --git a/include/types/proxy.h b/include/types/proxy.h
index 16c13a1c1..2cc1dfd9e 100644
--- a/include/types/proxy.h
+++ b/include/types/proxy.h
@@ -322,7 +322,7 @@ struct proxy {
                int serverfin;                  /* timeout to apply to server 
half-closed connections */
        } timeout;
        char *id, *desc;                        /* proxy id (name) and 
description */
-       struct list pendconns;                  /* pending connections with no 
server assigned yet */
+       struct eb_root pendconns;                       /* pending connections 
with no server assigned yet */
        int nbpend;                             /* number of pending 
connections with no server assigned yet */
        int totpend;                            /* total number of pending 
connections on this instance (for stats) */
        unsigned int feconn, beconn;            /* # of active frontend and 
backends streams */
diff --git a/include/types/queue.h b/include/types/queue.h
index 42dbbd047..03377da69 100644
--- a/include/types/queue.h
+++ b/include/types/queue.h
@@ -35,7 +35,7 @@ struct pendconn {
        struct stream *strm;
        struct proxy  *px;
        struct server *srv;        /* the server we are waiting for, may be 
NULL */
-       struct list    list;       /* next pendconn */
+       struct eb32_node node;
        __decl_hathreads(HA_SPINLOCK_T lock);
 };
 
diff --git a/include/types/server.h b/include/types/server.h
index 0cd20c096..5339c911e 100644
--- a/include/types/server.h
+++ b/include/types/server.h
@@ -213,7 +213,7 @@ struct server {
        struct freq_ctr sess_per_sec;           /* sessions per second on this 
server */
        struct be_counters counters;            /* statistics counters */
 
-       struct list pendconns;                  /* pending connections */
+       struct eb_root pendconns;                       /* pending connections 
*/
        struct list actconns;                   /* active connections */
        struct list *priv_conns;                /* private idle connections 
attached to stream interfaces */
        struct list *idle_conns;                /* sharable idle connections 
attached or not to a stream interface */
diff --git a/include/types/stream.h b/include/types/stream.h
index 0dbc79f44..489a0b648 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -125,6 +125,8 @@ struct stream {
 
        struct server *srv_conn;        /* stream already has a slot on a 
server and is not in queue */
        struct pendconn *pend_pos;      /* if not NULL, points to the pending 
position in the pending queue */
+       int16_t priority_class;         /* priority class of the stream for the 
pending queue */
+       int32_t priority_offset;        /* priority offset of the stream for 
the pending queue */
 
        struct http_txn *txn;           /* current HTTP transaction being 
processed. Should become a list. */
 
diff --git a/src/hlua.c b/src/hlua.c
index 65fa62ff6..1df8edfd3 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -7859,7 +7859,7 @@ void hlua_init(void)
        socket_tcp.proxy = &socket_proxy;
        socket_tcp.obj_type = OBJ_TYPE_SERVER;
        LIST_INIT(&socket_tcp.actconns);
-       LIST_INIT(&socket_tcp.pendconns);
+       socket_tcp.pendconns = EB_ROOT;
        socket_tcp.priv_conns = NULL;
        socket_tcp.idle_conns = NULL;
        socket_tcp.safe_conns = NULL;
@@ -7905,7 +7905,7 @@ void hlua_init(void)
        socket_ssl.proxy = &socket_proxy;
        socket_ssl.obj_type = OBJ_TYPE_SERVER;
        LIST_INIT(&socket_ssl.actconns);
-       LIST_INIT(&socket_ssl.pendconns);
+       socket_ssl.pendconns = EB_ROOT;
        socket_tcp.priv_conns = NULL;
        socket_tcp.idle_conns = NULL;
        socket_tcp.safe_conns = NULL;
diff --git a/src/proxy.c b/src/proxy.c
index 31253f14d..63a076855 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -726,7 +726,7 @@ void init_new_proxy(struct proxy *p)
 {
        memset(p, 0, sizeof(struct proxy));
        p->obj_type = OBJ_TYPE_PROXY;
-       LIST_INIT(&p->pendconns);
+       p->pendconns = EB_ROOT;
        LIST_INIT(&p->acl);
        LIST_INIT(&p->http_req_rules);
        LIST_INIT(&p->http_res_rules);
diff --git a/src/queue.c b/src/queue.c
index 1c730c75c..0174ddc75 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -14,12 +14,16 @@
 #include <common/memory.h>
 #include <common/time.h>
 #include <common/hathreads.h>
+#include <eb32tree.h>
 
+#include <proto/proto_http.h>
 #include <proto/queue.h>
+#include <proto/sample.h>
 #include <proto/server.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
+#include <proto/tcp_rules.h>
 
 
 struct pool_head *pool_head_pendconn;
@@ -76,8 +80,69 @@ static void pendconn_unlink(struct pendconn *p)
        else
                p->px->nbpend--;
        HA_ATOMIC_SUB(&p->px->totpend, 1);
-       LIST_DEL(&p->list);
-       LIST_INIT(&p->list);
+       eb32_delete(&p->node);
+}
+
+#define offset_boundary() ((now_ms - (TIMER_LOOK_BACK >> 12)) & 0xfffff) /* now_ms minus the look-back window, truncated to 20 bits */
+#define key_class(key) ((key) & 0xfff00000) /* upper 12 bits of a queue key: the priority class */
+#define key_offset(key) ((key) & 0xfffff) /* lower 20 bits of a queue key: the time offset */
+#define key_class_boundary(key) (key_class(key) | offset_boundary()) /* <key>'s class combined with the current boundary offset */
+static u32 key_incr(u32 key) { /* returns the key from which a queue scan resumes just after <key> */
+       u32 key_next;
+
+       key_next = key + 1;
+       if (key_class(key_next) != key_class(key))
+               key_next = key_class(key_next); /* the offset overflowed into the next class: restart at that class's offset 0 */
+       else if (key_next == key_class_boundary(key))
+               key_next += 0x100000; /* reached this class's boundary: keep the offset, move to the next class */
+
+       return key_next;
+}
+
+static struct pendconn *next_pendconn(struct eb_root *pendconns, u32 min) { /* selects the next pendconn to try when scanning <pendconns> from key <min>; returns NULL when no candidate is found */
+       struct eb32_node *node, *node2 = NULL;
+       u32 max;
+
+       // min is inclusive
+       // max is exclusive
+       max = key_class_boundary(min);
+
+       node = eb32_lookup_ge(pendconns, min); /* first node whose key is >= min; it may belong to a later class */
+
+       if (node) {
+               if (node->key < max || (max <= min && key_class(node->key) == 
key_class(min)))
+                       return eb32_entry(node, struct pendconn, node);
+               if (key_class(node->key) != key_class(min))
+                       node2 = node; /* remember the candidate found in another class */
+               if (max > min)
+                       goto class_next;
+       }
+
+       if (max <= min)
+               node = eb32_lookup_ge(pendconns, key_class(min)); /* retry from offset 0 of min's class */
+       if (!node)
+               return NULL;
+       if (node->key < max && node->key < min)
+               return eb32_entry(node, struct pendconn, node);
+
+class_next:
+       if (node2) {
+               min = key_class_boundary(node2->key);
+               if (node2->key >= min)
+                       return eb32_entry(node2, struct pendconn, node);
+       } else
+               min = key_class_boundary(min) + 0x100000; /* boundary offset in the class following min's */
+       node = eb32_lookup_ge(pendconns, min);
+       if (node && key_class(node->key) == key_class(min))
+               return eb32_entry(node, struct pendconn, node);
+       if (node2)
+               return eb32_entry(node2, struct pendconn, node);
+       // theoretically this should never find anything we haven't already 
found
+       //node = eb32_lookup_ge(pendconns, key_class(min));
+       //if (node)
+       //      return eb32_entry(node, struct pendconn, node);
+
+       return NULL;
 }
 
 /* Process the next pending connection from either a server or a proxy, and
@@ -100,8 +165,9 @@ static void pendconn_unlink(struct pendconn *p)
  */
 static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
-       struct pendconn *p = NULL;
+       struct pendconn *p = NULL, *pp = NULL;
        struct server   *rsrv;
+       u32 pkey, ppkey;
        int remote;
 
        rsrv = srv->track;
@@ -109,38 +175,53 @@ static int pendconn_process_next_strm(struct server *srv, 
struct proxy *px)
                rsrv = srv;
 
        if (srv->nbpend) {
-               list_for_each_entry(p, &srv->pendconns, list) {
+               for (p = next_pendconn(&srv->pendconns, offset_boundary());
+                    p;
+                    p = next_pendconn(&srv->pendconns, key_incr(p->node.key)))
                        if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
-                               goto ps_found;
-               }
-               p = NULL;
+                               break;
        }
-
-  ps_found:
-       if (srv_currently_usable(rsrv) && px->nbpend) {
-               struct pendconn *pp;
-
-               list_for_each_entry(pp, &px->pendconns, list) {
-                       /* If the server pendconn is older than the proxy one,
-                        * we process the server one. */
-                       if (p && !tv_islt(&pp->strm->logs.tv_request, 
&p->strm->logs.tv_request))
-                               goto pendconn_found;
-
-                       if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) {
-                               /* Let's switch from the server pendconn to the
-                                * proxy pendconn. Don't forget to unlock the
-                                * server pendconn, if any. */
-                               if (p)
-                                       HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
-                               p = pp;
-                               goto pendconn_found;
-                       }
-               }
+       if (srv_currently_usable(rsrv) && px->nbpend) { /* only a usable server may take from the proxy queue */
+               for (pp = next_pendconn(&px->pendconns, offset_boundary());
+                    pp;
+                    pp = next_pendconn(&px->pendconns, key_incr(pp->node.key)))
+                       if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock))
+                               break; /* stop at the first proxy pendconn we manage to lock */
        }
 
-       if (!p)
+       if (!p && !pp)
                return 0;
 
+       if (p && !pp)
+               goto pendconn_found;
+       if (pp && !p) {
+               p = pp;
+               goto pendconn_found;
+       }
+       if (key_class(p->node.key) < key_class(pp->node.key)) {
+               HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock);
+               goto pendconn_found;
+       }
+       if (key_class(pp->node.key) < key_class(p->node.key)) {
+               HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+               p = pp;
+               goto pendconn_found;
+       }
+
+       pkey = key_offset(p->node.key); /* same class on both sides: compare the 20-bit offsets */
+       ppkey = key_offset(pp->node.key);
+       if (pkey < offset_boundary())
+               pkey += 0x100000; /* offset wrapped below the boundary: unwrap by the full 20-bit offset space */
+       if (ppkey < offset_boundary())
+               ppkey += 0x100000;
+       if (pkey <= ppkey) { /* server entry is not later than the proxy one: keep it */
+               HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock);
+               goto pendconn_found;
+       }
+       HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+       p = pp;
+       goto pendconn_found;
+
   pendconn_found:
        pendconn_unlink(p);
        p->strm_flags |= SF_ASSIGNED;
@@ -206,6 +287,7 @@ struct pendconn *pendconn_add(struct stream *strm)
        srv = objt_server(strm->target);
        px  = strm->be;
 
+       p->node.key = ((u32)(strm->priority_class + 0x7ff) << 20) | 
((u32)(now_ms + strm->priority_offset) & 0xfffff);
        p->srv        = NULL;
        p->px         = px;
        p->strm       = strm;
@@ -219,7 +301,7 @@ struct pendconn *pendconn_add(struct stream *strm)
                strm->logs.srv_queue_size += srv->nbpend;
                if (srv->nbpend > srv->counters.nbpend_max)
                        srv->counters.nbpend_max = srv->nbpend;
-               LIST_ADDQ(&srv->pendconns, &p->list);
+               eb32_insert(&srv->pendconns, &p->node);
                HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
        }
        else {
@@ -228,7 +310,7 @@ struct pendconn *pendconn_add(struct stream *strm)
                strm->logs.prx_queue_size += px->nbpend;
                if (px->nbpend > px->be_counters.nbpend_max)
                        px->be_counters.nbpend_max = px->nbpend;
-               LIST_ADDQ(&px->pendconns, &p->list);
+               eb32_insert(&px->pendconns, &p->node);
                HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
        }
        HA_ATOMIC_ADD(&px->totpend, 1);
@@ -241,7 +323,8 @@ struct pendconn *pendconn_add(struct stream *strm)
  */
 int pendconn_redistribute(struct server *s)
 {
-       struct pendconn *p, *pback;
+       struct pendconn *p;
+       struct eb32_node *node;
        int xferred = 0;
        int remote = 0;
 
@@ -251,7 +334,10 @@ int pendconn_redistribute(struct server *s)
                return 0;
 
        HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-       list_for_each_entry_safe(p, pback, &s->pendconns, list) {
+       for (node = eb32_first(&s->pendconns); node; ) {
+               p = eb32_entry(node, struct pendconn, node);
+               /* step now: <p> may be unlinked below, and entries sharing a key must not be skipped */
+               node = eb32_next(node);
                if (p->strm_flags & SF_FORCE_PRST)
                        continue;
 
@@ -280,7 +366,8 @@ int pendconn_redistribute(struct server *s)
  */
 int pendconn_grab_from_px(struct server *s)
 {
-       struct pendconn *p, *pback;
+       struct pendconn *p;
+       struct eb32_node *node;
        int maxconn, xferred = 0;
        int remote = 0;
 
@@ -289,7 +376,10 @@ int pendconn_grab_from_px(struct server *s)
 
        HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
        maxconn = srv_dynamic_maxconn(s);
-       list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) {
+       for (node = eb32_first(&s->proxy->pendconns); node; ) {
+               p = eb32_entry(node, struct pendconn, node);
+               /* step now: <p> may be unlinked below, and entries sharing a key must not be skipped */
+               node = eb32_next(node);
                if (s->maxconn && s->served + xferred >= maxconn)
                        break;
 
@@ -337,7 +427,7 @@ int pendconn_dequeue(struct stream *strm)
 
        /* the pendconn is still linked to the server/proxy queue, so unlock it
         * and go away. */
-       if (!LIST_ISEMPTY(&p->list)) {
+       if (p->node.node.leaf_p) {
                HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
                return 1;
        }
@@ -369,19 +459,19 @@ void pendconn_free(struct pendconn *p)
        HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock);
 
        /* The pendconn was already unlinked, just release it. */
-       if (LIST_ISEMPTY(&p->list))
+       if (!p->node.node.leaf_p)
                goto release;
 
        if (p->srv) {
                HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
                p->srv->nbpend--;
-               LIST_DEL(&p->list);
+               eb32_delete(&p->node);
                HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
        }
        else {
                HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock);
                p->px->nbpend--;
-               LIST_DEL(&p->list);
+               eb32_delete(&p->node);
                HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock);
        }
        HA_ATOMIC_SUB(&p->px->totpend, 1);
@@ -392,6 +482,118 @@ void pendconn_free(struct pendconn *p)
        pool_free(pool_head_pendconn, p);
 }
 
+static enum act_return action_set_priority_class(struct act_rule *rule, struct 
proxy *px,
+                                                 struct session *sess, struct 
stream *s, int flags)
+{ /* Fetches the rule's expression as a signed integer, clamps it to [-0x7ff, 0x7ff] and stores it as the stream's priority class. Always returns ACT_RET_CONT. */
+       struct sample *smp;
+
+       smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, 
rule->arg.expr, SMP_T_SINT);
+       if (!smp)
+               return ACT_RET_CONT; /* no sample returned: leave the class unchanged */
+
+       if (smp->data.u.sint < -0x7ff)
+               smp->data.u.sint = -0x7ff;
+       else if (smp->data.u.sint > 0x7ff)
+               smp->data.u.sint = 0x7ff;
+
+       s->priority_class = smp->data.u.sint;
+       return ACT_RET_CONT;
+}
+
+static enum act_return action_set_priority_offset(struct act_rule *rule, 
struct proxy *px,
+                                                  struct session *sess, struct 
stream *s, int flags)
+{ /* Fetches the rule's expression as a signed integer, clamps it to [-0x7ffff, 0x7ffff] and stores it as the stream's priority offset. Always returns ACT_RET_CONT. */
+       struct sample *smp;
+
+       smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, 
rule->arg.expr, SMP_T_SINT);
+       if (!smp)
+               return ACT_RET_CONT; /* no sample returned: leave the offset unchanged */
+
+       if (smp->data.u.sint < -0x7ffff)
+               smp->data.u.sint = -0x7ffff;
+       else if (smp->data.u.sint > 0x7ffff)
+               smp->data.u.sint = 0x7ffff;
+
+       s->priority_offset = smp->data.u.sint;
+
+       return ACT_RET_CONT;
+}
+
+static enum act_parse_ret parse_set_priority_class(const char **args, int 
*arg, struct proxy *px,
+                                                   struct act_rule *rule, char 
**err)
+{
+       unsigned int where = 0;
+
+       rule->arg.expr = sample_parse_expr((char **)args, arg, 
px->conf.args.file,
+                                          px->conf.args.line, err, 
&px->conf.args);
+       if (!rule->arg.expr)
+               return ACT_RET_PRS_ERR;
+
+       if (px->cap & PR_CAP_FE)
+               where |= SMP_VAL_FE_HRQ_HDR;
+       if (px->cap & PR_CAP_BE)
+               where |= SMP_VAL_BE_HRQ_HDR;
+
+       if (!(rule->arg.expr->fetch->val & where)) {
+               memprintf(err,
+                         "fetch method '%s' extracts information from '%s', 
none of which is available here",
+                         args[0], 
sample_src_names(rule->arg.expr->fetch->use));
+               free(rule->arg.expr);
+               return ACT_RET_PRS_ERR;
+       }
+
+       rule->action     = ACT_CUSTOM;
+       rule->action_ptr = action_set_priority_class;
+       return ACT_RET_PRS_OK;
+}
+
+static enum act_parse_ret parse_set_priority_offset(const char **args, int 
*arg, struct proxy *px,
+                                                    struct act_rule *rule, 
char **err)
+{
+       unsigned int where = 0;
+
+       rule->arg.expr = sample_parse_expr((char **)args, arg, 
px->conf.args.file,
+                                          px->conf.args.line, err, 
&px->conf.args);
+       if (!rule->arg.expr)
+               return ACT_RET_PRS_ERR;
+
+       if (px->cap & PR_CAP_FE)
+               where |= SMP_VAL_FE_HRQ_HDR;
+       if (px->cap & PR_CAP_BE)
+               where |= SMP_VAL_BE_HRQ_HDR;
+
+       if (!(rule->arg.expr->fetch->val & where)) {
+               memprintf(err,
+                         "fetch method '%s' extracts information from '%s', 
none of which is available here",
+                         args[0], 
sample_src_names(rule->arg.expr->fetch->use));
+               free(rule->arg.expr);
+               return ACT_RET_PRS_ERR;
+       }
+
+       rule->action     = ACT_CUSTOM;
+       rule->action_ptr = action_set_priority_offset;
+       return ACT_RET_PRS_OK;
+}
+
+static struct action_kw_list tcp_cont_kws = {ILH, {
+       { "set-priority-class", parse_set_priority_class },
+       { "set-priority-offset", parse_set_priority_offset },
+       { /* END */ }
+}};
+
+static struct action_kw_list http_req_kws = {ILH, {
+       { "set-priority-class", parse_set_priority_class },
+       { "set-priority-offset", parse_set_priority_offset },
+       { /* END */ }
+}};
+
+__attribute__((constructor))
+static void __queue_init(void)
+{
+       tcp_req_cont_keywords_register(&tcp_cont_kws);
+       http_req_keywords_register(&http_req_kws);
+}
+
 /*
  * Local variables:
  *  c-indent-level: 8
diff --git a/src/server.c b/src/server.c
index ebac357fb..5617d8d67 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1575,7 +1575,7 @@ static struct server *new_server(struct proxy *proxy)
        srv->obj_type = OBJ_TYPE_SERVER;
        srv->proxy = proxy;
        LIST_INIT(&srv->actconns);
-       LIST_INIT(&srv->pendconns);
+       srv->pendconns = EB_ROOT;
 
        if ((srv->priv_conns = calloc(global.nbthread, 
sizeof(*srv->priv_conns))) == NULL)
                goto free_srv;
diff --git a/src/stream.c b/src/stream.c
index 1d0b22ca3..9b22becc9 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -220,6 +220,8 @@ struct stream *stream_new(struct session *sess, enum 
obj_type *origin)
        s->target = sess->listener ? sess->listener->default_target : NULL;
 
        s->pend_pos = NULL;
+       s->priority_class = 0;
+       s->priority_offset = 0;
 
        /* init store persistence */
        s->store_count = 0;
-- 
2.16.3

Reply via email to