The ring forwarder applet (used by "ring" sections, and by "log" backends, to
forward buffered events to a remote server) closed its session as soon as a
soft-stop was requested:

        /* if stopping was requested, close immediately */
        if (unlikely(stopping))
                goto soft_close;

This was meant to let the worker exit quickly, since the forwarder is a job
the soft-stop waits for. But it does not only abandon the messages still
present in the ring at that instant: it also stops forwarding the messages
that in-flight streams keep producing for the rest of the soft-stop window.
As a consequence, on every reload a leaving worker loses all the log/event
lines generated between the moment stopping is requested and the moment its
data plane is fully drained.

This is easy to observe with a typical access-log pipeline
(frontend -> ring -> log-forward/log backend -> remote collector). At 3000
req/s with a reload every 10s, about 3400 access-log lines out of ~120000
never reach the collector per run; with no reload during the same load, none
are lost. Instrumentation shows the lost lines correspond to requests that
complete a few hundred milliseconds *after* the reload, i.e. while the old
worker is still draining - exactly the window during which the forwarder had
already closed.

Instead of closing immediately, keep the forwarder attached and flushing
during the soft-stop, and only close once the data plane is drained, i.e. once
there is no stoppable active connection left and hence no stream can produce a
new message. This is detected with "actconn <= unstoppable_jobs": actconn
counts the active front connections (it is not inflated by the forwarder
sessions nor by the listeners), and unstoppable_jobs counts the connections
that do not hold the soft-stop back (the master CLI connection). When the ring
is empty and that condition holds, the forwarder closes. Otherwise it is woken
again on new messages and also re-checks the drain state every
SINK_FWD_DRAIN_POLL_MS (20 ms), since a closing stream does not wake it by
itself. This follows HAProxy's normal soft-stop drain: the forwarder becomes
the last job to leave and never extends the worker's lifetime.

If the ring still holds messages that cannot be pushed (output full, e.g. the
downstream is wedged or unreachable), the wait is bounded by SINK_FWD_WEDGE_MS
(1000 ms without any message forwarded) so a stuck downstream cannot hold the
process up to "hard-stop-after". If the downstream connection was never
established there is nothing to flush to, so we close right away as before.
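
The wedge bound behaves like a deadline that is re-armed on every forwarded
message. The following is a simplified sketch under stated assumptions: the
helpers deadline_after(), deadline_passed() and wedge_check() are hypothetical
stand-ins written for this example and are not HAProxy's tick API, although
they follow the same wrapping-clock idea, with 0 meaning "unset".

```c
#include <stdbool.h>
#include <stdint.h>

#define WEDGE_MS       1000u /* same value as SINK_FWD_WEDGE_MS in the patch */
#define DEADLINE_UNSET 0u    /* stand-in for an unset tick */

/* Compute now + delay on a wrapping millisecond clock, never returning the
 * reserved "unset" value. */
static uint32_t deadline_after(uint32_t now, uint32_t delay)
{
	uint32_t d = now + delay;

	return d ? d : 1;
}

/* A set deadline has passed when the signed distance to it is <= 0. */
static bool deadline_passed(uint32_t deadline, uint32_t now)
{
	return deadline != DEADLINE_UNSET && (int32_t)(deadline - now) <= 0;
}

/* One handler pass while stopping with a non-empty ring. Arms the deadline
 * if it is unset, pushes it back when a message was forwarded, and returns
 * true when the forwarder should give up. */
static bool wedge_check(uint32_t *deadline, uint32_t now, bool progressed)
{
	if (progressed || *deadline == DEADLINE_UNSET)
		*deadline = deadline_after(now, WEDGE_MS);
	return deadline_passed(*deadline, now);
}
```

In this sketch a downstream that keeps accepting at least one message per
second never triggers the bound, while a fully stuck one is abandoned one
second after the last forwarded message.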

With this change the same test loses no lines, and the old worker exits about
210 ms after the reload (essentially the baseline soft-stop time, i.e. no
added latency), both with a healthy downstream and with a down or wedged one.

This should be backported to stable branches that still close the forwarder
immediately on soft-stop, at least as far back as 3.2. Note that older
branches use the previous applet API in this function (se_fl_test() on the
sedesc for SE_FL_APPLET_NEED_CONN, and se_fl_set() in soft_close()), so the
backport needs a small adaptation of the not-established test and the close
path; the rest is identical.
---
v2: instead of a fixed quiet-time window (v1), detect the data-plane drain
    via "actconn <= unstoppable_jobs", which removes the extra shutdown
    latency that v1 added on every reload; the old worker now exits at the
    normal soft-stop time again. No change in the access-log loss result
    (still 0 on the same test).

 include/haproxy/sink-t.h |  1 +
 src/sink.c               | 59 ++++++++++++++++++++++++++++++++++++++--
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/include/haproxy/sink-t.h b/include/haproxy/sink-t.h
index a9db336..955263c 100644
--- a/include/haproxy/sink-t.h
+++ b/include/haproxy/sink-t.h
@@ -41,6 +41,7 @@ struct sink_forward_target {
        struct server *srv;    // used server
        struct appctx *appctx; // appctx of current session
        uint last_conn;        // copy of now_ms for last session establishment attempt
+       int flush_exp;         // soft-stop: close when no message forwarded by this tick; TICK_ETERNITY/0 = unset
        size_t ofs;            // ring buffer reader offset
        size_t e_processed;    // processed events
        struct sink *sink;     // the associated sink
diff --git a/src/sink.c b/src/sink.c
index ec0eb9d..8274ae0 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -422,6 +422,14 @@ void sink_setup_proxy(struct proxy *px)
        sink_proxies_list = px;
 }
 
+/* While stopping but the data plane is not drained yet, re-check that often
+ * (a stream closing does not wake the forwarder by itself). */
+#define SINK_FWD_DRAIN_POLL_MS 20
+/* If the downstream is wedged (we still hold messages but make no progress),
+ * give up after this long so a stuck downstream cannot hold the process up to
+ * "hard-stop-after". */
+#define SINK_FWD_WEDGE_MS      1000
+
 static void _sink_forward_io_handler(struct appctx *appctx,
                                      ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len, char delim))
 {
@@ -435,8 +443,16 @@ static void _sink_forward_io_handler(struct appctx *appctx,
        if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR)))
                goto out;
 
-       /* if stopping was requested, close immediately */
-       if (unlikely(stopping))
+       /* If stopping was requested, we used to close immediately. That dropped
+        * not only the messages still buffered in the ring, but also every
+        * message that in-flight streams keep producing during the soft-stop
+        * window (e.g. access logs lost on every reload). Instead we keep the
+        * forwarder flushing and decide whether to close after the dispatch
+        * below, based on whether the data plane is drained. If the downstream
+        * connection is not established there is nothing to flush to, so we
+        * close right away.
+        */
+       if (unlikely(stopping) && se_fl_test(appctx->sedesc, SE_FL_APPLET_NEED_CONN))
                goto soft_close;
 
        /* if the connection is not established, inform the stream that we want
@@ -481,6 +497,20 @@ static void _sink_forward_io_handler(struct appctx *appctx,
        }
 
        if (ret) {
+               /* the ring is empty: everything available has been forwarded */
+               if (unlikely(stopping) &&
+                   _HA_ATOMIC_LOAD(&actconn) <= _HA_ATOMIC_LOAD(&unstoppable_jobs)) {
+                       /* Soft-stop and the data plane is drained: there is no
+                        * stoppable active connection left (actconn is down to the
+                        * non-stoppable ones such as the master CLI connection), so
+                        * no stream can produce a new message anymore. Nothing left
+                        * to flush, close now. This follows HAProxy's normal
+                        * soft-stop drain, so the forwarder is the last thing to go
+                        * and never extends the worker's lifetime.
+                        */
+                       HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+                       goto soft_close;
+               }
                /* let's be woken up once new data arrive */
                MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
                ofs = ring_tail(ring);
@@ -492,6 +522,31 @@ static void _sink_forward_io_handler(struct appctx *appctx,
                        applet_have_more_data(appctx);
                } else
                        applet_have_no_more_data(appctx);
+               /* Still draining: the data plane may keep producing messages and
+                * a closing stream won't wake us, so re-check the drain state
+                * shortly. No wedge timeout here since the ring is empty (we are
+                * only waiting, not stuck on a busy downstream).
+                */
+               if (unlikely(stopping)) {
+                       sft->flush_exp = TICK_ETERNITY;
+                       task_schedule(appctx->t, tick_add(now_ms, SINK_FWD_DRAIN_POLL_MS));
+               }
+       }
+       else if (unlikely(stopping)) {
+               /* The ring is not empty but the output is full: we still hold
+                * messages we cannot push right now. We are woken again once the
+                * buffer drains, but bound the wait so a wedged or unreachable
+                * downstream cannot hold the process up to "hard-stop-after":
+                * give up if no message gets forwarded for SINK_FWD_WEDGE_MS
+                * (the deadline is pushed back on every forwarded message).
+                */
+               if (processed || !tick_isset(sft->flush_exp))
+                       sft->flush_exp = tick_add(now_ms, SINK_FWD_WEDGE_MS);
+               if (tick_is_expired(sft->flush_exp, now_ms)) {
+                       HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+                       goto soft_close;
+               }
+               task_schedule(appctx->t, sft->flush_exp);
        }
        HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
 
-- 
2.47.3


