On 2018/5/31 00:57, Willy Tarreau wrote: > Hi Patrick, > > On Thu, May 31, 2018 at 12:16:27AM -0400, Patrick Hemmer wrote: >>> I looked at the code to see if something could cause that. I found that the >>> key increment could be a reason (you must restart from the next element, >>> not an upper value since there will be many duplicate keys) >> Gah, I completely forgot about duplicate keys. Will fix. > I'll send you (later today) some splitted patches that will help for > this. This simplifies review and experimentations on the walk algorithm. I kept the patch set you created, and have made some minor adjustments to address the mentioned issues.
I think the only uncertainty I had during implementation was an atomic operator for retrieving the queue_idx value inside stream_process_counters. There's no load operator, but I'm not sure if that is by design, or what the preferred solution is here. In theory if the value is aligned, the load is atomic anyway and we don't need an explicit atomic operator. But I'm not sure if that's an assumption we want to make. >> The warning is addressable. It means the user should add a `timeout >> queue` and set it less than 524287ms. This is similar to the "missing >> timeouts for frontend/backend" warning we print. Though I think it would >> make sense to add "queue" to that existing warning instead, as it >> already mentions the timeouts for client, connect & server. > In fact the problem I'm having is that I've already seen some very high > timeout values in field (eg on a print server), which means that some > people do need to have a large value here. For sure they don't need to > reach 24 days but 8min will definitely be too short for certain extreme > cases. And I understand the impact of such large values with the queue, > I just don't know how we can address this at the moment, it's what I'm > still thinking about. > >>> We need to find a more suitable name for this "cntdepend" as it really >>> doesn't >>> tell me that it has anything to do with what it says. I don't have anything >>> to propose for now I'm afraid. >> Yeah, not fond of the name, but it was something, and is easy to change. >> The name originated from "cnt"=counter, and "pend" to be consistent with >> the naming of "nbpend" and "totpend" right above it. So: counter of >> de-queued pending connections. > OK. In fact given that it's a wrapping counter it's why I considered it > could be an index for the srv and px parts, but the stream part stores > a distance between the recorded index and the current index. > >>> @@ -2467,6 +2469,10 @@ struct task *process_stream(struct task *t, void >>> *context, unsigned short state) >>> return t; /* nothing more to do */ >>> } >>> >>> + // remove from pending queue here so we update counters >>> + if (s->pend_pos) >>> + pendconn_free(s->pend_pos); >>> + >>> if (s->flags & SF_BE_ASSIGNED) >>> HA_ATOMIC_SUB(&s->be->beconn, 1); >>> >>> This part is not supposed to be needed since it's already performed >>> in stream_free() a few lines later. Or if it's required to get the >>> queue values correctly before logging, then something similar will >>> be required at every place where we log after trying to connect to >>> a server (there are a few places depending on the log format and >>> the logasap option). >> Yes, it was put there to update the counters before logging. Re-thinking >> this, updating of the counters needs to be moved into >> pendconn_process_next_stream anyway. Since this function updates >> px->cntdepend and is called in a loop, calculating logs.prx_queue_pos >> after this loop completes will yield incorrect values. > I thought about the same but was not yet completely certain. I suspect in > fact the stream's value should indeed be updated upon dequeue, and that > only the error case has to be taken into account before logging (if there > was no dequeue). In this case I think we can have this one performed here > provided we ensure other log places will not be triggered on error. > >>> Now I think I understand how the cntdepend works : isn't it simply a >>> wrapping queue index that you use to measure the difference between >>> when you queued and when you dequeued ? In this case, wouldn't something >>> like "queue_idx" be more explicit ? At least it becomes more obvious when >>> doing the measure that we measure a length by subtracting two indexes. >> It is a measure of a difference yes, but it's the difference between how >> many items were processed off the queue. I personally wouldn't call it >> an index as index implies position. > I personally see it as a position. Well exactly a sequence number in fact :-) If you are waiting in the line at the cinema, and when you get in the queue you're given a number of how many people the cinema has served since it opened, that's not your position in the queue. Just as when you're served, and are given a new number of how many people have been served, it's still not your queue position. And then if you take the difference between the two, that tells you how many people were served while you were waiting. But it doesn't mean that's how many people were in front of you when you joined the queue, as people could have been inserted before you after you joined. But it's just a name, and the description in the header file is accurate, so I'll keep the name you changed it to. >> Did you have any thoughts on the 64-bit absolute timestamp patch? It was >> my understanding that the earlier objections were due to anticipated >> performance and complexity issues. But the performance is better, the >> code is simpler, and has less issues (queue timeout, wrapping at extreme >> offset values, etc). > Yes I had some thoughts but mixed. It's indeed simpler, but from the beginning > there is something I don't like in it that I'm totally unable to explain. I > really hate this because I know it feels totally unfair but I'm really > convinced that taking it this way we'll regret it later. I'm a bit embarrassed > and have been thinking for some time what bothers me but I don't know very > well. The root cause of the problem is changing the way the timers work. > Everywhere we use a perfectly fine wrapping timer that can run continuously > and ensures that bugs (if any) are quickly met and resolved. Except that in the priority queue case, the wrapping times don't work perfectly fine. Anything which sits in the queue long enough to wrap is a problem. > Here with a > non-wrapping time, we more or less consider that given that we'll all be > dead when it wraps we don't care about what happens. And this "don't care" > approach really doesn't appeal me because we may overlook serious problems > that will trigger much earlier than expected, or could depend on the platform > or anything. You know, a bit like people using "9/9/99" as infinite time 25 > years ago, until their applications stopped on this date, or long-term > contracts were dropped or anything. There's a difference though in that we don't allow the user to enter an absolute timestamp. The configuration they enter is relative. Yes it's possible they enter a priority offset of X years, where after some date, X+now() wraps around, but we can mitigate that by using a time since haproxy start instead of time since unix epoch. Then if we limit max/min offset to restrict a 1000 years from the max/min 52-bit values, haproxy would have to run for 1000 years continuous before any issues arise. -Patrick
From ec45b0b4a5321f0c1aa5ebbb19ecb5d1dd46776f Mon Sep 17 00:00:00 2001 From: Patrick Hemmer <[email protected]> Date: Fri, 11 May 2018 12:52:31 -0400 Subject: [PATCH 1/6] MINOR: stream: rename {srv,prx}_queue_size to *_queue_pos The current name is misleading as it indicates a position in the queue but pretends to be a queue size. It's only the queue size at the exact moment the element is enqueued but this will not be true anymore once we start inserting anywhere in the queue. --- include/types/stream.h | 4 ++-- src/log.c | 4 ++-- src/proto_http.c | 4 ++-- src/queue.c | 4 ++-- src/stream.c | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/include/types/stream.h b/include/types/stream.h index 0dbc79f44..a3137bf31 100644 --- a/include/types/stream.h +++ b/include/types/stream.h @@ -105,8 +105,8 @@ struct strm_logs { long t_connect; /* delay before the connect() to the server succeeds, -1 if never occurs */ long t_data; /* delay before the first data byte from the server ... */ unsigned long t_close; /* total stream duration */ - unsigned long srv_queue_size; /* number of streams waiting for a connect slot on this server at accept() time (in direct assignment) */ - unsigned long prx_queue_size; /* overall number of streams waiting for a connect slot on this instance at accept() time */ + unsigned long srv_queue_pos; /* number of streams de-queued while waiting for a connection slot on this server */ + unsigned long prx_queue_pos; /* number of streams de-qeuued while waiting for a connection slot on this instance */ long long bytes_in; /* number of bytes transferred from the client to the server */ long long bytes_out; /* number of bytes transferred from the server to the client */ }; diff --git a/src/log.c b/src/log.c index b2d4367f4..6ddfbd6c3 100644 --- a/src/log.c +++ b/src/log.c @@ -2131,7 +2131,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list break; case LOG_FMT_SRVQUEUE: // %sq - ret = ltoa_o(s->logs.srv_queue_size, tmplog, dst + maxsize - tmplog); + ret = ltoa_o(s->logs.srv_queue_pos, tmplog, dst + maxsize - tmplog); if (ret == NULL) goto out; tmplog = ret; @@ -2139,7 +2139,7 @@ int build_logline(struct stream *s, char *dst, size_t maxsize, struct list *list break; case LOG_FMT_BCKQUEUE: // %bq - ret = ltoa_o(s->logs.prx_queue_size, tmplog, dst + maxsize - tmplog); + ret = ltoa_o(s->logs.prx_queue_pos, tmplog, dst + maxsize - tmplog); if (ret == NULL) goto out; tmplog = ret; diff --git a/src/proto_http.c b/src/proto_http.c index 4fd5aeb15..26bebaef7 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -4373,8 +4373,8 @@ void http_end_txn_clean_session(struct stream *s) s->logs.t_connect = -1; s->logs.t_data = -1; s->logs.t_close = 0; - s->logs.prx_queue_size = 0; /* we get the number of pending conns before us */ - s->logs.srv_queue_size = 0; /* we will get this number soon */ + s->logs.prx_queue_pos = 0; /* we get the number of pending conns before us */ + s->logs.srv_queue_pos = 0; /* we will get this number soon */ s->logs.bytes_in = s->req.total = s->req.buf->i; s->logs.bytes_out = s->res.total = s->res.buf->i; diff --git a/src/queue.c b/src/queue.c index 1c730c75c..558268c48 100644 --- a/src/queue.c +++ b/src/queue.c @@ -216,7 +216,7 @@ struct pendconn *pendconn_add(struct stream *strm) p->srv = srv; HA_SPIN_LOCK(SERVER_LOCK, &srv->lock); srv->nbpend++; - strm->logs.srv_queue_size += srv->nbpend; + strm->logs.srv_queue_pos += srv->nbpend; if (srv->nbpend > srv->counters.nbpend_max) srv->counters.nbpend_max = srv->nbpend; LIST_ADDQ(&srv->pendconns, &p->list); @@ -225,7 +225,7 @@ struct pendconn *pendconn_add(struct stream *strm) else { HA_SPIN_LOCK(PROXY_LOCK, &px->lock); px->nbpend++; - strm->logs.prx_queue_size += px->nbpend; + strm->logs.prx_queue_pos += px->nbpend; if (px->nbpend > px->be_counters.nbpend_max) px->be_counters.nbpend_max = px->nbpend; LIST_ADDQ(&px->pendconns, &p->list); diff --git a/src/stream.c b/src/stream.c index 9fdf6621b..f47a7cffa 100644 --- a/src/stream.c +++ b/src/stream.c @@ -125,8 +125,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) s->logs.t_data = -1; s->logs.t_close = 0; s->logs.bytes_in = s->logs.bytes_out = 0; - s->logs.prx_queue_size = 0; /* we get the number of pending conns before us */ - s->logs.srv_queue_size = 0; /* we will get this number soon */ + s->logs.prx_queue_pos = 0; /* we get the number of pending conns before us */ + s->logs.srv_queue_pos = 0; /* we will get this number soon */ /* default logging function */ s->do_log = strm_log; -- 2.16.3
From afda6fb76464878d21f70e5a19ade9fca619b913 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer <[email protected]> Date: Fri, 11 May 2018 12:52:31 -0400 Subject: [PATCH 2/6] WIP/MINOR: queue: store the queue index in the stream when enqueuing We store the queue index in the stream and check in on dequeueing to figure how many entries were processed in between. This way we'll be able to count the elements that may later be added before ours. WARNING! a call to pendconn_free() and/or something similar is missing in front of a number of calls to do_log() in order to report the correct dequeue value, especially in case of abort or timeout. --- include/types/proxy.h | 1 + include/types/server.h | 1 + include/types/stream.h | 1 + src/queue.c | 21 +++++++++++++++------ src/stream.c | 13 +++++++++++++ 5 files changed, 31 insertions(+), 6 deletions(-) diff --git a/include/types/proxy.h b/include/types/proxy.h index 16c13a1c1..a233cdb48 100644 --- a/include/types/proxy.h +++ b/include/types/proxy.h @@ -325,6 +325,7 @@ struct proxy { struct list pendconns; /* pending connections with no server assigned yet */ int nbpend; /* number of pending connections with no server assigned yet */ int totpend; /* total number of pending connections on this instance (for stats) */ + unsigned int queue_idx; /* number of pending connections which have been de-queued */ unsigned int feconn, beconn; /* # of active frontend and backends streams */ struct freq_ctr fe_req_per_sec; /* HTTP requests per second on the frontend */ struct freq_ctr fe_conn_per_sec; /* received connections per second on the frontend */ diff --git a/include/types/server.h b/include/types/server.h index 0cd20c096..85c55a269 100644 --- a/include/types/server.h +++ b/include/types/server.h @@ -209,6 +209,7 @@ struct server { int cur_sess; /* number of currently active sessions (including syn_sent) */ unsigned maxconn, minconn; /* max # of active sessions (0 = unlimited), min# for dynamic limit. */ int nbpend; /* number of pending connections */ + unsigned int queue_idx; /* count of pending connections which have been de-queued */ int maxqueue; /* maximum number of pending connections allowed */ struct freq_ctr sess_per_sec; /* sessions per second on this server */ struct be_counters counters; /* statistics counters */ diff --git a/include/types/stream.h b/include/types/stream.h index a3137bf31..8b600e306 100644 --- a/include/types/stream.h +++ b/include/types/stream.h @@ -125,6 +125,7 @@ struct stream { struct server *srv_conn; /* stream already has a slot on a server and is not in queue */ struct pendconn *pend_pos; /* if not NULL, points to the pending position in the pending queue */ + unsigned int queue_idx; /* value of proxy/server queue_idx at time of enqueue */ struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */ diff --git a/src/queue.c b/src/queue.c index 558268c48..43bbcf74f 100644 --- a/src/queue.c +++ b/src/queue.c @@ -71,10 +71,13 @@ unsigned int srv_dynamic_maxconn(const struct server *s) */ static void pendconn_unlink(struct pendconn *p) { - if (p->srv) + if (p->srv) { + p->strm->logs.srv_queue_pos += p->srv->queue_idx - p->strm->queue_idx; p->srv->nbpend--; - else + } else { + p->strm->logs.prx_queue_pos += p->px->queue_idx - p->strm->queue_idx; p->px->nbpend--; + } HA_ATOMIC_SUB(&p->px->totpend, 1); LIST_DEL(&p->list); LIST_INIT(&p->list); @@ -101,6 +104,7 @@ static void pendconn_unlink(struct pendconn *p) static int pendconn_process_next_strm(struct server *srv, struct proxy *px) { struct pendconn *p = NULL; + struct pendconn *pp = NULL; struct server *rsrv; int remote; @@ -118,8 +122,6 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) ps_found: if (srv_currently_usable(rsrv) && px->nbpend) { - struct pendconn *pp; - list_for_each_entry(pp, &px->pendconns, list) { /* If the server pendconn is older than the proxy one, * we process the server one. */ @@ -146,6 +148,11 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) p->strm_flags |= SF_ASSIGNED; p->srv = srv; + if (p != pp) + srv->queue_idx++; + else + px->queue_idx++; + HA_ATOMIC_ADD(&srv->served, 1); HA_ATOMIC_ADD(&srv->proxy->served, 1); if (px->lbprm.server_take_conn) @@ -216,7 +223,7 @@ struct pendconn *pendconn_add(struct stream *strm) p->srv = srv; HA_SPIN_LOCK(SERVER_LOCK, &srv->lock); srv->nbpend++; - strm->logs.srv_queue_pos += srv->nbpend; + strm->queue_idx = srv->queue_idx; if (srv->nbpend > srv->counters.nbpend_max) srv->counters.nbpend_max = srv->nbpend; LIST_ADDQ(&srv->pendconns, &p->list); @@ -225,7 +232,7 @@ struct pendconn *pendconn_add(struct stream *strm) else { HA_SPIN_LOCK(PROXY_LOCK, &px->lock); px->nbpend++; - strm->logs.prx_queue_pos += px->nbpend; + strm->queue_idx = px->queue_idx; if (px->nbpend > px->be_counters.nbpend_max) px->be_counters.nbpend_max = px->nbpend; LIST_ADDQ(&px->pendconns, &p->list); @@ -374,12 +381,14 @@ void pendconn_free(struct pendconn *p) if (p->srv) { HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock); + p->strm->logs.srv_queue_pos += p->srv->queue_idx - p->strm->queue_idx; p->srv->nbpend--; LIST_DEL(&p->list); HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock); } else { HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock); + p->strm->logs.prx_queue_pos += p->px->queue_idx - p->strm->queue_idx; p->px->nbpend--; LIST_DEL(&p->list); HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock); diff --git a/src/stream.c b/src/stream.c index f47a7cffa..d1a5b425d 100644 --- a/src/stream.c +++ b/src/stream.c @@ -488,6 +488,7 @@ void stream_process_counters(struct stream *s) void *ptr1,*ptr2; struct stksess *ts; int i; + unsigned int queue_idx; bytes = s->req.total - s->logs.bytes_in; s->logs.bytes_in = s->req.total; @@ -568,6 +569,18 @@ void stream_process_counters(struct stream *s) stktable_touch_local(stkctr->table, stkctr_entry(stkctr), 0); } } + + if (s->pend_pos && !LIST_ISEMPTY(&s->pend_pos->list)) { + if (s->pend_pos->srv) { + queue_idx = HA_ATOMIC_ADD(&s->pend_pos->srv->queue_idx, 0); + s->logs.srv_queue_pos += queue_idx - s->queue_idx; + s->queue_idx = queue_idx; + } else { + queue_idx = HA_ATOMIC_ADD(&s->pend_pos->px->queue_idx, 0); + s->logs.prx_queue_pos += queue_idx - s->queue_idx; + s->queue_idx = queue_idx; + } + } } /* This function is called with (si->state == SI_ST_CON) meaning that a -- 2.16.3
From 00310093e711a32a1b7598398eb1f84d92fd37d5 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer <[email protected]> Date: Fri, 11 May 2018 12:52:31 -0400 Subject: [PATCH 3/6] MINOR: queue: replace the linked list with a tree We'll need trees to manage the queues by priorities. This change replaces the list with a tree based on a single key. It's effectively a list but allows us to get rid of the list management right now. --- include/types/proxy.h | 2 +- include/types/queue.h | 2 +- include/types/server.h | 2 +- src/hlua.c | 4 ++-- src/proxy.c | 2 +- src/queue.c | 44 ++++++++++++++++++++++++++++++-------------- src/server.c | 2 +- src/stream.c | 2 +- 8 files changed, 38 insertions(+), 22 deletions(-) diff --git a/include/types/proxy.h b/include/types/proxy.h index a233cdb48..dcd7982c3 100644 --- a/include/types/proxy.h +++ b/include/types/proxy.h @@ -322,7 +322,7 @@ struct proxy { int serverfin; /* timeout to apply to server half-closed connections */ } timeout; char *id, *desc; /* proxy id (name) and description */ - struct list pendconns; /* pending connections with no server assigned yet */ + struct eb_root pendconns; /* pending connections with no server assigned yet */ int nbpend; /* number of pending connections with no server assigned yet */ int totpend; /* total number of pending connections on this instance (for stats) */ unsigned int queue_idx; /* number of pending connections which have been de-queued */ diff --git a/include/types/queue.h b/include/types/queue.h index 42dbbd047..03377da69 100644 --- a/include/types/queue.h +++ b/include/types/queue.h @@ -35,7 +35,7 @@ struct pendconn { struct stream *strm; struct proxy *px; struct server *srv; /* the server we are waiting for, may be NULL */ - struct list list; /* next pendconn */ + struct eb32_node node; __decl_hathreads(HA_SPINLOCK_T lock); }; diff --git a/include/types/server.h b/include/types/server.h index 85c55a269..756e136c4 100644 --- a/include/types/server.h +++ b/include/types/server.h @@ -214,7 +214,7 @@ struct server { struct freq_ctr sess_per_sec; /* sessions per second on this server */ struct be_counters counters; /* statistics counters */ - struct list pendconns; /* pending connections */ + struct eb_root pendconns; /* pending connections */ struct list actconns; /* active connections */ struct list *priv_conns; /* private idle connections attached to stream interfaces */ struct list *idle_conns; /* sharable idle connections attached or not to a stream interface */ diff --git a/src/hlua.c b/src/hlua.c index 2f7fe9960..9a3969886 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -7940,7 +7940,7 @@ void hlua_init(void) socket_tcp.proxy = &socket_proxy; socket_tcp.obj_type = OBJ_TYPE_SERVER; LIST_INIT(&socket_tcp.actconns); - LIST_INIT(&socket_tcp.pendconns); + socket_tcp.pendconns = EB_ROOT; socket_tcp.priv_conns = NULL; socket_tcp.idle_conns = NULL; socket_tcp.safe_conns = NULL; @@ -7986,7 +7986,7 @@ void hlua_init(void) socket_ssl.proxy = &socket_proxy; socket_ssl.obj_type = OBJ_TYPE_SERVER; LIST_INIT(&socket_ssl.actconns); - LIST_INIT(&socket_ssl.pendconns); + socket_ssl.pendconns = EB_ROOT; socket_tcp.priv_conns = NULL; socket_tcp.idle_conns = NULL; socket_tcp.safe_conns = NULL; diff --git a/src/proxy.c b/src/proxy.c index c262966a2..26bdd464a 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -726,7 +726,7 @@ void init_new_proxy(struct proxy *p) { memset(p, 0, sizeof(struct proxy)); p->obj_type = OBJ_TYPE_PROXY; - LIST_INIT(&p->pendconns); + p->pendconns = EB_ROOT; LIST_INIT(&p->acl); LIST_INIT(&p->http_req_rules); LIST_INIT(&p->http_res_rules); diff --git a/src/queue.c b/src/queue.c index 43bbcf74f..3f2e0bdcd 100644 --- a/src/queue.c +++ b/src/queue.c @@ -14,6 +14,7 @@ #include <common/memory.h> #include <common/time.h> #include <common/hathreads.h> +#include <eb32tree.h> #include <proto/queue.h> #include <proto/server.h> @@ -79,8 +80,7 @@ static void pendconn_unlink(struct pendconn *p) p->px->nbpend--; } HA_ATOMIC_SUB(&p->px->totpend, 1); - LIST_DEL(&p->list); - LIST_INIT(&p->list); + eb32_delete(&p->node); } /* Process the next pending connection from either a server or a proxy, and @@ -106,6 +106,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) struct pendconn *p = NULL; struct pendconn *pp = NULL; struct server *rsrv; + struct eb32_node *node; int remote; rsrv = srv->track; @@ -113,7 +114,10 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) rsrv = srv; if (srv->nbpend) { - list_for_each_entry(p, &srv->pendconns, list) { + for (node = eb32_first(&srv->pendconns); + node; + node = eb32_next(node)) { + p = eb32_entry(node, struct pendconn, node); if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock)) goto ps_found; } @@ -122,7 +126,10 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) ps_found: if (srv_currently_usable(rsrv) && px->nbpend) { - list_for_each_entry(pp, &px->pendconns, list) { + for (node = eb32_first(&px->pendconns); + node; + node = eb32_next(node)) { + pp = eb32_entry(node, struct pendconn, node); /* If the server pendconn is older than the proxy one, * we process the server one. */ if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request)) @@ -213,6 +220,7 @@ struct pendconn *pendconn_add(struct stream *strm) srv = objt_server(strm->target); px = strm->be; + p->node.key = 0; p->srv = NULL; p->px = px; p->strm = strm; @@ -226,7 +234,7 @@ struct pendconn *pendconn_add(struct stream *strm) strm->queue_idx = srv->queue_idx; if (srv->nbpend > srv->counters.nbpend_max) srv->counters.nbpend_max = srv->nbpend; - LIST_ADDQ(&srv->pendconns, &p->list); + eb32_insert(&srv->pendconns, &p->node); HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock); } else { @@ -235,7 +243,7 @@ struct pendconn *pendconn_add(struct stream *strm) strm->queue_idx = px->queue_idx; if (px->nbpend > px->be_counters.nbpend_max) px->be_counters.nbpend_max = px->nbpend; - LIST_ADDQ(&px->pendconns, &p->list); + eb32_insert(&px->pendconns, &p->node); HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock); } HA_ATOMIC_ADD(&px->totpend, 1); @@ -248,7 +256,8 @@ struct pendconn *pendconn_add(struct stream *strm) */ int pendconn_redistribute(struct server *s) { - struct pendconn *p, *pback; + struct pendconn *p; + struct eb32_node *node; int xferred = 0; int remote = 0; @@ -258,7 +267,10 @@ int pendconn_redistribute(struct server *s) return 0; HA_SPIN_LOCK(SERVER_LOCK, &s->lock); - list_for_each_entry_safe(p, pback, &s->pendconns, list) { + for (node = eb32_first(&s->pendconns); + node; + node = eb32_next(node)) { + p = eb32_entry(&node, struct pendconn, node); if (p->strm_flags & SF_FORCE_PRST) continue; @@ -287,7 +299,8 @@ int pendconn_redistribute(struct server *s) */ int pendconn_grab_from_px(struct server *s) { - struct pendconn *p, *pback; + struct pendconn *p; + struct eb32_node *node; int maxconn, xferred = 0; int remote = 0; @@ -296,7 +309,10 @@ int pendconn_grab_from_px(struct server *s) HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock); maxconn = srv_dynamic_maxconn(s); - list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) { + for (node = eb32_first(&s->proxy->pendconns); + node; + node = eb32_next(node)) { + p = eb32_entry(&node, struct pendconn, node); if (s->maxconn && s->served + xferred >= maxconn) break; @@ -344,7 +360,7 @@ int pendconn_dequeue(struct stream *strm) /* the pendconn is still linked to the server/proxy queue, so unlock it * and go away. */ - if (!LIST_ISEMPTY(&p->list)) { + if (p->node.node.leaf_p) { HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); return 1; } @@ -376,21 +392,21 @@ void pendconn_free(struct pendconn *p) HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock); /* The pendconn was already unlinked, just release it. */ - if (LIST_ISEMPTY(&p->list)) + if (!p->node.node.leaf_p) goto release; if (p->srv) { HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock); p->strm->logs.srv_queue_pos += p->srv->queue_idx - p->strm->queue_idx; p->srv->nbpend--; - LIST_DEL(&p->list); + eb32_delete(&p->node); HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock); } else { HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock); p->strm->logs.prx_queue_pos += p->px->queue_idx - p->strm->queue_idx; p->px->nbpend--; - LIST_DEL(&p->list); + eb32_delete(&p->node); HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock); } HA_ATOMIC_SUB(&p->px->totpend, 1); diff --git a/src/server.c b/src/server.c index 277d1405e..36a7ea1d2 100644 --- a/src/server.c +++ b/src/server.c @@ -1575,7 +1575,7 @@ static struct server *new_server(struct proxy *proxy) srv->obj_type = OBJ_TYPE_SERVER; srv->proxy = proxy; LIST_INIT(&srv->actconns); - LIST_INIT(&srv->pendconns); + srv->pendconns = EB_ROOT; if ((srv->priv_conns = calloc(global.nbthread, sizeof(*srv->priv_conns))) == NULL) goto free_srv; diff --git a/src/stream.c b/src/stream.c index d1a5b425d..f774406dd 100644 --- a/src/stream.c +++ b/src/stream.c @@ -570,7 +570,7 @@ void stream_process_counters(struct stream *s) } } - if (s->pend_pos && !LIST_ISEMPTY(&s->pend_pos->list)) { + if (s->pend_pos && s->pend_pos->node.node.leaf_p) { if (s->pend_pos->srv) { queue_idx = HA_ATOMIC_ADD(&s->pend_pos->srv->queue_idx, 0); s->logs.srv_queue_pos += queue_idx - s->queue_idx; -- 2.16.3
From 0a47fc71c1a2fc029b41bbdf1b7c9329c294ef26 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer <[email protected]> Date: Fri, 11 May 2018 12:52:31 -0400 Subject: [PATCH 4/6] MEDIUM: add set-priority-class and set-priority-offset This adds the set-priority-class and set-priority-offset actions to http-request and tcp-request content. At this point they are not used yet, which is the purpose of the next commit, but all the logic to set and clear the values is there. --- doc/configuration.txt | 38 ++++++++++++++ doc/lua-api/index.rst | 18 +++++++ include/proto/queue.h | 19 +++++++ include/types/stream.h | 3 ++ src/hlua.c | 53 +++++++++++++------ src/queue.c | 137 +++++++++++++++++++++++++++++++++++++++++++++++++ src/stream.c | 2 + 7 files changed, 255 insertions(+), 15 deletions(-) diff --git a/doc/configuration.txt b/doc/configuration.txt index e901d7eea..0a0d9a038 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -3917,6 +3917,7 @@ http-request { allow | auth [realm <realm>] | redirect <rule> | reject | replace-value <name> <match-regex> <replace-fmt> | set-method <fmt> | set-path <fmt> | set-query <fmt> | set-uri <fmt> | set-tos <tos> | set-mark <mark> | + set-priority-class <expr> | set-priority-offset <expr> add-acl(<file name>) <key fmt> | del-acl(<file name>) <key fmt> | del-map(<file name>) <key fmt> | @@ -4113,6 +4114,24 @@ http-request { allow | auth [realm <realm>] | redirect <rule> | reject | downloads). This works on Linux kernels 2.6.32 and above and requires admin privileges. + - "set-priority-class" is used to set the queue priority class of the + current request. The value must be a sample expression which converts to + an integer in the range -2047..2047. Results outside this range will be + truncated. The priority class determines the order in which queued + requests are processed. Lower values have higher priority. + + - "set-priority-offset" is used to set the queue priority timestamp offset + of the current request. The value must be a sample expression which + converts to an integer in the range -524287..524287. Results outside this + range will be truncated. When a request is queued, it is ordered first by + the priority class, then by the current timestamp adjusted by the given + offset in milliseconds. Lower values have higher priority. + Note that the resulting timestamp is is only tracked with enough precision + for 524,287ms (8m44s287ms). If the request is queued long enough to where + the adjusted timestamp exceeds this value, it will be misidentified as + highest priority. Thus it is important to set "timeout queue" to a value, + where when combined with the offset, does not exceed this limit. + - "add-acl" is used to add a new entry into an ACL. The ACL must be loaded from a file (even a dummy empty file). The file name of the ACL to be updated is passed between parentheses. It takes one argument: <key fmt>, @@ -9452,6 +9471,7 @@ tcp-request content <action> [{if | unless} <condition>] - accept : the request is accepted - reject : the request is rejected and the connection is closed - capture : the specified sample expression is captured + - set-priority-class <expr> | set-priority-offset <expr> - { track-sc0 | track-sc1 | track-sc2 } <key> [table <table>] - sc-inc-gpc0(<sc-id>) - sc-inc-gpc1(<sc-id>) @@ -9513,6 +9533,24 @@ tcp-request content <action> [{if | unless} <condition>] The "unset-var" is used to unset a variable. See above for details about <var-name>. + The "set-priority-class" is used to set the queue priority class of the + current request. The value must be a sample expression which converts to an + integer in the range -2047..2047. Results outside this range will be + truncated. The priority class determines the order in which queued requests + are processed. Lower values have higher priority. + + The "set-priority-offset" is used to set the queue priority timestamp offset + of the current request. The value must be a sample expression which converts + to an integer in the range -524287..524287. Results outside this range will be + truncated. When a request is queued, it is ordered first by the priority + class, then by the current timestamp adjusted by the given offset in + milliseconds. Lower values have higher priority. + Note that the resulting timestamp is is only tracked with enough precision for + 524,287ms (8m44s287ms). If the request is queued long enough to where the + adjusted timestamp exceeds this value, it will be misidentified as highest + priority. Thus it is important to set "timeout queue" to a value, where when + combined with the offset, does not exceed this limit. + The "send-spoe-group" is used to trigger sending of a group of SPOE messages. To do so, the SPOE engine used to send messages must be defined, as well as the SPOE group to send. Of course, the SPOE engine must refer to an diff --git a/doc/lua-api/index.rst b/doc/lua-api/index.rst index ee9dab55c..0c79766eb 100644 --- a/doc/lua-api/index.rst +++ b/doc/lua-api/index.rst @@ -1769,6 +1769,24 @@ TXN class :param class_txn txn: The class txn object containing the data. :param integer mark: The mark value. +.. js:function:: TXN.set_priority_class(txn, prio) + + This function adjusts the priority class of the transaction. The value should + be within the range -2047..2047. Values outside this range will be + truncated. + + See the HAProxy configuration.txt file keyword "http-request" action + "set-priority-class" for details. + +.. js:function:: TXN.set_priority_offset(txn, prio) + + This function adjusts the priority offset of the transaction. The value + should be within the range -524287..524287. Values outside this range will be + truncated. + + See the HAProxy configuration.txt file keyword "http-request" action + "set-priority-offset" for details. + .. _socket_class: Socket class diff --git a/include/proto/queue.h b/include/proto/queue.h index 2d4773a09..27373b960 100644 --- a/include/proto/queue.h +++ b/include/proto/queue.h @@ -59,6 +59,25 @@ static inline int may_dequeue_tasks(const struct server *s, const struct proxy * (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s))); } +static inline int queue_limit_class(int class) +{ + if (class < -0x7ff) + return -0x7ff; + if (class > 0x7ff) + return 0x7ff; + return class; +} + +static inline int queue_limit_offset(int offset) +{ + if (offset < -0x7ffff) + return -0x7ffff; + if (offset > 0x7ffff) + return 0x7ffff; + return offset; +} + + #endif /* _PROTO_QUEUE_H */ /* diff --git a/include/types/stream.h b/include/types/stream.h index 8b600e306..ff2b1aeb5 100644 --- a/include/types/stream.h +++ b/include/types/stream.h @@ -125,6 +125,7 @@ struct stream { struct server *srv_conn; /* stream already has a slot on a server and is not in queue */ struct pendconn *pend_pos; /* if not NULL, points to the pending position in the pending queue */ + int32_t priority_offset; /* priority offset of the stream for the pending queue */ unsigned int queue_idx; /* value of proxy/server queue_idx at time of enqueue */ struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */ @@ -132,6 +133,8 @@ struct stream { struct task *task; /* the task associated with this stream */ unsigned short pending_events; /* the pending events not yet processed by the stream. * This is a bit field of TASK_WOKEN_* */ + int16_t priority_class; /* priority class of the stream for the pending queue */ + /* still 32-bit hole here */ struct list list; /* position in global streams list */ struct list by_srv; /* position in server stream list */ diff --git a/src/hlua.c b/src/hlua.c index 9a3969886..d08968379 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -44,6 +44,7 @@ #include <proto/hlua_fcn.h> #include <proto/map.h> #include <proto/obj_type.h> +#include <proto/queue.h> #include <proto/pattern.h> #include <proto/payload.h> #include <proto/proto_http.h> @@ -5321,6 +5322,26 @@ __LJMP static int hlua_txn_set_mark(lua_State *L) return 0; } +__LJMP static int hlua_txn_set_priority_class(lua_State *L) +{ + struct hlua_txn *htxn; + + MAY_LJMP(check_args(L, 2, "set_priority_class")); + htxn = MAY_LJMP(hlua_checktxn(L, 1)); + htxn->s->priority_class = queue_limit_class(MAY_LJMP(luaL_checkinteger(L, 2))); + return 0; +} + +__LJMP static int hlua_txn_set_priority_offset(lua_State *L) +{ + struct hlua_txn *htxn; + + MAY_LJMP(check_args(L, 2, "set_priority_offset")); + htxn = MAY_LJMP(hlua_checktxn(L, 1)); + htxn->s->priority_offset = queue_limit_offset(MAY_LJMP(luaL_checkinteger(L, 2))); + return 0; +} + /* This function is an Lua binding that send pending data * to the client, and close the stream interface. */ @@ -7864,21 +7885,23 @@ void hlua_init(void) lua_newtable(gL.T); /* Register Lua functions. */ - hlua_class_function(gL.T, "set_priv", hlua_set_priv); - hlua_class_function(gL.T, "get_priv", hlua_get_priv); - hlua_class_function(gL.T, "set_var", hlua_set_var); - hlua_class_function(gL.T, "unset_var", hlua_unset_var); - hlua_class_function(gL.T, "get_var", hlua_get_var); - hlua_class_function(gL.T, "done", hlua_txn_done); - hlua_class_function(gL.T, "set_loglevel",hlua_txn_set_loglevel); - hlua_class_function(gL.T, "set_tos", hlua_txn_set_tos); - hlua_class_function(gL.T, "set_mark", hlua_txn_set_mark); - hlua_class_function(gL.T, "deflog", hlua_txn_deflog); - hlua_class_function(gL.T, "log", hlua_txn_log); - hlua_class_function(gL.T, "Debug", hlua_txn_log_debug); - hlua_class_function(gL.T, "Info", hlua_txn_log_info); - hlua_class_function(gL.T, "Warning", hlua_txn_log_warning); - hlua_class_function(gL.T, "Alert", hlua_txn_log_alert); + hlua_class_function(gL.T, "set_priv", hlua_set_priv); + hlua_class_function(gL.T, "get_priv", hlua_get_priv); + hlua_class_function(gL.T, "set_var", hlua_set_var); + hlua_class_function(gL.T, "unset_var", hlua_unset_var); + hlua_class_function(gL.T, "get_var", hlua_get_var); + hlua_class_function(gL.T, "done", hlua_txn_done); + hlua_class_function(gL.T, "set_loglevel", hlua_txn_set_loglevel); + hlua_class_function(gL.T, "set_tos", hlua_txn_set_tos); + hlua_class_function(gL.T, "set_mark", hlua_txn_set_mark); + hlua_class_function(gL.T, "set_priority_class", hlua_txn_set_priority_class); + hlua_class_function(gL.T, "set_priority_offset", hlua_txn_set_priority_offset); + hlua_class_function(gL.T, "deflog", hlua_txn_deflog); + hlua_class_function(gL.T, "log", hlua_txn_log); + hlua_class_function(gL.T, "Debug", hlua_txn_log_debug); + hlua_class_function(gL.T, "Info", hlua_txn_log_info); + hlua_class_function(gL.T, "Warning", hlua_txn_log_warning); + hlua_class_function(gL.T, "Alert", hlua_txn_log_alert); lua_rawset(gL.T, -3); diff --git a/src/queue.c b/src/queue.c index 3f2e0bdcd..a92cea4b6 100644 --- a/src/queue.c +++ b/src/queue.c @@ -16,11 +16,14 @@ #include <common/hathreads.h> #include <eb32tree.h> +#include <proto/proto_http.h> #include <proto/queue.h> +#include <proto/sample.h> #include <proto/server.h> #include <proto/stream.h> #include <proto/stream_interface.h> #include <proto/task.h> +#include <proto/tcp_rules.h> struct pool_head *pool_head_pendconn; @@ -417,6 +420,140 @@ void pendconn_free(struct pendconn *p) pool_free(pool_head_pendconn, p); } +static enum act_return action_set_priority_class(struct act_rule *rule, struct proxy *px, + struct session *sess, struct stream *s, int flags) +{ + struct sample *smp; + + smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, rule->arg.expr, SMP_T_SINT); + if (!smp) + return ACT_RET_CONT; + + s->priority_class = queue_limit_class(smp->data.u.sint); + return ACT_RET_CONT; +} + +static enum act_return action_set_priority_offset(struct act_rule *rule, struct proxy *px, + struct session *sess, struct stream *s, int flags) +{ + struct sample *smp; + + smp = sample_fetch_as_type(px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, rule->arg.expr, SMP_T_SINT); + if (!smp) + return ACT_RET_CONT; + + s->priority_offset = queue_limit_offset(smp->data.u.sint); + + return ACT_RET_CONT; +} + +static enum act_parse_ret parse_set_priority_class(const char **args, int *arg, struct proxy *px, + struct act_rule *rule, char **err) +{ + unsigned int where = 0; + + rule->arg.expr = sample_parse_expr((char **)args, arg, px->conf.args.file, + px->conf.args.line, err, &px->conf.args); + if (!rule->arg.expr) + return ACT_RET_PRS_ERR; + + if (px->cap & PR_CAP_FE) + where |= SMP_VAL_FE_HRQ_HDR; + if (px->cap & PR_CAP_BE) + where |= SMP_VAL_BE_HRQ_HDR; + + if (!(rule->arg.expr->fetch->val & where)) { + memprintf(err, + "fetch method '%s' extracts information from '%s', none of which is available here", + args[0], sample_src_names(rule->arg.expr->fetch->use)); + free(rule->arg.expr); + return ACT_RET_PRS_ERR; + } + + rule->action = ACT_CUSTOM; + rule->action_ptr = action_set_priority_class; + return ACT_RET_PRS_OK; +} + +static enum act_parse_ret parse_set_priority_offset(const char **args, int *arg, struct proxy *px, + struct act_rule *rule, char **err) +{ + unsigned int where = 0; + + rule->arg.expr = sample_parse_expr((char **)args, arg, px->conf.args.file, + px->conf.args.line, err, &px->conf.args); + if (!rule->arg.expr) + return ACT_RET_PRS_ERR; + + if (px->cap & PR_CAP_FE) + where |= SMP_VAL_FE_HRQ_HDR; + if (px->cap & PR_CAP_BE) + where |= SMP_VAL_BE_HRQ_HDR; + + if (!(rule->arg.expr->fetch->val & where)) { + memprintf(err, + "fetch method '%s' extracts information from '%s', none of which is available here", + args[0], sample_src_names(rule->arg.expr->fetch->use)); + free(rule->arg.expr); + return ACT_RET_PRS_ERR; + } + + rule->action = ACT_CUSTOM; + rule->action_ptr = action_set_priority_offset; + return ACT_RET_PRS_OK; +} + +static struct action_kw_list tcp_cont_kws = {ILH, { + { "set-priority-class", parse_set_priority_class }, + { "set-priority-offset", parse_set_priority_offset }, + { /* END */ } +}}; + +static struct action_kw_list http_req_kws = {ILH, { + { "set-priority-class", parse_set_priority_class }, + { "set-priority-offset", parse_set_priority_offset }, + { /* END */ } +}}; + +static int +smp_fetch_priority_class(const struct arg *args, struct sample *smp, const char *kw, void *private) +{ + if (!smp->strm) + return 0; + + smp->data.type = SMP_T_SINT; + smp->data.u.sint = smp->strm->priority_class; + + return 1; +} + +static int +smp_fetch_priority_offset(const struct arg *args, struct sample *smp, const char *kw, void *private) +{ + if (!smp->strm) + return 0; + + smp->data.type = SMP_T_SINT; + smp->data.u.sint = smp->strm->priority_offset; + + return 1; +} + + +static struct sample_fetch_kw_list smp_kws = {ILH, { + { "prio_class", smp_fetch_priority_class, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, }, + { "prio_offset", smp_fetch_priority_offset, 0, NULL, SMP_T_SINT, SMP_USE_INTRN, }, + { /* END */}, +}}; + +__attribute__((constructor)) +static void __queue_init(void) +{ + tcp_req_cont_keywords_register(&tcp_cont_kws); + http_req_keywords_register(&http_req_kws); + sample_register_fetches(&smp_kws); +} + /* * Local variables: * c-indent-level: 8 diff --git a/src/stream.c b/src/stream.c index f774406dd..6771302bb 100644 --- a/src/stream.c +++ b/src/stream.c @@ -220,6 +220,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin) s->target = sess->listener ? sess->listener->default_target : NULL; s->pend_pos = NULL; + s->priority_class = 0; + s->priority_offset = 0; /* init store persistence */ s->store_count = 0; -- 2.16.3
From 8d9a9aaab2f617cf31de9528dcc08995d72612c6 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer <[email protected]> Date: Fri, 11 May 2018 12:52:31 -0400 Subject: [PATCH 5/6] MEDIUM: queue: adjust position based on priority-class and priority-offset The priority values are used when connections are queued to determine which connections should be served first. The lowest priority class is served first. When multiple requests from the same class are found, the earliest (according to queue_time + offset) is served first. --- src/queue.c | 157 +++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 119 insertions(+), 38 deletions(-) diff --git a/src/queue.c b/src/queue.c index a92cea4b6..23569abac 100644 --- a/src/queue.c +++ b/src/queue.c @@ -26,6 +26,11 @@ #include <proto/tcp_rules.h> +#define NOW_OFFSET_BOUNDARY() (now_ms - (TIMER_LOOK_BACK >> 12) & 0xfffff) +#define KEY_CLASS(key) (key & 0xfff00000) +#define KEY_OFFSET(key) (key & 0xfffff) +#define KEY_CLASS_OFFSET_BOUNDARY(key) (KEY_CLASS(key) | NOW_OFFSET_BOUNDARY()) + struct pool_head *pool_head_pendconn; /* perform minimal intializations, report 0 in case of error, 1 if OK. */ @@ -86,6 +91,65 @@ static void pendconn_unlink(struct pendconn *p) eb32_delete(&p->node); } +/* Retrieve the next pending connection. + * + * See pendconn_add for an explanation of the key & queue behavior. + * + * This function handles all the cases where due to the timestamp wrapping + * the first node in the tree is not the highest priority. + */ +static struct pendconn *pendconn_next(struct eb_root *pendconns, struct pendconn *curconn) +{ + struct eb32_node *node, *node2 = NULL; + u32 min, max; + + /* min and max are the lower and upper bounds of the priority offset within + * the current class to search for the next node. It is possible that + * (max <= min) if max wraps around. + * min is inclusive. max is exclusive. + */ + if (curconn == NULL) { + min = NOW_OFFSET_BOUNDARY(); + max = min; + node = eb32_lookup_ge(pendconns, min); + } else { + min = curconn->node.key; + max = KEY_CLASS_OFFSET_BOUNDARY(min); + node = eb32_next(&curconn->node); + } + + if (node) { + if (node->key < max || (max <= min && KEY_CLASS(node->key) == KEY_CLASS(min))) + return eb32_entry(node, struct pendconn, node); + if (KEY_CLASS(node->key) != KEY_CLASS(min)) + node2 = node; + if (max > min) + goto class_next; + } + + if (max <= min) + node = eb32_lookup_ge(pendconns, KEY_CLASS(min)); + if (!node) + return NULL; + if (node->key < max && node->key < min) + return eb32_entry(node, struct pendconn, node); + +class_next: + if (node2) { + min = KEY_CLASS_OFFSET_BOUNDARY(node2->key); + if (node2->key >= min) + return eb32_entry(node2, struct pendconn, node); + } else + min = KEY_CLASS_OFFSET_BOUNDARY(min) + 0x100000; + node = eb32_lookup_ge(pendconns, min); + if (node && KEY_CLASS(node->key) == KEY_CLASS(min)) + return eb32_entry(node, struct pendconn, node); + if (node2) + return eb32_entry(node2, struct pendconn, node); + + return NULL; +} + /* Process the next pending connection from either a server or a proxy, and * returns a strictly positive value on success (see below). If no pending * connection is found, 0 is returned. Note that neither <srv> nor <px> may be @@ -109,7 +173,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) struct pendconn *p = NULL; struct pendconn *pp = NULL; struct server *rsrv; - struct eb32_node *node; + u32 pkey, ppkey; int remote; rsrv = srv->track; @@ -117,42 +181,52 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px) rsrv = srv; if (srv->nbpend) { - for (node = eb32_first(&srv->pendconns); - node; - node = eb32_next(node)) { - p = eb32_entry(node, struct pendconn, node); + for (p = pendconn_next(&srv->pendconns, NULL); + p; + p = pendconn_next(&srv->pendconns, p)) if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock)) - goto ps_found; - } - p = NULL; + break; } - - ps_found: - if (srv_currently_usable(rsrv) && px->nbpend) { - for (node = eb32_first(&px->pendconns); - node; - node = eb32_next(node)) { - pp = eb32_entry(node, struct pendconn, node); - /* If the server pendconn is older than the proxy one, - * we process the server one. */ - if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request)) - goto pendconn_found; - - if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) { - /* Let's switch from the server pendconn to the - * proxy pendconn. Don't forget to unlock the - * server pendconn, if any. */ - if (p) - HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); - p = pp; - goto pendconn_found; - } - } + if (px->nbpend) { + for (pp = pendconn_next(&px->pendconns, NULL); + pp; + pp = pendconn_next(&px->pendconns, pp)) + if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) + break; } - if (!p) + if (!p && !pp) return 0; + if (p && !pp) + goto pendconn_found; + if (pp && !p) { + p = pp; + goto pendconn_found; + } + if (KEY_CLASS(p->node.key) < KEY_CLASS(pp->node.key)) { + HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock); + goto pendconn_found; + } + if (KEY_CLASS(pp->node.key) < KEY_CLASS(p->node.key)) { + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); + p = pp; + goto pendconn_found; + } + + pkey = KEY_OFFSET(p->node.key); + ppkey = KEY_OFFSET(pp->node.key); + if (pkey < NOW_OFFSET_BOUNDARY()) + pkey += 0x100000; + if (ppkey < NOW_OFFSET_BOUNDARY()) + ppkey += 0x100000; + if (pkey <= ppkey) { + HA_SPIN_UNLOCK(PENDCONN_LOCK, &pp->lock); + } else { + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); + p = pp; + } + pendconn_found: pendconn_unlink(p); p->strm_flags |= SF_ASSIGNED; @@ -201,12 +275,20 @@ void process_srv_queue(struct server *s) THREAD_WANT_SYNC(); } -/* Adds the stream <strm> to the pending connection list of server <strm>->srv +/* Adds the stream <strm> to the pending connection queue of server <strm>->srv * or to the one of <strm>->proxy if srv is NULL. All counters and back pointers * are updated accordingly. Returns NULL if no memory is available, otherwise the * pendconn itself. If the stream was already marked as served, its flag is * cleared. It is illegal to call this function with a non-NULL strm->srv_conn. * + * The queue is sorted by the composition of the priority_class, and the current + * timestamp offset by strm->priority_offset. The timestamp is in milliseconds + * and truncated to 20 bits, so will wrap every 17m28s575ms. + * The offset can be positive or negative, and an offset of 0 puts it in the + * middle of this range (~ 8 min). Note that this also means if the adjusted + * timestamp wraps around, the request will be misinterpreted as being of + * the higest priority for that priority class. + * * This function must be called by the stream itself, so in the context of * process_stream. */ @@ -223,7 +305,8 @@ struct pendconn *pendconn_add(struct stream *strm) srv = objt_server(strm->target); px = strm->be; - p->node.key = 0; + p->node.key = ((u32)(strm->priority_class + 0x7ff) << 20) | + ((u32)(now_ms + strm->priority_offset) & 0xfffff); p->srv = NULL; p->px = px; p->strm = strm; @@ -303,7 +386,6 @@ int pendconn_redistribute(struct server *s) int pendconn_grab_from_px(struct server *s) { struct pendconn *p; - struct eb32_node *node; int maxconn, xferred = 0; int remote = 0; @@ -312,10 +394,9 @@ int pendconn_grab_from_px(struct server *s) HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock); maxconn = srv_dynamic_maxconn(s); - for (node = eb32_first(&s->proxy->pendconns); - node; - node = eb32_next(node)) { - p = eb32_entry(&node, struct pendconn, node); + for (p = pendconn_next(&s->proxy->pendconns, NULL); + p; + p = pendconn_next(&s->proxy->pendconns, p)) { if (s->maxconn && s->served + xferred >= maxconn) break; -- 2.16.3
From 7393339d8cd8028ed7881822cbbe0881a4564e27 Mon Sep 17 00:00:00 2001 From: Patrick Hemmer <[email protected]> Date: Fri, 11 May 2018 12:52:31 -0400 Subject: [PATCH 6/6] WIP/config: warning on too high timeout queue The purpose is to emit a warning if the user configures a larger queue timeout than the highest we support in the tree. [WT: I don't think it's a good idea as some people do use very large timeouts in TCP. Maybe our values are too low in the end and we should refine them or change the tree size] --- src/cfgparse.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/cfgparse.c b/src/cfgparse.c index f3c2be499..3d8ba1866 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -8244,6 +8244,21 @@ out_uri_auth_compat: } } + if ((curproxy->mode == PR_MODE_TCP || curproxy->mode == PR_MODE_HTTP) && + (curproxy->cap & PR_CAP_BE) && (curproxy->srv) && + (!curproxy->timeout.queue || curproxy->timeout.queue > (((TIMER_LOOK_BACK >> 12) & 0xfffff) / 2))) { + // Note that this warning isn't comprehensive. + // if the user specifies set-priority-offset > 'timeout queue`, wrapping + // may occur and get de-queued out of order. But logging this every + // single time might be too noisy. + ha_warning("config : excessive timeout queue for backend '%s'.\n" + " | The 'timeout queue' setting is either missing, or exceeds the maximum\n" + " | priority offset. If a request sits in the queue for longer than the maximum\n" + " | priority offset, it may get de-queued out of order.\n", + curproxy->id); + err_code |= ERR_WARN; + } + if ((curproxy->options2 & PR_O2_CHK_ANY) == PR_O2_SSL3_CHK) { curproxy->check_len = sizeof(sslv3_client_hello_pkt) - 1; curproxy->check_req = malloc(curproxy->check_len); -- 2.16.3

