Everything is in the attached patch file.

Please note that this patch has already been reviewed by Emeric Brun who is the peers protocol maintener.


Regards.

Fred.
>From d1e7b23187db580fe2445e80ff1abefecfcf2241 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20L=C3=A9caille?= <flecai...@haproxy.com>
Date: Wed, 12 Oct 2016 17:30:30 +0200
Subject: [PATCH] BUG/MINOR: peers: Fix a peer stick-tables synchronization
 issue.
X-Bogosity: Ham, tests=bogofilter, spamicity=0.000000, version=1.2.4

During the stick-table teaching process which occurs at reloading/restart time,
expiration dates of stick-tables entries were not synchronized between peers.

This patch adds two new stick-table messages to provide such a synchronization feature.

As these new messages are not supported by older haproxy peers protocol versions,
this patch increments peers protol version, from 2.0 to 2.1, to help in detecting/supporting
such older peers protocol implementations so that new versions might still be able
to transparently communicate with a newer one.
---
 include/proto/stick_table.h |   4 ++
 src/peers.c                 | 136 +++++++++++++++++++++++++++++++++++++-------
 src/stick_table.c           |  30 +++++++++-
 3 files changed, 149 insertions(+), 21 deletions(-)

diff --git a/include/proto/stick_table.h b/include/proto/stick_table.h
index 8c97f73..941e2ff 100644
--- a/include/proto/stick_table.h
+++ b/include/proto/stick_table.h
@@ -42,6 +42,10 @@ int stktable_init(struct stktable *t);
 int stktable_parse_type(char **args, int *idx, unsigned long *type, size_t *key_size);
 struct stksess *stktable_get_entry(struct stktable *table, struct stktable_key *key);
 struct stksess *stktable_store(struct stktable *t, struct stksess *ts, int local);
+struct stksess *stktable_store_with_exp(struct stktable *t, struct stksess *ts,
+                                        int local, int expire);
+struct stksess *stktable_touch_with_exp(struct stktable *t, struct stksess *ts,
+                                        int local, int expire);
 struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local);
 struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts);
 struct stksess *stktable_lookup_key(struct stktable *t, struct stktable_key *key);
diff --git a/src/peers.c b/src/peers.c
index 6f88c19..d11792f 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -23,6 +23,7 @@
 #include <common/compat.h>
 #include <common/config.h>
 #include <common/time.h>
+#include <common/standard.h>
 
 #include <types/global.h>
 #include <types/listener.h>
@@ -80,6 +81,7 @@
 #define	PEER_F_TEACH_COMPLETE		0x00000010 /* All that we know already taught to current peer, used only for a local peer */
 #define	PEER_F_LEARN_ASSIGN		0x00000100 /* Current peer was assigned for a lesson */
 #define	PEER_F_LEARN_NOTUP2DATE		0x00000200 /* Learn from peer finished but peer is not up to date */
+#define	PEER_F_DWNGRD		        0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */
 
 #define	PEER_TEACH_RESET		~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
 #define	PEER_LEARN_RESET		~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
@@ -124,6 +126,8 @@ enum {
 	PEER_MSG_STKT_DEFINE,
 	PEER_MSG_STKT_SWITCH,
 	PEER_MSG_STKT_ACK,
+	PEER_MSG_STKT_UPDATE_TIMED,
+	PEER_MSG_STKT_INCUPDATE_TIMED,
 };
 
 /**********************************/
@@ -163,6 +167,9 @@ enum {
 #define	PEER_SESS_SC_ERRPEER		504 /* unknown peer */
 
 #define PEER_SESSION_PROTO_NAME         "HAProxyS"
+#define PEER_MAJOR_VER        2
+#define PEER_MINOR_VER        1
+#define PEER_DWNGRD_MINOR_VER 0
 
 struct peers *peers = NULL;
 static void peer_session_forceshutdown(struct stream * stream);
@@ -230,6 +237,26 @@ uint64_t intdecode(char **str, char *end) {
 	return i;
 }
 
+/* Set the stick-table UPDATE message type byte at <msg_type> address,
+ * depending on <use_identifier> and <use_timed> boolean parameters.
+ * Always successful.
+ */
+static inline void peer_set_update_msg_type(char *msg_type, int use_identifier, int use_timed)
+{
+	if (use_timed) {
+		if (use_identifier)
+			*msg_type = PEER_MSG_STKT_UPDATE_TIMED;
+		else
+			*msg_type = PEER_MSG_STKT_INCUPDATE_TIMED;
+	}
+	else {
+		if (use_identifier)
+			*msg_type = PEER_MSG_STKT_UPDATE;
+		else
+			*msg_type = PEER_MSG_STKT_INCUPDATE;
+	}
+
+}
 /*
  * This prepare the data update message on the stick session <ts>, <st> is the considered
  * stick table.
@@ -237,7 +264,7 @@ uint64_t intdecode(char **str, char *end) {
  * If function returns 0, the caller should consider we were unable to encode this message (TODO:
  * check size)
  */
-static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier)
+static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, char *msg, size_t size, int use_identifier, int use_timed)
 {
 	uint32_t netinteger;
 	unsigned short datalen;
@@ -261,6 +288,12 @@ static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, c
 		cursor += sizeof(netinteger);
 	}
 
+	if (use_timed) {
+		netinteger = htonl(tick_remain(now_ms, ts->expire));
+		memcpy(cursor, &netinteger, sizeof(netinteger));
+		cursor += sizeof(netinteger);
+	}
+
 	/* encode the key */
 	if (st->table->type == SMP_T_STR) {
 		int stlen = strlen((char *)ts->key.key);
@@ -324,11 +357,7 @@ static int peer_prepare_updatemsg(struct stksess *ts, struct shared_table *st, c
 
 	/*  prepare message header */
 	msg[0] = PEER_MSG_CLASS_STICKTABLE;
-	if (use_identifier)
-		msg[1] = PEER_MSG_STKT_UPDATE;
-	else
-		msg[1] = PEER_MSG_STKT_INCUPDATE;
-
+	peer_set_update_msg_type(&msg[1], use_identifier, use_timed);
 	cursor = &msg[2];
 	intencode(datalen, &cursor);
 
@@ -477,6 +506,34 @@ static void peer_session_release(struct appctx *appctx)
 	}
 }
 
+/* Retrieve the major and minor versions of peers protocol
+ * announced by a remote peer. <str> is a null-terminated
+ * string with the following format: "<maj_ver>.<min_ver>".
+ */
+static int peer_get_version(const char *str,
+                            unsigned int *maj_ver, unsigned int *min_ver)
+{
+	unsigned int majv, minv;
+	const char *pos, *saved;
+	const char *end;
+
+	saved = pos = str;
+	end = str + strlen(str);
+
+	majv = read_uint(&pos, end);
+	if (saved == pos || *pos++ != '.')
+		return -1;
+
+	saved = pos;
+	minv = read_uint(&pos, end);
+	if (saved == pos || pos != end)
+		return -1;
+
+	*maj_ver = majv;
+	*min_ver = minv;
+
+	return 0;
+}
 
 /*
  * IO Handler to handle message exchance with a peer
@@ -488,9 +545,12 @@ static void peer_io_handler(struct appctx *appctx)
 	struct peers *curpeers = strm_fe(s)->parent;
 	int reql = 0;
 	int repl = 0;
+	size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
+	unsigned int maj_ver, min_ver;
 
 	while (1) {
 switchstate:
+		maj_ver = min_ver = (unsigned int)-1;
 		switch(appctx->st0) {
 			case PEER_SESS_ST_ACCEPT:
 				appctx->ctx.peers.ptr = NULL;
@@ -515,13 +575,16 @@ switchstate:
 
 				bo_skip(si_oc(si), reql);
 
-				/* test version */
-				if (strcmp(PEER_SESSION_PROTO_NAME " 2.0", trash.str) != 0) {
+				/* test protocol */
+				if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, proto_len + 1) != 0) {
+					appctx->st0 = PEER_SESS_ST_EXIT;
+					appctx->st1 = PEER_SESS_SC_ERRPROTO;
+					goto switchstate;
+				}
+				if (peer_get_version(trash.str + proto_len + 1, &maj_ver, &min_ver) == -1 ||
+				    maj_ver != PEER_MAJOR_VER || min_ver > PEER_MINOR_VER) {
 					appctx->st0 = PEER_SESS_ST_EXIT;
 					appctx->st1 = PEER_SESS_SC_ERRVERSION;
-					/* test protocol */
-					if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
-						appctx->st1 = PEER_SESS_SC_ERRPROTO;
 					goto switchstate;
 				}
 
@@ -608,6 +671,14 @@ switchstate:
 					}
 					peer_session_forceshutdown(curpeer->stream);
 				}
+				if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
+					if (min_ver == PEER_DWNGRD_MINOR_VER) {
+						curpeer->flags |= PEER_F_DWNGRD;
+					}
+					else {
+						curpeer->flags &= ~PEER_F_DWNGRD;
+					}
+				}
 				curpeer->stream = s;
 				curpeer->appctx = appctx;
 				appctx->ctx.peers.ptr = curpeer;
@@ -674,7 +745,9 @@ switchstate:
 
 				/* Send headers */
 				repl = snprintf(trash.str, trash.size,
-				                PEER_SESSION_PROTO_NAME " 2.0\n%s\n%s %d %d\n",
+				                PEER_SESSION_PROTO_NAME " %u.%u\n%s\n%s %d %d\n",
+				                PEER_MAJOR_VER,
+				                (curpeer->flags & PEER_F_DWNGRD) ? PEER_DWNGRD_MINOR_VER : PEER_MINOR_VER,
 				                curpeer->id,
 				                localpeer,
 				                (int)getpid(),
@@ -762,6 +835,8 @@ switchstate:
 
 				}
 				else {
+					if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
+						curpeer->flags |= PEER_F_DWNGRD;
 					/* Status code is not success, abort */
 					appctx->st0 = PEER_SESS_ST_END;
 					goto switchstate;
@@ -1007,9 +1082,12 @@ switchstate:
 
 					}
 					else if (msg_head[1] == PEER_MSG_STKT_UPDATE
-						 || msg_head[1] == PEER_MSG_STKT_INCUPDATE) {
+						 || msg_head[1] == PEER_MSG_STKT_INCUPDATE
+						 || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
+						 || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
 						struct shared_table *st = curpeer->remote_table;
 						uint32_t update;
+						int expire;
 						unsigned int data_type;
 						void *data_ptr;
 
@@ -1017,7 +1095,10 @@ switchstate:
 						if (!st)
 							goto ignore_msg;
 
-						if (msg_head[1] == PEER_MSG_STKT_UPDATE) {
+						expire = MS_TO_TICKS(st->table->expire);
+
+						if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
+						    msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED) {
 							if (msg_len < sizeof(update)) {
 								/* malformed message */
 								appctx->st0 = PEER_SESS_ST_ERRPROTO;
@@ -1031,6 +1112,19 @@ switchstate:
 							st->last_get++;
 						}
 
+						if (msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
+						    msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
+							size_t expire_sz = sizeof expire;
+
+							if (msg_cur + expire_sz > msg_end) {
+								appctx->st0 = PEER_SESS_ST_ERRPROTO;
+								goto switchstate;
+							}
+							memcpy(&expire, msg_cur, expire_sz);
+							msg_cur += expire_sz;
+							expire = ntohl(expire);
+						}
+
 						newts = stksess_new(st->table, NULL);
 						if (!newts)
 							goto ignore_msg;
@@ -1086,7 +1180,7 @@ switchstate:
 						ts = stktable_lookup(st->table, newts);
 						if (ts) {
 							/* the entry already exist, we can free ours */
-							stktable_touch(st->table, ts, 0);
+							stktable_touch_with_exp(st->table, ts, 0, tick_add(now_ms, expire));
 							stksess_free(st->table, newts);
 							newts = NULL;
 						}
@@ -1094,7 +1188,7 @@ switchstate:
 							struct eb32_node *eb;
 
 							/* create new entry */
-							ts = stktable_store(st->table, newts, 0);
+							ts = stktable_store_with_exp(st->table, newts, 0, tick_add(now_ms, expire));
 							newts = NULL; /* don't reuse it */
 
 							ts->upd.key= (++st->table->update)+(2147483648U);
@@ -1343,7 +1437,7 @@ incomplete:
 									}
 
 									ts = eb32_entry(eb, struct stksess, upd);
-									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, 0);
 									if (!msglen) {
 										/* internal error: message does not fit in trash */
 										appctx->st0 = PEER_SESS_ST_END;
@@ -1404,6 +1498,7 @@ incomplete:
 								while (1) {
 									uint32_t msglen;
 									struct stksess *ts;
+									int use_timed;
 
 									/* push local updates */
 									if (!eb) {
@@ -1415,7 +1510,8 @@ incomplete:
 									}
 
 									ts = eb32_entry(eb, struct stksess, upd);
-									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+									use_timed = !(curpeer->flags & PEER_F_DWNGRD);
+									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
 									if (!msglen) {
 										/* internal error: message does not fit in trash */
 										appctx->st0 = PEER_SESS_ST_END;
@@ -1473,6 +1569,7 @@ incomplete:
 								while (1) {
 									uint32_t msglen;
 									struct stksess *ts;
+									int use_timed;
 
 									/* push local updates */
 									if (!eb || eb->key > st->teaching_origin) {
@@ -1481,7 +1578,8 @@ incomplete:
 									}
 
 									ts = eb32_entry(eb, struct stksess, upd);
-									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed);
+									use_timed = !(curpeer->flags & PEER_F_DWNGRD);
+									msglen = peer_prepare_updatemsg(ts, st, trash.str, trash.size, new_pushed, use_timed);
 									if (!msglen) {
 										/* internal error: message does not fit in trash */
 										appctx->st0 = PEER_SESS_ST_END;
diff --git a/src/stick_table.c b/src/stick_table.c
index b0aabd4..b269bc1 100644
--- a/src/stick_table.c
+++ b/src/stick_table.c
@@ -244,10 +244,11 @@ struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts)
 /* Update the expiration timer for <ts> but do not touch its expiration node.
  * The table's expiration timer is updated if set.
  */
-struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local)
+struct stksess *stktable_touch_with_exp(struct stktable *t, struct stksess *ts,
+                                        int local, int expire)
 {
 	struct eb32_node * eb;
-	ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
+	ts->expire = expire;
 	if (t->expire) {
 		t->exp_task->expire = t->exp_next = tick_first(ts->expire, t->exp_next);
 		task_queue(t->exp_task);
@@ -274,6 +275,17 @@ struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local
 	return ts;
 }
 
+/* Update the expiration timer for <ts> but do not touch its expiration node.
+ * The table's expiration timer is updated if set. The date of expiration coming from
+ * <t> stick-table configuration.
+ */
+struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local)
+{
+	int expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
+
+	return stktable_touch_with_exp(t, ts, local, expire);
+}
+
 /* Insert new sticky session <ts> in the table. It is assumed that it does not
  * yet exist (the caller must check this). The table's timeout is updated if it
  * is set. <ts> is returned.
@@ -287,6 +299,20 @@ struct stksess *stktable_store(struct stktable *t, struct stksess *ts, int local
 	return ts;
 }
 
+/* Same function as stktable_store(), but with <expire> as supplementary argument
+ * to set the date of expiration of <ts> new sticky session thanks to
+ * stktable_touch_with_exp().
+ */
+struct stksess *stktable_store_with_exp(struct stktable *t, struct stksess *ts,
+                                        int local, int expire)
+{
+	ebmb_insert(&t->keys, &ts->key, t->key_size);
+	stktable_touch_with_exp(t, ts, local, expire);
+	ts->exp.key = ts->expire;
+	eb32_insert(&t->exps, &ts->exp);
+	return ts;
+}
+
 /* Returns a valid or initialized stksess for the specified stktable_key in the
  * specified table, or NULL if the key was NULL, or if no entry was found nor
  * could be created. The entry's expiration is updated.
-- 
2.1.4

Reply via email to