Le 09/11/2020 à 13:10, Maciej Zdeb a écrit :
I've played a little bit with the patch and it led me to the backend.c file and the connect_server() function

int connect_server(struct stream *s)
{
[...]
if (!conn_xprt_ready(srv_conn) && !srv_conn->mux) {
                 /* set the correct protocol on the output stream interface */
                 if (srv)
                        conn_prepare(srv_conn, protocol_by_family(srv_conn->dst->ss_family), srv->xprt);
                 else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
                         /* proxies exclusively run on raw_sock right now */
                        conn_prepare(srv_conn, protocol_by_family(srv_conn->dst->ss_family), xprt_get(XPRT_RAW));
                         if (!(srv_conn->ctrl)) {
                                 conn_free(srv_conn);
                                 return SF_ERR_INTERNAL;
                         }
                 }
                 else {
                         conn_free(srv_conn);
                         return SF_ERR_INTERNAL;  /* how did we get there ? */
                 }
// THIS ONE IS OK
TEST_STRM(s);
//////////////////////////////
                 srv_cs = si_alloc_cs(&s->si[1], srv_conn);
// FAIL
TEST_STRM(s);
//////////////////////////////
                 if (!srv_cs) {
                         conn_free(srv_conn);
                         return SF_ERR_RESOURCE;
                 }

Hi,

In fact, this crash occurs because of Willy's patch. It was not designed to handle non-h2 connections. Here the crash happens on a TCP connection, used by a SPOE applet for instance.

I updated his patch. First, I added some calls to TEST_STRM() in the SPOE code, to be sure. I also explicitly set the stream task to NULL in stream_free() to catch late wakeups in the SPOE. Finally, I modified testcorrupt(). I hope this one is correct. But if I missed something, you may keep only the last ABORT_NOW() in testcorrupt() and replace the others with a return statement, just like in Willy's patch.

--
Christopher Faulet
>From ba99e0eedf1730970f1d0b5bb67f24ef79117207 Mon Sep 17 00:00:00 2001
From: Christopher Faulet <cfau...@haproxy.com>
Date: Mon, 9 Nov 2020 14:37:57 +0100
Subject: [PATCH] EXP: try to spot where h2s->subs changes

---
 include/haproxy/bug.h |  7 ++++++
 src/flt_spoe.c        |  8 +++++++
 src/mux_h2.c          | 25 ++++++++++++++++++++
 src/stream.c          | 55 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 95 insertions(+)

diff --git a/include/haproxy/bug.h b/include/haproxy/bug.h
index a008126f5c..c650f60b8c 100644
--- a/include/haproxy/bug.h
+++ b/include/haproxy/bug.h
@@ -166,6 +166,13 @@ struct mem_stats {
 })
 #endif /* DEBUG_MEM_STATS*/
 
+
+#define TEST_CS(ptr) do { extern void testcorrupt(const void *); testcorrupt(ptr); } while (0)
+
+#define TEST_SI(si) do { if ((si)) TEST_CS((si)->end); } while (0)
+
+#define TEST_STRM(s) do { if ((s)) { TEST_SI(&(s)->si[0]); TEST_SI(&(s)->si[1]);} } while (0)
+
 #endif /* _HAPROXY_BUG_H */
 
 /*
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index cf5fc7a4c0..6899b16e66 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1255,6 +1255,7 @@ spoe_release_appctx(struct appctx *appctx)
 		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
+		TEST_STRM(ctx->strm);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
 
@@ -1265,6 +1266,7 @@ spoe_release_appctx(struct appctx *appctx)
 		ctx->spoe_appctx = NULL;
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
+		TEST_STRM(ctx->strm);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
 
@@ -1279,6 +1281,7 @@ spoe_release_appctx(struct appctx *appctx)
 		spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
+		TEST_STRM(ctx->strm);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
 	list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
@@ -1288,6 +1291,7 @@ spoe_release_appctx(struct appctx *appctx)
 		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
+		TEST_STRM(ctx->strm);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
 
@@ -1491,6 +1495,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
 			ctx->spoe_appctx = NULL;
 			ctx->state = SPOE_CTX_ST_ERROR;
 			ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
+			TEST_STRM(ctx->strm);
 			task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 			*skip = 1;
 			break;
@@ -1524,6 +1529,7 @@ spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
 	SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
 	SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
 	ctx->state = SPOE_CTX_ST_ENCODING_MSGS;
+	TEST_STRM(ctx->strm);
 	task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	goto end;
 
@@ -1613,6 +1619,7 @@ spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
 			}
 			else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
 				appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+			TEST_STRM(ctx->strm);
 			task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 			break;
 	}
@@ -2830,6 +2837,7 @@ spoe_release_buffer(struct buffer *buf, struct buffer_wait *buffer_wait)
 static int
 spoe_wakeup_context(struct spoe_context *ctx)
 {
+	TEST_STRM(ctx->strm);
 	task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	return 1;
 }
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 5830fdbd90..a0d31911a3 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -6251,3 +6251,28 @@ static int init_h2()
 }
 
 REGISTER_POST_CHECK(init_h2);
+
+/* Debug check: ABORT_NOW() if <ptr> is a conn-stream on the h2 mux whose h2s looks inconsistent. */
+void testcorrupt(const void *ptr)
+{
+	const struct conn_stream *cs = objt_cs((void *)ptr); /* const only dropped for the call; nothing is written */
+	const struct h2s *h2s;
+
+	/* not a conn-stream, or not attached to the h2 mux: nothing to check */
+	if (!cs || cs->conn->mux != &h2_ops)
+		return;
+
+	/* mux installed, thus the conn-stream ctx must be a h2s */
+	h2s = cs->ctx;
+	if (!h2s)
+		ABORT_NOW();
+	/* The h2s must point to this conn-stream */
+	if (h2s->cs != cs)
+		ABORT_NOW();
+	/* The h2s must have a h2c, the same as the cs one */
+	if (!h2s->h2c || !h2s->h2c->conn || h2s->h2c->conn != cs->conn)
+		ABORT_NOW();
+	/* an odd <subs> value is treated as corruption (presumably a valid pointer is aligned) */
+	if ((long)h2s->subs & 1)
+		ABORT_NOW();
+}
diff --git a/src/stream.c b/src/stream.c
index 43f1432979..931c8abd95 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -531,6 +531,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 	 * the caller must handle the task_wakeup
 	 */
 	DBG_TRACE_LEAVE(STRM_EV_STRM_NEW, s);
+	TEST_STRM(s);
 	return s;
 
 	/* Error unrolling */
@@ -542,6 +543,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
 out_fail_alloc_si1:
 	tasklet_free(s->si[0].wait_event.tasklet);
  out_fail_alloc:
+	TEST_STRM(s);
 	pool_free(pool_head_stream, s);
 	DBG_TRACE_DEVEL("leaving on error", STRM_EV_STRM_NEW|STRM_EV_STRM_ERR);
 	return NULL;
@@ -565,6 +567,7 @@ static void stream_free(struct stream *s)
 	 * that walking over a task list never exhibits a dying stream.
 	 */
 	s->task->context = NULL;
+	s->task = NULL;
 	__ha_barrier_store();
 
 	pendconn_free(s);
@@ -1497,6 +1500,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 	struct stream_interface *si_f, *si_b;
 	unsigned int rate;
 
+	TEST_STRM(s);
+
 	DBG_TRACE_ENTER(STRM_EV_STRM_PROC, s);
 
 	activity[tid].stream_calls++;
@@ -1594,6 +1599,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 	}
 
  resync_stream_interface:
+	TEST_STRM(s);
+
 	/* below we may emit error messages so we have to ensure that we have
 	 * our buffers properly allocated.
 	 */
@@ -1658,6 +1665,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		/* note: maybe we should process connection errors here ? */
 	}
 
+	TEST_STRM(s);
+
 	if (si_state_in(si_b->state, SI_SB_CON|SI_SB_RDY)) {
 		/* we were trying to establish a connection on the server side,
 		 * maybe it succeeded, maybe it failed, maybe we timed out, ...
@@ -1677,6 +1686,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		 * SI_ST_ASS/SI_ST_TAR/SI_ST_REQ for retryable errors.
 		 */
 	}
+	TEST_STRM(s);
+
 
 	rq_prod_last = si_f->state;
 	rq_cons_last = si_b->state;
@@ -1707,12 +1718,16 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		}
 	}
 
+	TEST_STRM(s);
+
 	/*
 	 * Note: of the transient states (REQ, CER, DIS), only REQ may remain
 	 * at this point.
 	 */
 
  resync_request:
+	TEST_STRM(s);
+
 	/* Analyse request */
 	if (((req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
 	    ((req->flags ^ rqf_last) & CF_MASK_STATIC) ||
@@ -1811,8 +1826,12 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 	 * analysers (eg: HTTP keep-alive).
 	 */
 	req_ana_back = req->analysers;
+	TEST_STRM(s);
+
 
  resync_response:
+	TEST_STRM(s);
+
 	/* Analyse response */
 
 	if (((res->flags & ~rpf_last) & CF_MASK_ANALYSER) ||
@@ -1875,6 +1894,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 			goto resync_response;
 	}
 
+	TEST_STRM(s);
+
 	/* maybe someone has added some request analysers, so we must check and loop */
 	if (req->analysers & ~req_ana_back)
 		goto resync_request;
@@ -1886,6 +1907,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 	 * both buffers.
 	 */
 
+	TEST_STRM(s);
+
 
 	/*
 	 * Now we propagate unhandled errors to the stream. Normally
@@ -1990,6 +2013,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 			sess_set_term_flags(s);
 		}
 	}
+	TEST_STRM(s);
+
 
 	/*
 	 * Here we take care of forwarding unhandled data. This also includes
@@ -2034,6 +2059,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		}
 	}
 
+	TEST_STRM(s);
+
 	/* check if it is wise to enable kernel splicing to forward request data */
 	if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
 	    req->to_forward &&
@@ -2081,6 +2108,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 	}
 
 
+	TEST_STRM(s);
+
 	/* we may have a pending connection request, or a connection waiting
 	 * for completion.
 	 */
@@ -2120,6 +2149,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 
 	/* Let's see if we can send the pending request now */
 	si_sync_send(si_b);
+	TEST_STRM(s);
+
 
 	/*
 	 * Now forward all shutdown requests between both sides of the request buffer
@@ -2154,6 +2185,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 			si_f->flags |= SI_FL_NOLINGER;
 		si_shutr(si_f);
 	}
+	TEST_STRM(s);
+
 
 	/* Benchmarks have shown that it's optimal to do a full resync now */
 	if (si_f->state == SI_ST_DIS ||
@@ -2168,6 +2201,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 
 	/* perform output updates to the response buffer */
 
+	TEST_STRM(s);
+
 	/* If noone is interested in analysing data, it's time to forward
 	 * everything. We configure the buffer to forward indefinitely.
 	 * Note that we're checking CF_SHUTR_NOW as an indication of a possible
@@ -2227,6 +2262,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		}
 	}
 
+	TEST_STRM(s);
+
 	/* check if it is wise to enable kernel splicing to forward response data */
 	if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
 	    res->to_forward &&
@@ -2242,12 +2279,16 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		res->flags |= CF_KERN_SPLICING;
 	}
 
+	TEST_STRM(s);
+
 	/* reflect what the L7 analysers have seen last */
 	rpf_last = res->flags;
 
 	/* Let's see if we can send the pending response now */
 	si_sync_send(si_f);
 
+	TEST_STRM(s);
+
 	/*
 	 * Now forward all shutdown requests between both sides of the buffer
 	 */
@@ -2280,6 +2321,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		si_shutr(si_b);
 	}
 
+	TEST_STRM(s);
+
 	if (si_f->state == SI_ST_DIS ||
 	    si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) ||
 	    (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) ||
@@ -2295,6 +2338,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 	if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)
 		goto resync_request;
 
+	TEST_STRM(s);
+
 	/* we're interested in getting wakeups again */
 	si_f->flags &= ~SI_FL_DONT_WAKE;
 	si_b->flags &= ~SI_FL_DONT_WAKE;
@@ -2326,6 +2371,8 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		}
 	}
 
+	TEST_STRM(s);
+
 	if (likely((si_f->state != SI_ST_CLO) || !si_state_in(si_b->state, SI_SB_INI|SI_SB_CLO))) {
 		if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE))
 			stream_process_counters(s);
@@ -2349,7 +2396,10 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		/* Reset pending events now */
 		s->pending_events = 0;
 
+
 	update_exp_and_leave:
+	TEST_STRM(s);
+
 		/* Note: please ensure that if you branch here you disable SI_FL_DONT_WAKE */
 		t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 : t->expire),
 				       tick_first(tick_first(req->rex, req->wex),
@@ -2373,6 +2423,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 
 		s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
 		stream_release_buffers(s);
+	TEST_STRM(s);
 
 		DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);
 		return t; /* nothing more to do */
@@ -2392,10 +2443,12 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		DISGUISE(write(1, trash.area, trash.data));
 	}
 
+	TEST_STRM(s);
 	s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
 	if (!(s->flags & SF_IGNORE))
 		stream_process_counters(s);
 
+	TEST_STRM(s);
 	if (s->txn && s->txn->status) {
 		int n;
 
@@ -2413,6 +2466,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 		}
 	}
 
+	TEST_STRM(s);
 	/* let's do a final log if we need it */
 	if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait &&
 	    !(s->flags & SF_MONITOR) &&
@@ -2426,6 +2480,7 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
 	stream_update_time_stats(s);
 
 	/* the task MUST not be in the run queue anymore */
+	TEST_STRM(s);
 	stream_free(s);
 	task_destroy(t);
 	return NULL;
-- 
2.26.2

Reply via email to