Re-adding the mailing list.

On 2018/8/6 22:37, Willy Tarreau wrote:
> Hi Patrick,
>>> I *think* that the change made to stream_process_counters() is not needed,
>>> because stream_process_counters() is normally used to keep the stats up
>>> to date so that when refreshing the stats page they don't make huge jumps.
>>> If you faced any corner case that this part managed to address, I'm
>>> interested in taking a look.
>> If I recall correctly, the scenario this was meant to address was when a
>> connection was removed from the queue ungracefully (e.g. a timeout).
>> In process_stream(), s->do_log(s) is called, which needs the counters
>> updated. However in this flow, the counters wouldn't get updated until
>> stream_free(s) a few lines later. stream_process_counters() seemed to be
>> made to address this scenario, so I put the code there.
>> I'll review this again though.
> Now I remember I thought about this case initially and I think I figured
> a few rare cases where it could possibly not be enough (maybe for aborts
> processed in HTTP keep-alive but I'm not sure and I prefer not to say too
> much bullshit). I remember sometimes getting an incorrect queue length at
> log time. Thus I have added a few calls to pendconn_dequeue() before all
> calls to do_log() which in my opinion are more robust for the long term.
> Thus it probably means that we can safely get rid of the calls in
> stream_process_counters(), which would be nice.
>
>>> You will notice that the code was significantly simplified thanks to the
>>> most painful part you had to deal with : the possibility for the trylock
>>> to fail. Now there is no trylock anymore and it cannot fail, so the very
>>> complex pendconn_next() is not needed anymore and I could replace it with
>>> a much simpler pendconn_first() which simply gives you the first element
>>> that is expected to be dequeued from a queue.
>> Found one spot in the code that looks like still falls into the scenario
>> where pendconn_next() is needed. This bit:
>>
>>     int pendconn_redistribute(struct server *s)
>>         ...
>>         while ((p = pendconn_first(&s->pendconns))) {
>>             if (p->strm_flags & SF_FORCE_PRST)
>>                 continue;
>>
>> This is going to infinitely loop.
> You're absolutely right, I didn't even notice this one! I think I was
> a bit too fast in replacing all loops with pendconn_first()!
>
>> An alternative option instead of pendconn_next() is on SF_FORCE_PRST we
>> build a new tree (or have some sort of temporary list), and pop them off
>> the current tree.
> In fact I don't think it's an issue, because here we need to remove all
> of them from the tree except the ones with the flag, so we don't even
> need to have something as complex as pendconn_next() to pop them up in
> order. Instead we can safely walk over the whole tree and only ignore
> SF_FORCE_PRST as the original code used to do, but using eb32_next().
> In fact my use of eb32_first() in the "while" loop in the previous
> patch was already wrong due to the "continue". We should simply have :
>
>         for (node = eb32_first(&s->pendconns); node; node = eb32_next(node)) {
>               p = eb32_entry(&node, struct pendconn, node);
>               if (p->strm_flags & SF_FORCE_PRST)
>                       continue;
>               ...
>       }
>
> If you want I'll update the patches once I'm at the office. Thanks for
> spotting this one.
>
> Willy
So I went and did that. It looks to work fine.
I also went and removed the queue position counter code from
stream_process_counters(), and the logging still appears to work fine
(but I could have easily missed some potential scenarios).

In regards to the idea you added to the commit message on the queue index
> It could even be further improved to avoid manipulating the stream
> from the queue :
>   - queue_idx = initial position when enqueuing
>   - queue_idx = measured position when dequeuing

Because the stream can pass through multiple queues, we'd have to make
sure that every time we de-queue, that the stream code pulls the value
and increments itself. However looking at all the various places
pendconn_unlink() gets called, I think it would be difficult for the
stream code to know when the value needs to be pulled.

Anyway, attached is the latest patch set for review again.

-Patrick
From e4acba5847fb11981c72af8430f6e6846211954f Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <[email protected]>
Date: Fri, 11 May 2018 12:52:31 -0400
Subject: [PATCH 6/7] MEDIUM: queue: adjust position based on priority-class
 and priority-offset

The priority values are used when connections are queued to determine
which connections should be served first. The lowest priority class is
served first. When multiple requests from the same class are found, the
earliest (according to queue_time + offset) is served first. The queue
offsets can span over roughly 17 minutes after which the offsets will
wrap around. This allows up to 8 minutes spent in the queue with no
reordering.
---
 src/queue.c | 105 +++++++++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 79 insertions(+), 26 deletions(-)

diff --git a/src/queue.c b/src/queue.c
index 12020a084..3e9371bfa 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -85,6 +85,12 @@
 #include <proto/tcp_rules.h>
 
 
+#define NOW_OFFSET_BOUNDARY()          ((now_ms - (TIMER_LOOK_BACK >> 12)) & 
0xfffff)
+#define KEY_CLASS(key)                 ((u32)key & 0xfff00000)
+#define KEY_OFFSET(key)                ((u32)key & 0x000fffff)
+#define KEY_CLASS_OFFSET_BOUNDARY(key) (KEY_CLASS(key) | NOW_OFFSET_BOUNDARY())
+#define MAKE_KEY(class, offset)        (((u32)(class + 0x7ff) << 20) | 
((u32)(now_ms + offset) & 0xfffff))
+
 struct pool_head *pool_head_pendconn;
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -184,6 +190,33 @@ void pendconn_unlink(struct pendconn *p)
        pendconn_queue_unlock(p);
 }
 
+/* Retrieve the first pendconn from tree <pendconns>. Classes are always
+ * considered first, then the time offset. The time does wrap, so the
+ * lookup is performed twice, one to retrieve the first class and a second
+ * time to retrieve the earliest time in this class.
+ */
+static struct pendconn *pendconn_first(struct eb_root *pendconns)
+{
+       struct eb32_node *node, *node2 = NULL;
+       u32 key;
+
+       node = eb32_first(pendconns);
+       if (!node)
+               return NULL;
+
+       key = KEY_CLASS_OFFSET_BOUNDARY(node->key);
+       node2 = eb32_lookup_ge(pendconns, key);
+
+       if (!node2 ||
+           KEY_CLASS(node2->key) != KEY_CLASS(node->key)) {
+               /* no other key in the tree, or in this class */
+               return eb32_entry(node, struct pendconn, node);
+       }
+
+       /* found a better key */
+       return eb32_entry(node2, struct pendconn, node);
+}
+
 /* Process the next pending connection from either a server or a proxy, and
  * returns a strictly positive value on success (see below). If no pending
  * connection is found, 0 is returned.  Note that neither <srv> nor <px> may be
@@ -207,40 +240,54 @@ static int pendconn_process_next_strm(struct server *srv, 
struct proxy *px)
        struct pendconn *p = NULL;
        struct pendconn *pp = NULL;
        struct server   *rsrv;
-       struct eb32_node *node;
+       u32 pkey, ppkey;
 
        rsrv = srv->track;
        if (!rsrv)
                rsrv = srv;
 
        p = NULL;
-       if (srv->nbpend) {
-               node = eb32_first(&srv->pendconns);
-               p = eb32_entry(node, struct pendconn, node);
-       }
+       if (srv->nbpend)
+               p = pendconn_first(&srv->pendconns);
 
+       pp = NULL;
        if (srv_currently_usable(rsrv) && px->nbpend &&
            (!(srv->flags & SRV_F_BACKUP) ||
             (!px->srv_act &&
-             (srv == px->lbprm.fbck || (px->options & PR_O_USE_ALL_BK))))) {
-               node = eb32_first(&px->pendconns);
-               pp = eb32_entry(node, struct pendconn, node);
-
-               /* If the server pendconn is older than the proxy one,
-                * we process the server one.
-                */
-               if (p && !tv_islt(&pp->strm->logs.tv_request, 
&p->strm->logs.tv_request))
-                       goto pendconn_found;
-
-               /* Let's switch from the server pendconn to the proxy pendconn 
*/
-               p = pp;
-               goto pendconn_found;
-       }
+             (srv == px->lbprm.fbck || (px->options & PR_O_USE_ALL_BK)))))
+               pp = pendconn_first(&px->pendconns);
 
-       if (!p)
+       if (!p && !pp)
                return 0;
 
-  pendconn_found:
+       if (p && !pp)
+               goto use_p;
+
+       if (pp && !p)
+               goto use_pp;
+
+       if (KEY_CLASS(p->node.key) < KEY_CLASS(pp->node.key))
+               goto use_p;
+
+       if (KEY_CLASS(pp->node.key) < KEY_CLASS(p->node.key))
+               goto use_pp;
+
+       pkey  = KEY_OFFSET(p->node.key);
+       ppkey = KEY_OFFSET(pp->node.key);
+
+       if (pkey < NOW_OFFSET_BOUNDARY())
+               pkey += 0x100000; // key in the future
+
+       if (ppkey < NOW_OFFSET_BOUNDARY())
+               ppkey += 0x100000; // key in the future
+
+       if (pkey <= ppkey)
+               goto use_p;
+
+ use_pp:
+       /* Let's switch from the server pendconn to the proxy pendconn */
+       p = pp;
+ use_p:
        __pendconn_unlink(p);
        p->strm_flags |= SF_ASSIGNED;
        p->target = srv;
@@ -281,7 +328,7 @@ void process_srv_queue(struct server *s)
        HA_SPIN_UNLOCK(PROXY_LOCK,  &p->lock);
 }
 
-/* Adds the stream <strm> to the pending connection list of server <strm>->srv
+/* Adds the stream <strm> to the pending connection queue of server <strm>->srv
  * or to the one of <strm>->proxy if srv is NULL. All counters and back 
pointers
  * are updated accordingly. Returns NULL if no memory is available, otherwise 
the
  * pendconn itself. If the stream was already marked as served, its flag is
@@ -289,6 +336,14 @@ void process_srv_queue(struct server *s)
  * The stream's queue position is counted with an offset of -1 because we want
  * to make sure that being at the first position in the queue reports 1.
  *
+ * The queue is sorted by the composition of the priority_class, and the 
current
+ * timestamp offset by strm->priority_offset. The timestamp is in milliseconds
+ * and truncated to 20 bits, so will wrap every 17m28s575ms.
+ * The offset can be positive or negative, and an offset of 0 puts it in the
+ * middle of this range (~ 8 min). Note that this also means if the adjusted
+ * timestamp wraps around, the request will be misinterpreted as being of
+ * the higest priority for that priority class.
+ *
  * This function must be called by the stream itself, so in the context of
  * process_stream.
  */
@@ -310,7 +365,7 @@ struct pendconn *pendconn_add(struct stream *strm)
        px            = strm->be;
        p->target     = NULL;
        p->srv        = srv;
-       p->node.key   = 0;
+       p->node.key   = MAKE_KEY(strm->priority_class, strm->priority_offset);
        p->px         = px;
        p->strm       = strm;
        p->strm_flags = strm->flags;
@@ -377,7 +432,6 @@ int pendconn_redistribute(struct server *s)
 int pendconn_grab_from_px(struct server *s)
 {
        struct pendconn *p;
-       struct eb32_node *node;
        int maxconn, xferred = 0;
 
        if (!srv_currently_usable(s))
@@ -394,8 +448,7 @@ int pendconn_grab_from_px(struct server *s)
 
        HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
        maxconn = srv_dynamic_maxconn(s);
-       while ((node = eb32_first(&s->proxy->pendconns))) {
-               p = eb32_entry(&node, struct pendconn, node);
+       while ((p = pendconn_first(&s->proxy->pendconns))) {
                if (s->maxconn && s->served + xferred >= maxconn)
                        break;
 
-- 
2.16.3

From 62573be114e1ffc562cf65b37dce26ca9ccc4416 Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <[email protected]>
Date: Fri, 11 May 2018 12:52:31 -0400
Subject: [PATCH 5/7] MEDIUM: add set-priority-class and set-priority-offset

This adds the set-priority-class and set-priority-offset actions to
http-request and tcp-request content. At this point they are not used
yet, which is the purpose of the next commit, but all the logic to
set and clear the values is there.
---
 doc/configuration.txt  |  38 ++++++++++++++
 doc/lua-api/index.rst  |  18 +++++++
 include/proto/queue.h  |  19 +++++++
 include/types/stream.h |   2 +
 src/hlua.c             |  53 +++++++++++++------
 src/queue.c            | 137 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/stream.c           |   2 +
 7 files changed, 254 insertions(+), 15 deletions(-)

diff --git a/doc/configuration.txt b/doc/configuration.txt
index 295e27c40..48b69a5bd 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -3917,6 +3917,7 @@ http-request { allow | auth [realm <realm>] | redirect 
<rule> | reject |
               replace-value <name> <match-regex> <replace-fmt> |
               set-method <fmt> | set-path <fmt> | set-query <fmt> |
               set-uri <fmt> | set-tos <tos> | set-mark <mark> |
+              set-priority-class <expr> | set-priority-offset <expr>
               add-acl(<file name>) <key fmt> |
               del-acl(<file name>) <key fmt> |
               del-map(<file name>) <key fmt> |
@@ -4113,6 +4114,24 @@ http-request { allow | auth [realm <realm>] | redirect 
<rule> | reject |
       downloads). This works on Linux kernels 2.6.32 and above and requires
       admin privileges.
 
+    - "set-priority-class" is used to set the queue priority class of the
+      current request. The value must be a sample expression which converts to
+      an integer in the range -2047..2047. Results outside this range will be
+      truncated. The priority class determines the order in which queued
+      requests are processed. Lower values have higher priority.
+
+    - "set-priority-offset" is used to set the queue priority timestamp offset
+      of the current request. The value must be a sample expression which
+      converts to an integer in the range -524287..524287. Results outside this
+      range will be truncated. When a request is queued, it is ordered first by
+      the priority class, then by the current timestamp adjusted by the given
+      offset in milliseconds. Lower values have higher priority.
+      Note that the resulting timestamp is is only tracked with enough 
precision
+      for 524,287ms (8m44s287ms). If the request is queued long enough to where
+      the adjusted timestamp exceeds this value, it will be misidentified as
+      highest priority. Thus it is important to set "timeout queue" to a value,
+      where when combined with the offset, does not exceed this limit.
+
     - "add-acl" is used to add a new entry into an ACL. The ACL must be loaded
       from a file (even a dummy empty file). The file name of the ACL to be
       updated is passed between parentheses. It takes one argument: <key fmt>,
@@ -9452,6 +9471,7 @@ tcp-request content <action> [{if | unless} <condition>]
     - accept : the request is accepted
     - reject : the request is rejected and the connection is closed
     - capture : the specified sample expression is captured
+    - set-priority-class <expr> | set-priority-offset <expr>
     - { track-sc0 | track-sc1 | track-sc2 } <key> [table <table>]
     - sc-inc-gpc0(<sc-id>)
     - sc-inc-gpc1(<sc-id>)
@@ -9513,6 +9533,24 @@ tcp-request content <action> [{if | unless} <condition>]
   The "unset-var" is used to unset a variable. See above for details about
   <var-name>.
 
+  The "set-priority-class" is used to set the queue priority class of the
+  current request. The value must be a sample expression which converts to an
+  integer in the range -2047..2047. Results outside this range will be
+  truncated. The priority class determines the order in which queued requests
+  are processed. Lower values have higher priority.
+
+  The "set-priority-offset" is used to set the queue priority timestamp offset
+  of the current request. The value must be a sample expression which converts
+  to an integer in the range -524287..524287. Results outside this range will 
be
+  truncated. When a request is queued, it is ordered first by the priority
+  class, then by the current timestamp adjusted by the given offset in
+  milliseconds. Lower values have higher priority.
+  Note that the resulting timestamp is is only tracked with enough precision 
for
+  524,287ms (8m44s287ms). If the request is queued long enough to where the
+  adjusted timestamp exceeds this value, it will be misidentified as highest
+  priority.  Thus it is important to set "timeout queue" to a value, where when
+  combined with the offset, does not exceed this limit.
+
   The "send-spoe-group" is used to trigger sending of a group of SPOE
   messages. To do so, the SPOE engine used to send messages must be defined, as
   well as the SPOE group to send. Of course, the SPOE engine must refer to an
diff --git a/doc/lua-api/index.rst b/doc/lua-api/index.rst
index ee9dab55c..0c79766eb 100644
--- a/doc/lua-api/index.rst
+++ b/doc/lua-api/index.rst
@@ -1769,6 +1769,24 @@ TXN class
   :param class_txn txn: The class txn object containing the data.
   :param integer mark: The mark value.
 
+.. js:function:: TXN.set_priority_class(txn, prio)
+
+  This function adjusts the priority class of the transaction. The value should
+  be within the range -2047..2047. Values outside this range will be
+  truncated.
+
+  See the HAProxy configuration.txt file keyword "http-request" action
+  "set-priority-class" for details.
+
+.. js:function:: TXN.set_priority_offset(txn, prio)
+
+  This function adjusts the priority offset of the transaction. The value
+  should be within the range -524287..524287. Values outside this range will be
+  truncated.
+
+  See the HAProxy configuration.txt file keyword "http-request" action
+  "set-priority-offset" for details.
+
 .. _socket_class:
 
 Socket class
diff --git a/include/proto/queue.h b/include/proto/queue.h
index 166da12f4..28709fa99 100644
--- a/include/proto/queue.h
+++ b/include/proto/queue.h
@@ -90,6 +90,25 @@ static inline int may_dequeue_tasks(const struct server *s, 
const struct proxy *
                (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)));
 }
 
+static inline int queue_limit_class(int class)
+{
+       if (class < -0x7ff)
+               return -0x7ff;
+       if (class > 0x7ff)
+               return 0x7ff;
+       return class;
+}
+
+static inline int queue_limit_offset(int offset)
+{
+       if (offset < -0x7ffff)
+               return -0x7ffff;
+       if (offset > 0x7ffff)
+               return 0x7ffff;
+       return offset;
+}
+
+
 #endif /* _PROTO_QUEUE_H */
 
 /*
diff --git a/include/types/stream.h b/include/types/stream.h
index a3137bf31..feeb56b12 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -131,6 +131,8 @@ struct stream {
        struct task *task;              /* the task associated with this stream 
*/
        unsigned short pending_events;  /* the pending events not yet processed 
by the stream.
                                         * This is a bit field of TASK_WOKEN_* 
*/
+       int16_t priority_class;         /* priority class of the stream for the 
pending queue */
+       int32_t priority_offset;        /* priority offset of the stream for 
the pending queue */
 
        struct list list;               /* position in global streams list */
        struct list by_srv;             /* position in server stream list */
diff --git a/src/hlua.c b/src/hlua.c
index bea9c4694..c29a7cc4e 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -44,6 +44,7 @@
 #include <proto/hlua_fcn.h>
 #include <proto/map.h>
 #include <proto/obj_type.h>
+#include <proto/queue.h>
 #include <proto/pattern.h>
 #include <proto/payload.h>
 #include <proto/proto_http.h>
@@ -5388,6 +5389,26 @@ __LJMP static int hlua_txn_set_mark(lua_State *L)
        return 0;
 }
 
+__LJMP static int hlua_txn_set_priority_class(lua_State *L)
+{
+       struct hlua_txn *htxn;
+
+       MAY_LJMP(check_args(L, 2, "set_priority_class"));
+       htxn = MAY_LJMP(hlua_checktxn(L, 1));
+       htxn->s->priority_class = 
queue_limit_class(MAY_LJMP(luaL_checkinteger(L, 2)));
+       return 0;
+}
+
+__LJMP static int hlua_txn_set_priority_offset(lua_State *L)
+{
+       struct hlua_txn *htxn;
+
+       MAY_LJMP(check_args(L, 2, "set_priority_offset"));
+       htxn = MAY_LJMP(hlua_checktxn(L, 1));
+       htxn->s->priority_offset = 
queue_limit_offset(MAY_LJMP(luaL_checkinteger(L, 2)));
+       return 0;
+}
+
 /* This function is an Lua binding that send pending data
  * to the client, and close the stream interface.
  */
@@ -7931,21 +7952,23 @@ void hlua_init(void)
        lua_newtable(gL.T);
 
        /* Register Lua functions. */
-       hlua_class_function(gL.T, "set_priv",    hlua_set_priv);
-       hlua_class_function(gL.T, "get_priv",    hlua_get_priv);
-       hlua_class_function(gL.T, "set_var",     hlua_set_var);
-       hlua_class_function(gL.T, "unset_var",   hlua_unset_var);
-       hlua_class_function(gL.T, "get_var",     hlua_get_var);
-       hlua_class_function(gL.T, "done",        hlua_txn_done);
-       hlua_class_function(gL.T, "set_loglevel",hlua_txn_set_loglevel);
-       hlua_class_function(gL.T, "set_tos",     hlua_txn_set_tos);
-       hlua_class_function(gL.T, "set_mark",    hlua_txn_set_mark);
-       hlua_class_function(gL.T, "deflog",      hlua_txn_deflog);
-       hlua_class_function(gL.T, "log",         hlua_txn_log);
-       hlua_class_function(gL.T, "Debug",       hlua_txn_log_debug);
-       hlua_class_function(gL.T, "Info",        hlua_txn_log_info);
-       hlua_class_function(gL.T, "Warning",     hlua_txn_log_warning);
-       hlua_class_function(gL.T, "Alert",       hlua_txn_log_alert);
+       hlua_class_function(gL.T, "set_priv",            hlua_set_priv);
+       hlua_class_function(gL.T, "get_priv",            hlua_get_priv);
+       hlua_class_function(gL.T, "set_var",             hlua_set_var);
+       hlua_class_function(gL.T, "unset_var",           hlua_unset_var);
+       hlua_class_function(gL.T, "get_var",             hlua_get_var);
+       hlua_class_function(gL.T, "done",                hlua_txn_done);
+       hlua_class_function(gL.T, "set_loglevel",        hlua_txn_set_loglevel);
+       hlua_class_function(gL.T, "set_tos",             hlua_txn_set_tos);
+       hlua_class_function(gL.T, "set_mark",            hlua_txn_set_mark);
+       hlua_class_function(gL.T, "set_priority_class",  
hlua_txn_set_priority_class);
+       hlua_class_function(gL.T, "set_priority_offset", 
hlua_txn_set_priority_offset);
+       hlua_class_function(gL.T, "deflog",              hlua_txn_deflog);
+       hlua_class_function(gL.T, "log",                 hlua_txn_log);
+       hlua_class_function(gL.T, "Debug",               hlua_txn_log_debug);
+       hlua_class_function(gL.T, "Info",                hlua_txn_log_info);
+       hlua_class_function(gL.T, "Warning",             hlua_txn_log_warning);
+       hlua_class_function(gL.T, "Alert",               hlua_txn_log_alert);
 
        lua_rawset(gL.T, -3);
 
diff --git a/src/queue.c b/src/queue.c
index 6bcaba5c1..12020a084 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -75,11 +75,14 @@
 #include <common/hathreads.h>
 #include <eb32tree.h>
 
+#include <proto/proto_http.h>
 #include <proto/queue.h>
+#include <proto/sample.h>
 #include <proto/server.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
+#include <proto/tcp_rules.h>
 
 
 struct pool_head *pool_head_pendconn;
@@ -456,6 +459,140 @@ int pendconn_dequeue(struct stream *strm)
        return 0;
 }
 
+static enum act_return action_set_priority_class(struct act_rule *rule, struct 
proxy *px,
+                                                 struct session *sess, struct 
stream *s, int flags)
+{
+       struct sample *smp;
+
+       smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, 
rule->arg.expr, SMP_T_SINT);
+       if (!smp)
+               return ACT_RET_CONT;
+
+       s->priority_class = queue_limit_class(smp->data.u.sint);
+       return ACT_RET_CONT;
+}
+
+static enum act_return action_set_priority_offset(struct act_rule *rule, 
struct proxy *px,
+                                                  struct session *sess, struct 
stream *s, int flags)
+{
+       struct sample *smp;
+
+       smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, 
rule->arg.expr, SMP_T_SINT);
+       if (!smp)
+               return ACT_RET_CONT;
+
+       s->priority_offset = queue_limit_offset(smp->data.u.sint);
+
+       return ACT_RET_CONT;
+}
+
+static enum act_parse_ret parse_set_priority_class(const char **args, int 
*arg, struct proxy *px,
+                                                   struct act_rule *rule, char 
**err)
+{
+       unsigned int where = 0;
+
+       rule->arg.expr = sample_parse_expr((char **)args, arg, 
px->conf.args.file,
+                                          px->conf.args.line, err, 
&px->conf.args);
+       if (!rule->arg.expr)
+               return ACT_RET_PRS_ERR;
+
+       if (px->cap & PR_CAP_FE)
+               where |= SMP_VAL_FE_HRQ_HDR;
+       if (px->cap & PR_CAP_BE)
+               where |= SMP_VAL_BE_HRQ_HDR;
+
+       if (!(rule->arg.expr->fetch->val & where)) {
+               memprintf(err,
+                         "fetch method '%s' extracts information from '%s', 
none of which is available here",
+                         args[0], 
sample_src_names(rule->arg.expr->fetch->use));
+               free(rule->arg.expr);
+               return ACT_RET_PRS_ERR;
+       }
+
+       rule->action     = ACT_CUSTOM;
+       rule->action_ptr = action_set_priority_class;
+       return ACT_RET_PRS_OK;
+}
+
+static enum act_parse_ret parse_set_priority_offset(const char **args, int 
*arg, struct proxy *px,
+                                                    struct act_rule *rule, 
char **err)
+{
+       unsigned int where = 0;
+
+       rule->arg.expr = sample_parse_expr((char **)args, arg, 
px->conf.args.file,
+                                          px->conf.args.line, err, 
&px->conf.args);
+       if (!rule->arg.expr)
+               return ACT_RET_PRS_ERR;
+
+       if (px->cap & PR_CAP_FE)
+               where |= SMP_VAL_FE_HRQ_HDR;
+       if (px->cap & PR_CAP_BE)
+               where |= SMP_VAL_BE_HRQ_HDR;
+
+       if (!(rule->arg.expr->fetch->val & where)) {
+               memprintf(err,
+                         "fetch method '%s' extracts information from '%s', 
none of which is available here",
+                         args[0], 
sample_src_names(rule->arg.expr->fetch->use));
+               free(rule->arg.expr);
+               return ACT_RET_PRS_ERR;
+       }
+
+       rule->action     = ACT_CUSTOM;
+       rule->action_ptr = action_set_priority_offset;
+       return ACT_RET_PRS_OK;
+}
+
+static struct action_kw_list tcp_cont_kws = {ILH, {
+       { "set-priority-class", parse_set_priority_class },
+       { "set-priority-offset", parse_set_priority_offset },
+       { /* END */ }
+}};
+
+static struct action_kw_list http_req_kws = {ILH, {
+       { "set-priority-class", parse_set_priority_class },
+       { "set-priority-offset", parse_set_priority_offset },
+       { /* END */ }
+}};
+
+static int
+smp_fetch_priority_class(const struct arg *args, struct sample *smp, const 
char *kw, void *private)
+{
+       if (!smp->strm)
+               return 0;
+
+       smp->data.type = SMP_T_SINT;
+       smp->data.u.sint = smp->strm->priority_class;
+
+       return 1;
+}
+
+static int
+smp_fetch_priority_offset(const struct arg *args, struct sample *smp, const 
char *kw, void *private)
+{
+       if (!smp->strm)
+               return 0;
+
+       smp->data.type = SMP_T_SINT;
+       smp->data.u.sint = smp->strm->priority_offset;
+
+       return 1;
+}
+
+
+static struct sample_fetch_kw_list smp_kws = {ILH, {
+       { "prio_class", smp_fetch_priority_class, 0, NULL, SMP_T_SINT, 
SMP_USE_INTRN, },
+       { "prio_offset", smp_fetch_priority_offset, 0, NULL, SMP_T_SINT, 
SMP_USE_INTRN, },
+       { /* END */},
+}};
+
+__attribute__((constructor))
+static void __queue_init(void)
+{
+       tcp_req_cont_keywords_register(&tcp_cont_kws);
+       http_req_keywords_register(&http_req_kws);
+       sample_register_fetches(&smp_kws);
+}
+
 /*
  * Local variables:
  *  c-indent-level: 8
diff --git a/src/stream.c b/src/stream.c
index 50b0d6d51..9dbced0d3 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -221,6 +221,8 @@ struct stream *stream_new(struct session *sess, enum 
obj_type *origin)
        s->target = sess->listener ? sess->listener->default_target : NULL;
 
        s->pend_pos = NULL;
+       s->priority_class = 0;
+       s->priority_offset = 0;
 
        /* init store persistence */
        s->store_count = 0;
-- 
2.16.3

From acfde25a97bc1706939b188b16454c7979d0925e Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <[email protected]>
Date: Fri, 11 May 2018 12:52:31 -0400
Subject: [PATCH 4/7] MINOR: queue: replace the linked list with a tree

We'll need trees to manage the queues by priorities. This change replaces
the list with a tree based on a single key. It's effectively a list but
allows us to get rid of the list management right now.
---
 include/proto/queue.h  |  2 +-
 include/types/proxy.h  |  2 +-
 include/types/queue.h  |  3 ++-
 include/types/server.h |  2 +-
 src/hlua.c             |  4 ++--
 src/proxy.c            |  2 +-
 src/queue.c            | 35 ++++++++++++++++++++++-------------
 src/server.c           |  2 +-
 8 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/include/proto/queue.h b/include/proto/queue.h
index 11696dbc4..166da12f4 100644
--- a/include/proto/queue.h
+++ b/include/proto/queue.h
@@ -52,7 +52,7 @@ void pendconn_unlink(struct pendconn *p);
  */
 static inline void pendconn_cond_unlink(struct pendconn *p)
 {
-       if (p && !LIST_ISEMPTY(&p->list))
+       if (p && p->node.node.leaf_p)
                pendconn_unlink(p);
 }
 
diff --git a/include/types/proxy.h b/include/types/proxy.h
index 234a142c0..37a50609c 100644
--- a/include/types/proxy.h
+++ b/include/types/proxy.h
@@ -322,7 +322,7 @@ struct proxy {
                int serverfin;                  /* timeout to apply to server 
half-closed connections */
        } timeout;
        char *id, *desc;                        /* proxy id (name) and 
description */
-       struct list pendconns;                  /* pending connections with no 
server assigned yet */
+       struct eb_root pendconns;               /* pending connections with no 
server assigned yet */
        int nbpend;                             /* number of pending 
connections with no server assigned yet */
        int totpend;                            /* total number of pending 
connections on this instance (for stats) */
        unsigned int queue_idx;                 /* number of pending 
connections which have been de-queued */
diff --git a/include/types/queue.h b/include/types/queue.h
index 575cc5929..ca7191c34 100644
--- a/include/types/queue.h
+++ b/include/types/queue.h
@@ -37,7 +37,8 @@ struct pendconn {
        struct proxy  *px;
        struct server *srv;        /* the server we are waiting for, may be 
NULL if don't care */
        struct server *target;     /* the server that was assigned, = srv 
except if srv==NULL */
-       struct list    list;       /* next pendconn */
+       struct eb32_node node;
+       __decl_hathreads(HA_SPINLOCK_T lock);
 };
 
 #endif /* _TYPES_QUEUE_H */
diff --git a/include/types/server.h b/include/types/server.h
index 7d0ba4571..88259281d 100644
--- a/include/types/server.h
+++ b/include/types/server.h
@@ -215,7 +215,7 @@ struct server {
        struct freq_ctr sess_per_sec;           /* sessions per second on this 
server */
        struct be_counters counters;            /* statistics counters */
 
-       struct list pendconns;                  /* pending connections */
+       struct eb_root pendconns;               /* pending connections */
        struct list actconns;                   /* active connections */
        struct list *priv_conns;                /* private idle connections 
attached to stream interfaces */
        struct list *idle_conns;                /* sharable idle connections 
attached or not to a stream interface */
diff --git a/src/hlua.c b/src/hlua.c
index 4d42b1fec..bea9c4694 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -8007,7 +8007,7 @@ void hlua_init(void)
        socket_tcp.proxy = &socket_proxy;
        socket_tcp.obj_type = OBJ_TYPE_SERVER;
        LIST_INIT(&socket_tcp.actconns);
-       LIST_INIT(&socket_tcp.pendconns);
+       socket_tcp.pendconns = EB_ROOT;
        socket_tcp.priv_conns = NULL;
        socket_tcp.idle_conns = NULL;
        socket_tcp.safe_conns = NULL;
@@ -8053,7 +8053,7 @@ void hlua_init(void)
        socket_ssl.proxy = &socket_proxy;
        socket_ssl.obj_type = OBJ_TYPE_SERVER;
        LIST_INIT(&socket_ssl.actconns);
-       LIST_INIT(&socket_ssl.pendconns);
+       socket_ssl.pendconns = EB_ROOT;
        socket_tcp.priv_conns = NULL;
        socket_tcp.idle_conns = NULL;
        socket_tcp.safe_conns = NULL;
diff --git a/src/proxy.c b/src/proxy.c
index bf87a2f9c..ff094d210 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -726,7 +726,7 @@ void init_new_proxy(struct proxy *p)
 {
        memset(p, 0, sizeof(struct proxy));
        p->obj_type = OBJ_TYPE_PROXY;
-       LIST_INIT(&p->pendconns);
+       p->pendconns = EB_ROOT;
        LIST_INIT(&p->acl);
        LIST_INIT(&p->http_req_rules);
        LIST_INIT(&p->http_res_rules);
diff --git a/src/queue.c b/src/queue.c
index 4c8c4c9cd..6bcaba5c1 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -21,7 +21,7 @@
  * A stream does not necessarily have such a pendconn. Thus the pendconn is
  * designated by the stream->pend_pos pointer. This results in some properties 
:
  *   - pendconn->strm->pend_pos is never NULL for any valid pendconn
- *   - if LIST_ISEMPTY(pendconn->list) is true, the element is unlinked,
+ *   - if p->node.node.leaf_p is NULL, the element is unlinked,
  *     otherwise it necessarily belongs to one of the other lists ; this may
  *     not be atomically checked under threads though ;
  *   - pendconn->px is never NULL if pendconn->list is not empty
@@ -73,6 +73,7 @@
 #include <common/memory.h>
 #include <common/time.h>
 #include <common/hathreads.h>
+#include <eb32tree.h>
 
 #include <proto/queue.h>
 #include <proto/server.h>
@@ -137,8 +138,7 @@ static void __pendconn_unlink(struct pendconn *p)
                p->px->nbpend--;
        }
        HA_ATOMIC_SUB(&p->px->totpend, 1);
-       LIST_DEL(&p->list);
-       LIST_INIT(&p->list);
+       eb32_delete(&p->node);
 }
 
 /* Locks the queue the pendconn element belongs to. This relies on both p->px
@@ -204,20 +204,24 @@ static int pendconn_process_next_strm(struct server *srv, 
struct proxy *px)
        struct pendconn *p = NULL;
        struct pendconn *pp = NULL;
        struct server   *rsrv;
+       struct eb32_node *node;
 
        rsrv = srv->track;
        if (!rsrv)
                rsrv = srv;
 
        p = NULL;
-       if (srv->nbpend)
-               p = LIST_ELEM(srv->pendconns.n, struct pendconn *, list);
+       if (srv->nbpend) {
+               node = eb32_first(&srv->pendconns);
+               p = eb32_entry(node, struct pendconn, node);
+       }
 
        if (srv_currently_usable(rsrv) && px->nbpend &&
            (!(srv->flags & SRV_F_BACKUP) ||
             (!px->srv_act &&
              (srv == px->lbprm.fbck || (px->options & PR_O_USE_ALL_BK))))) {
-               pp = LIST_ELEM(px->pendconns.n, struct pendconn *, list);
+               node = eb32_first(&px->pendconns);
+               pp = eb32_entry(node, struct pendconn, node);
 
                /* If the server pendconn is older than the proxy one,
                 * we process the server one.
@@ -303,6 +307,7 @@ struct pendconn *pendconn_add(struct stream *strm)
        px            = strm->be;
        p->target     = NULL;
        p->srv        = srv;
+       p->node.key   = 0;
        p->px         = px;
        p->strm       = strm;
        p->strm_flags = strm->flags;
@@ -314,14 +319,14 @@ struct pendconn *pendconn_add(struct stream *strm)
                if (srv->nbpend > srv->counters.nbpend_max)
                        srv->counters.nbpend_max = srv->nbpend;
                p->queue_idx = srv->queue_idx - 1; // for increment
-               LIST_ADDQ(&srv->pendconns, &p->list);
+               eb32_insert(&srv->pendconns, &p->node);
        }
        else {
                px->nbpend++;
                if (px->nbpend > px->be_counters.nbpend_max)
                        px->be_counters.nbpend_max = px->nbpend;
                p->queue_idx = px->queue_idx - 1; // for increment
-               LIST_ADDQ(&px->pendconns, &p->list);
+               eb32_insert(&px->pendconns, &p->node);
        }
        strm->pend_pos = p;
 
@@ -336,7 +341,8 @@ struct pendconn *pendconn_add(struct stream *strm)
  */
 int pendconn_redistribute(struct server *s)
 {
-       struct pendconn *p, *pback;
+       struct pendconn *p;
+       struct eb32_node *node;
        int xferred = 0;
 
        /* The REDISP option was specified. We will ignore cookie and force to
@@ -345,7 +351,8 @@ int pendconn_redistribute(struct server *s)
                return 0;
 
        HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-       list_for_each_entry_safe(p, pback, &s->pendconns, list) {
+       for (node = eb32_first(&s->pendconns); node; node = eb32_next(node)) {
+               p = eb32_entry(&node, struct pendconn, node);
                if (p->strm_flags & SF_FORCE_PRST)
                        continue;
 
@@ -366,7 +373,8 @@ int pendconn_redistribute(struct server *s)
  */
 int pendconn_grab_from_px(struct server *s)
 {
-       struct pendconn *p, *pback;
+       struct pendconn *p;
+       struct eb32_node *node;
        int maxconn, xferred = 0;
 
        if (!srv_currently_usable(s))
@@ -383,7 +391,8 @@ int pendconn_grab_from_px(struct server *s)
 
        HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
        maxconn = srv_dynamic_maxconn(s);
-       list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) {
+       while ((node = eb32_first(&s->proxy->pendconns))) {
+               p = eb32_entry(&node, struct pendconn, node);
                if (s->maxconn && s->served + xferred >= maxconn)
                        break;
 
@@ -428,7 +437,7 @@ int pendconn_dequeue(struct stream *strm)
         * unlinked, these functions were completely done.
         */
        pendconn_queue_lock(p);
-       is_unlinked = LIST_ISEMPTY(&p->list);
+       is_unlinked = !p->node.node.leaf_p;
        pendconn_queue_unlock(p);
 
        if (!is_unlinked)
diff --git a/src/server.c b/src/server.c
index c885debca..1d7a5a771 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1627,7 +1627,7 @@ static struct server *new_server(struct proxy *proxy)
        srv->obj_type = OBJ_TYPE_SERVER;
        srv->proxy = proxy;
        LIST_INIT(&srv->actconns);
-       LIST_INIT(&srv->pendconns);
+       srv->pendconns = EB_ROOT;
 
        if ((srv->priv_conns = calloc(global.nbthread, 
sizeof(*srv->priv_conns))) == NULL)
                goto free_srv;
-- 
2.16.3

From e1ef0051745ad16c80d299092958a615f2369478 Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <[email protected]>
Date: Fri, 11 May 2018 12:52:31 -0400
Subject: [PATCH 3/7] MINOR: queue: store the queue index in the stream when
 enqueuing

We store the queue index in the stream and check it on dequeueing to
figure how many entries were processed in between. This way we'll be
able to count the elements that may later be added before ours.

WARNING! a call to pendconn_free() and/or something similar is missing
in front of a number of calls to do_log() in order to report the correct
dequeue value, especially in case of abort or timeout.
---
 include/types/proxy.h  |  1 +
 include/types/queue.h  |  1 +
 include/types/server.h |  1 +
 src/queue.c            | 21 +++++++++++++++------
 src/stream.c           |  1 +
 5 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/include/types/proxy.h b/include/types/proxy.h
index ec95286b6..234a142c0 100644
--- a/include/types/proxy.h
+++ b/include/types/proxy.h
@@ -325,6 +325,7 @@ struct proxy {
        struct list pendconns;                  /* pending connections with no 
server assigned yet */
        int nbpend;                             /* number of pending 
connections with no server assigned yet */
        int totpend;                            /* total number of pending 
connections on this instance (for stats) */
+       unsigned int queue_idx;                 /* number of pending 
connections which have been de-queued */
        unsigned int feconn, beconn;            /* # of active frontend and 
backends streams */
        struct freq_ctr fe_req_per_sec;         /* HTTP requests per second on 
the frontend */
        struct freq_ctr fe_conn_per_sec;        /* received connections per 
second on the frontend */
diff --git a/include/types/queue.h b/include/types/queue.h
index c025b9ce6..575cc5929 100644
--- a/include/types/queue.h
+++ b/include/types/queue.h
@@ -32,6 +32,7 @@ struct stream;
 
 struct pendconn {
        int            strm_flags; /* stream flags */
+       unsigned int   queue_idx;  /* value of proxy/server queue_idx at time 
of enqueue */
        struct stream *strm;
        struct proxy  *px;
        struct server *srv;        /* the server we are waiting for, may be 
NULL if don't care */
diff --git a/include/types/server.h b/include/types/server.h
index 7c6d2257b..7d0ba4571 100644
--- a/include/types/server.h
+++ b/include/types/server.h
@@ -210,6 +210,7 @@ struct server {
        int cur_sess;                           /* number of currently active 
sessions (including syn_sent) */
        unsigned maxconn, minconn;              /* max # of active sessions (0 
= unlimited), min# for dynamic limit. */
        int nbpend;                             /* number of pending 
connections */
+       unsigned int queue_idx;                 /* count of pending connections 
which have been de-queued */
        int maxqueue;                           /* maximum number of pending 
connections allowed */
        struct freq_ctr sess_per_sec;           /* sessions per second on this 
server */
        struct be_counters counters;            /* statistics counters */
diff --git a/src/queue.c b/src/queue.c
index aa22256b1..4c8c4c9cd 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -129,10 +129,13 @@ unsigned int srv_dynamic_maxconn(const struct server *s)
  */
 static void __pendconn_unlink(struct pendconn *p)
 {
-       if (p->srv)
+       if (p->srv) {
+               p->strm->logs.srv_queue_pos += p->srv->queue_idx - p->queue_idx;
                p->srv->nbpend--;
-       else
+       } else {
+               p->strm->logs.prx_queue_pos += p->px->queue_idx - p->queue_idx;
                p->px->nbpend--;
+       }
        HA_ATOMIC_SUB(&p->px->totpend, 1);
        LIST_DEL(&p->list);
        LIST_INIT(&p->list);
@@ -199,6 +202,7 @@ void pendconn_unlink(struct pendconn *p)
 static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
        struct pendconn *p = NULL;
+       struct pendconn *pp = NULL;
        struct server   *rsrv;
 
        rsrv = srv->track;
@@ -213,8 +217,6 @@ static int pendconn_process_next_strm(struct server *srv, 
struct proxy *px)
            (!(srv->flags & SRV_F_BACKUP) ||
             (!px->srv_act &&
              (srv == px->lbprm.fbck || (px->options & PR_O_USE_ALL_BK))))) {
-               struct pendconn *pp;
-
                pp = LIST_ELEM(px->pendconns.n, struct pendconn *, list);
 
                /* If the server pendconn is older than the proxy one,
@@ -236,6 +238,11 @@ static int pendconn_process_next_strm(struct server *srv, 
struct proxy *px)
        p->strm_flags |= SF_ASSIGNED;
        p->target = srv;
 
+       if (p != pp)
+               srv->queue_idx++;
+       else
+               px->queue_idx++;
+
        HA_ATOMIC_ADD(&srv->served, 1);
        HA_ATOMIC_ADD(&srv->proxy->served, 1);
        if (px->lbprm.server_take_conn)
@@ -272,6 +279,8 @@ void process_srv_queue(struct server *s)
  * are updated accordingly. Returns NULL if no memory is available, otherwise 
the
  * pendconn itself. If the stream was already marked as served, its flag is
  * cleared. It is illegal to call this function with a non-NULL strm->srv_conn.
+ * The stream's queue position is counted with an offset of -1 because we want
+ * to make sure that being at the first position in the queue reports 1.
  *
  * This function must be called by the stream itself, so in the context of
  * process_stream.
@@ -302,16 +311,16 @@ struct pendconn *pendconn_add(struct stream *strm)
 
        if (srv) {
                srv->nbpend++;
-               strm->logs.srv_queue_pos += srv->nbpend;
                if (srv->nbpend > srv->counters.nbpend_max)
                        srv->counters.nbpend_max = srv->nbpend;
+               p->queue_idx = srv->queue_idx - 1; // for increment
                LIST_ADDQ(&srv->pendconns, &p->list);
        }
        else {
                px->nbpend++;
-               strm->logs.prx_queue_pos += px->nbpend;
                if (px->nbpend > px->be_counters.nbpend_max)
                        px->be_counters.nbpend_max = px->nbpend;
+               p->queue_idx = px->queue_idx - 1; // for increment
                LIST_ADDQ(&px->pendconns, &p->list);
        }
        strm->pend_pos = p;
diff --git a/src/stream.c b/src/stream.c
index d741a86b9..50b0d6d51 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -488,6 +488,7 @@ void stream_process_counters(struct stream *s)
        void *ptr1,*ptr2;
        struct stksess *ts;
        int i;
+       unsigned int queue_idx;
 
        bytes = s->req.total - s->logs.bytes_in;
        s->logs.bytes_in = s->req.total;
-- 
2.16.3

From dc43cf091a950f08d152f0626a0fd2703bf9e3f3 Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <[email protected]>
Date: Fri, 11 May 2018 12:52:31 -0400
Subject: [PATCH 2/7] MINOR: stream: rename {srv,prx}_queue_size to *_queue_pos

The current name is misleading as it implies a queue size, but the value
instead indicates a position in the queue.
The value is only the queue size at the exact moment the element is enqueued.
Soon we will gain the ability to insert anywhere into the queue, upon which
clarity of the name is more important.
---
 include/types/stream.h | 4 ++--
 src/log.c              | 4 ++--
 src/proto_http.c       | 4 ++--
 src/queue.c            | 4 ++--
 src/stream.c           | 4 ++--
 5 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/include/types/stream.h b/include/types/stream.h
index 0dbc79f44..a3137bf31 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -105,8 +105,8 @@ struct strm_logs {
        long  t_connect;                /* delay before the connect() to the 
server succeeds, -1 if never occurs */
        long  t_data;                   /* delay before the first data byte 
from the server ... */
        unsigned long t_close;          /* total stream duration */
-       unsigned long srv_queue_size;   /* number of streams waiting for a 
connect slot on this server at accept() time (in direct assignment) */
-       unsigned long prx_queue_size;   /* overall number of streams waiting 
for a connect slot on this instance at accept() time */
+       unsigned long srv_queue_pos;    /* number of streams de-queued while 
waiting for a connection slot on this server */
+       unsigned long prx_queue_pos;    /* number of streams de-qeuued while 
waiting for a connection slot on this instance */
        long long bytes_in;             /* number of bytes transferred from the 
client to the server */
        long long bytes_out;            /* number of bytes transferred from the 
server to the client */
 };
diff --git a/src/log.c b/src/log.c
index e2ced80d8..dcb2ba069 100644
--- a/src/log.c
+++ b/src/log.c
@@ -2135,7 +2135,7 @@ int build_logline(struct stream *s, char *dst, size_t 
maxsize, struct list *list
                                break;
 
                        case LOG_FMT_SRVQUEUE: // %sq
-                               ret = ltoa_o(s->logs.srv_queue_size, tmplog, 
dst + maxsize - tmplog);
+                               ret = ltoa_o(s->logs.srv_queue_pos, tmplog, dst 
+ maxsize - tmplog);
                                if (ret == NULL)
                                        goto out;
                                tmplog = ret;
@@ -2143,7 +2143,7 @@ int build_logline(struct stream *s, char *dst, size_t 
maxsize, struct list *list
                                break;
 
                        case LOG_FMT_BCKQUEUE:  // %bq
-                               ret = ltoa_o(s->logs.prx_queue_size, tmplog, 
dst + maxsize - tmplog);
+                               ret = ltoa_o(s->logs.prx_queue_pos, tmplog, dst 
+ maxsize - tmplog);
                                if (ret == NULL)
                                        goto out;
                                tmplog = ret;
diff --git a/src/proto_http.c b/src/proto_http.c
index 9cbf3edda..999c39ebf 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -4426,8 +4426,8 @@ void http_end_txn_clean_session(struct stream *s)
        s->logs.t_connect = -1;
        s->logs.t_data = -1;
        s->logs.t_close = 0;
-       s->logs.prx_queue_size = 0;  /* we get the number of pending conns 
before us */
-       s->logs.srv_queue_size = 0; /* we will get this number soon */
+       s->logs.prx_queue_pos = 0;  /* we get the number of pending conns 
before us */
+       s->logs.srv_queue_pos = 0; /* we will get this number soon */
 
        s->logs.bytes_in = s->req.total = ci_data(&s->req);
        s->logs.bytes_out = s->res.total = ci_data(&s->res);
diff --git a/src/queue.c b/src/queue.c
index 57fd087bf..aa22256b1 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -302,14 +302,14 @@ struct pendconn *pendconn_add(struct stream *strm)
 
        if (srv) {
                srv->nbpend++;
-               strm->logs.srv_queue_size += srv->nbpend;
+               strm->logs.srv_queue_pos += srv->nbpend;
                if (srv->nbpend > srv->counters.nbpend_max)
                        srv->counters.nbpend_max = srv->nbpend;
                LIST_ADDQ(&srv->pendconns, &p->list);
        }
        else {
                px->nbpend++;
-               strm->logs.prx_queue_size += px->nbpend;
+               strm->logs.prx_queue_pos += px->nbpend;
                if (px->nbpend > px->be_counters.nbpend_max)
                        px->be_counters.nbpend_max = px->nbpend;
                LIST_ADDQ(&px->pendconns, &p->list);
diff --git a/src/stream.c b/src/stream.c
index aa021b0b3..d741a86b9 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -125,8 +125,8 @@ struct stream *stream_new(struct session *sess, enum 
obj_type *origin)
        s->logs.t_data = -1;
        s->logs.t_close = 0;
        s->logs.bytes_in = s->logs.bytes_out = 0;
-       s->logs.prx_queue_size = 0;  /* we get the number of pending conns 
before us */
-       s->logs.srv_queue_size = 0; /* we will get this number soon */
+       s->logs.prx_queue_pos = 0;  /* we get the number of pending conns 
before us */
+       s->logs.srv_queue_pos = 0; /* we will get this number soon */
 
        /* default logging function */
        s->do_log = strm_log;
-- 
2.16.3

From 734fcb3408a21e909bbc487c8e7d66acaa78f377 Mon Sep 17 00:00:00 2001
From: Willy Tarreau <[email protected]>
Date: Wed, 25 Jul 2018 06:55:12 +0200
Subject: [PATCH 1/7] MINOR: queue: make sure the pendconn is released before
 logging

We'll soon need to rely on the pendconn position at the time of dequeuing
to figure the position a stream took in the queue. Usually it's not a
problem since pendconn_free() is called once the connection starts, but
it will make a difference for failed dequeues (eg: queue timeout reached).
Thus it's important to call pendconn_free() before logging in cases we are
not certain whether it was already performed, and to call pendconn_unlink()
after we know the pendconn will not be used so that we collect the queue
state as accurately as possible. As a benefit it will also make the
server's and backend's queues count more accurate in these cases.
---
 src/proto_http.c |  5 +++--
 src/stream.c     | 14 ++++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/src/proto_http.c b/src/proto_http.c
index 42c9d1a62..9cbf3edda 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -4403,6 +4403,9 @@ void http_end_txn_clean_session(struct stream *s)
        s->logs.bytes_in  -= ci_data(&s->req);
        s->logs.bytes_out -= ci_data(&s->res);
 
+       /* we may need to know the position in the queue */
+       pendconn_free(s);
+
        /* let's do a final log if we need it */
        if (!LIST_ISEMPTY(&fe->logformat) && s->logs.logwait &&
            !(s->flags & SF_MONITOR) &&
@@ -4429,8 +4432,6 @@ void http_end_txn_clean_session(struct stream *s)
        s->logs.bytes_in = s->req.total = ci_data(&s->req);
        s->logs.bytes_out = s->res.total = ci_data(&s->res);
 
-       pendconn_free(s);
-
        if (objt_server(s->target)) {
                if (s->flags & SF_CURR_SESS) {
                        s->flags &= ~SF_CURR_SESS;
diff --git a/src/stream.c b/src/stream.c
index d794c28a9..aa021b0b3 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -799,6 +799,7 @@ static void sess_establish(struct stream *s)
                /* if the user wants to log as soon as possible, without 
counting
                 * bytes from the server, then this is the right moment. */
                if (!LIST_ISEMPTY(&strm_fe(s)->logformat) && !(s->logs.logwait 
& LW_BYTES)) {
+                       /* note: no pend_pos here, session is established */
                        s->logs.t_close = s->logs.t_connect; /* to get a valid 
end date */
                        s->do_log(s);
                }
@@ -910,6 +911,9 @@ static void sess_update_stream_int(struct stream *s)
 
                        s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, 
&now);
 
+                       /* we may need to know the position in the queue for 
logging */
+                       pendconn_cond_unlink(s->pend_pos);
+
                        /* no stream was ever accounted for this server */
                        si->state = SI_ST_CLO;
                        if (s->srv_error)
@@ -950,6 +954,10 @@ static void sess_update_stream_int(struct stream *s)
                        /* ... and timeout expired */
                        si->exp = TICK_ETERNITY;
                        s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, 
&now);
+
+                       /* we may need to know the position in the queue for 
logging */
+                       pendconn_cond_unlink(s->pend_pos);
+
                        if (srv)
                                HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
@@ -967,6 +975,10 @@ static void sess_update_stream_int(struct stream *s)
                /* Connection remains in queue, check if we have to abort it */
                if (check_req_may_abort(req, s)) {
                        s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, 
&now);
+
+                       /* we may need to know the position in the queue for 
logging */
+                       pendconn_cond_unlink(s->pend_pos);
+
                        si->err_type |= SI_ET_QUEUE_ABRT;
                        goto abort_connection;
                }
@@ -2503,6 +2515,8 @@ struct task *process_stream(struct task *t, void 
*context, unsigned short state)
        if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait &&
            !(s->flags & SF_MONITOR) &&
            (!(sess->fe->options & PR_O_NULLNOLOG) || req->total)) {
+               /* we may need to know the position in the queue */
+               pendconn_free(s);
                s->do_log(s);
        }
 
-- 
2.16.3

Reply via email to