---
doc/configuration.txt | 22 ++++
include/proto/dumpstats.h | 5 +
include/types/stream_interface.h | 5 +
src/dumpstats.c | 242 +++++++++++++++++++++++++++++++++++++-
src/session.c | 9 ++
5 files changed, 280 insertions(+), 3 deletions(-)
diff --git a/doc/configuration.txt b/doc/configuration.txt
index 2ede208..5b1ca85 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -10031,6 +10031,28 @@ shutdown sessions <backend>/<server>
maintenance mode, for instance. Such terminated sessions are reported with a
'K' flag in the logs.
+debug sess [proxy:<proxy_name>[:<server_name>]]
+
+ Dump a stream of events about sessions as they are added and removed.
+ The possible event formats are "Forward" and "Close":
+
+ "F <session_id> <in_peer> - <in_sock> | <out_sock> - <out_peer>\n"
+ "C <session_id>\n"
+
+ Streaming continues until a new command is received or the connection is
+ closed. If <proxy_name> is specified, only events for sessions whose frontend
+ or backend is that proxy are reported. If <server_name> is also specified,
+ only events for sessions targeting that server are reported.
+
+ This command is restricted and can only be issued on sockets configured
+ for levels "operator" or "admin".
+
+ Example:
+ >>> $ echo "set timeout cli 3600; debug sess" | socat stdio,ignoreeof /tmp/sock1
+ F 1 127.0.0.1:50869 - 127.0.0.1:9418 | 127.0.0.1:50870 - 127.0.0.1:6000
+ C 1
+ F 2 127.0.0.1:50874 - 127.0.0.1:9418 | 127.0.0.1:50875 - 127.0.0.1:6000
+ C 2
+
/*
* Local variables:
* fill-column: 79
diff --git a/include/proto/dumpstats.h b/include/proto/dumpstats.h
index eb44a36..d328881 100644
--- a/include/proto/dumpstats.h
+++ b/include/proto/dumpstats.h
@@ -55,6 +55,8 @@
#define STAT_CLI_O_TAB 8 /* dump tables */
#define STAT_CLI_O_CLR 9 /* clear tables */
+#define STAT_CLI_EVENTS 10  /* event stream; must stay distinct from every STAT_CLI_O_* value */
+
/* status codes (strictly 4 chars) used in the URL to display a message */
#define STAT_STATUS_UNKN "UNKN" /* an unknown error occured, shouldn't
happen */
#define STAT_STATUS_DONE "DONE" /* the action is successful */
@@ -63,8 +65,11 @@
#define STAT_STATUS_DENY "DENY" /* action denied */
extern struct si_applet http_stats_applet;
+extern int stats_event_enabled;
void stats_io_handler(struct stream_interface *si);
+/* Event-stream hooks for the "debug sess" CLI command: report session <s> to
+ * the registered event listeners when it is established (new) or closed
+ * (end). Both return immediately when no listener is registered.
+ */
+void stats_event_new_session(struct session *s);
+void stats_event_end_session(struct session *s);
#endif /* _PROTO_DUMPSTATS_H */
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index 16af806..d730559 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -169,6 +169,11 @@ struct stream_interface {
int bol; /* pointer to beginning
of current line */
} errors;
struct {
+ struct list list; /* list of stats
streams in the STAT_CLI_EVENTS state */
+ struct proxy *px; /* if not NULL, only
send events associated with this proxy */
+ struct server *srv; /* if not NULL, only
send events associated with this server */
+ } events;
+ struct {
void *target; /* table we want to
dump, or NULL for all */
struct proxy *proxy; /* table being
currently dumped (first if NULL) */
struct stksess *entry; /* last entry we were
trying to dump (or first if NULL) */
diff --git a/src/dumpstats.c b/src/dumpstats.c
index 74ad966..b3f4423 100644
--- a/src/dumpstats.c
+++ b/src/dumpstats.c
@@ -86,6 +86,7 @@ static const char stats_sock_usage_msg[] =
" disable : put a server or frontend in maintenance mode\n"
" enable : re-enable a server or frontend which is in
maintenance mode\n"
" shutdown : kill a session or a frontend (eg:to release
listening ports)\n"
+ " debug sess : stream events about proxied sessions\n"
"";
static const char stats_permission_denied_msg[] =
@@ -114,6 +115,77 @@ enum {
STAT_PX_ST_FIN,
};
+/* Keep track of sessions that want streaming events (STAT_CLI_EVENTS).
+ */
+int stats_event_enabled = 0;
+static struct list stats_event_listeners =
LIST_HEAD_INIT(stats_event_listeners);
+
+/* Register stream interface <si> as an event listener: link it into the
+ * listener list and raise the global flag checked by the session code before
+ * it produces events.
+ */
+static inline void stats_event_listener_add(struct stream_interface *si)
+{
+	struct list *node = &si->applet.ctx.events.list;
+
+	stats_event_enabled = 1;
+	LIST_ADDQ(&stats_event_listeners, node);
+}
+
+/* Unregister <si> from the event listeners. The list is searched first, so
+ * this may be called on any CLI stream interface whether or not it is
+ * currently streaming events (e.g. on every disconnecting stats socket). The
+ * global enable flag is cleared once no listener remains, and the stats
+ * context of <si> is zeroed in all cases.
+ */
+static inline void stats_event_listener_remove(struct stream_interface *si)
+{
+	struct stream_interface *it;
+
+	list_for_each_entry(it, &stats_event_listeners, applet.ctx.events.list) {
+		if (it != si)
+			continue;
+
+		/* registered: drop its filters and unlink it, then stop
+		 * scanning since the node under the iterator is gone.
+		 */
+		si->applet.ctx.events.px = NULL;
+		si->applet.ctx.events.srv = NULL;
+		LIST_DEL(&si->applet.ctx.events.list);
+		break;
+	}
+
+	if (LIST_ISEMPTY(&stats_event_listeners))
+		stats_event_enabled = 0;
+
+	/* Re-initialize stats output */
+	memset(&si->applet.ctx.stats, 0, sizeof(si->applet.ctx.stats));
+}
+
+/* Feed <msg> to every registered event listener whose filter accepts session
+ * <s>. Listeners flagged SI_FL_DONT_WAKE or without an owner task are skipped.
+ * A listener with a proxy filter only receives sessions whose frontend or
+ * backend is that proxy; the server filter is only evaluated when a proxy
+ * filter is set. The listener is woken up when buffer_feed() returns -1
+ * (presumably "message fully queued" -- confirm against buffer_feed()).
+ */
+static inline void stats_event_listener_message_all(char *msg, struct session *s)
+{
+	struct stream_interface *lsn;
+
+	list_for_each_entry(lsn, &stats_event_listeners, applet.ctx.events.list) {
+		struct proxy *px = lsn->applet.ctx.events.px;
+		struct server *srv;
+
+		if ((lsn->flags & SI_FL_DONT_WAKE) || !lsn->owner)
+			continue;
+
+		/* filter by proxy and server if required */
+		if (px) {
+			srv = lsn->applet.ctx.events.srv;
+			if (s->be != px && s->fe != px)
+				continue;
+			if (srv && target_srv(&s->target) != srv)
+				continue;
+		}
+
+		if (buffer_feed(lsn->ib, msg) != -1)
+			continue;
+
+		lsn->ib->flags |= BF_SEND_DONTWAIT;
+		task_wakeup(lsn->owner, TASK_WOKEN_MSG);
+	}
+}
+
/* This function is called from the session-level accept() in order to
instanciate
* a new stats socket. It returns a positive value upon success, 0 if the
connection
* needs to be closed and ignored, or a negative value upon critical failure.
@@ -772,7 +844,54 @@ static int stats_sock_parse_request(struct
stream_interface *si, char *line)
args[arg] = line;
si->applet.ctx.stats.flags = 0;
- if (strcmp(args[0], "show") == 0) {
+	if (strcmp(args[0], "debug") == 0) {
+		if (strcmp(args[1], "sess") == 0) {
+			/* event streaming needs at least the "operator" level */
+			if (s->listener->perm.ux.level < ACCESS_LVL_OPER) {
+				si->applet.ctx.cli.msg = stats_permission_denied_msg;
+				si->applet.st0 = STAT_CLI_PRINT;
+				return 1;
+			}
+			/* Optional filter "proxy:<proxy_name>[:<server_name>]".
+			 * The whole 6-char prefix, colon included, must match:
+			 * matching only "proxy" would let args[2] + 6 point past
+			 * the terminating NUL of a bare "proxy" word.
+			 */
+			if (!strncmp(args[2], "proxy:", 6)) {
+				struct proxy *px = NULL;
+				struct server *srv = NULL;
+				char *px_name = args[2] + 6, *srv_name;
+
+				/* split "<proxy_name>:<server_name>" in place */
+				if ((srv_name = strchr(px_name, ':'))) {
+					*srv_name = 0;
+					srv_name += 1;
+				}
+
+				px = findproxy(px_name, PR_CAP_FE|PR_CAP_BE);
+				if (!px) {
+					si->applet.ctx.cli.msg = "Invalid proxy filter for event stream.";
+					si->applet.st0 = STAT_CLI_PRINT;
+					return 1;
+				}
+
+				if (srv_name && *srv_name) {
+					srv = findserver(px, srv_name);
+					if (!srv) {
+						si->applet.ctx.cli.msg = "Invalid server filter for event stream.";
+						si->applet.st0 = STAT_CLI_PRINT;
+						return 1;
+					}
+				}
+
+				si->applet.ctx.events.srv = srv;
+				si->applet.ctx.events.px = px;
+			} else {
+				/* no filter: stream events for every session */
+				si->applet.ctx.events.srv = NULL;
+				si->applet.ctx.events.px = NULL;
+			}
+
+			stats_event_listener_add(si);
+			si->applet.st0 = STAT_CLI_EVENTS;
+		}
+		else { /* not "sess" */
+			return 0;
+		}
+	}
+ else if (strcmp(args[0], "show") == 0) {
if (strcmp(args[1], "stat") == 0) {
if (*args[2] && *args[3] && *args[4]) {
si->applet.ctx.stats.flags |= STAT_BOUND;
@@ -1346,6 +1465,14 @@ static int stats_sock_parse_request(struct
stream_interface *si, char *line)
return 1;
}
+/* Release callback of the CLI applet: unregisters <si> from the event
+ * listeners in case it was streaming events when the session went away.
+ */
+static void cli_session_release(struct stream_interface *si)
+{
+	stats_event_listener_remove(si);
+}
+
/* This I/O handler runs as an applet embedded in a stream interface. It is
* used to processes I/O from/to the stats unix socket. The system relies on a
* state machine handling requests and various responses. We read a request,
@@ -1377,7 +1504,7 @@ static void cli_io_handler(struct stream_interface *si)
si->shutw(si);
break;
}
- else if (si->applet.st0 == STAT_CLI_GETREQ) {
+ else if (si->applet.st0 == STAT_CLI_GETREQ || si->applet.st0 ==
STAT_CLI_EVENTS) {
/* ensure we have some output room left in the event we
* would want to return some info right after parsing.
*/
@@ -1417,7 +1544,10 @@ static void cli_io_handler(struct stream_interface *si)
trash[len] = '\0';
+ if (si->applet.st0 == STAT_CLI_EVENTS)
+ stats_event_listener_remove(si);
si->applet.st0 = STAT_CLI_PROMPT;
+
if (len) {
if (strcmp(trash, "quit") == 0) {
si->applet.st0 = STAT_CLI_END;
@@ -3725,6 +3855,112 @@ static int stats_table_request(struct stream_interface
*si, bool show)
return 1;
}
+/* Called whenever a new session is successfully established (reaches
+ * SI_ST_EST). If any stats socket is registered as an event listener, a line
+ *
+ *   "F <session_id> <in_peer> - <in_sock> | <out_sock> - <out_peer>\n"
+ *
+ * carrying the session's unique id and the addresses of both sides is sent
+ * to the listeners. Missing addresses are looked up with getpeername() /
+ * getsockname(). An address that cannot be retrieved or formatted, or whose
+ * family is not handled, is reported as "unknown".
+ */
+void stats_event_new_session(struct session *s)
+{
+	/* sizeof() counts the trailing NUL and, unlike strlen(), keeps the
+	 * bound a constant expression so the array is not a VLA.
+	 */
+	char addrs[4][INET6_ADDRSTRLEN + sizeof(":65535")];
+	struct stream_interface *si0 = &s->si[0], *si1 = &s->si[1];
+	struct sockaddr_storage *socks[4];
+	int known[4];
+	socklen_t namelen;
+	int i;
+
+	if (LIST_ISEMPTY(&stats_event_listeners))
+		return;
+
+	/* si0 from/to = peer/sock. An address is only flagged as set when the
+	 * lookup succeeded, so a failure is never mistaken for valid data.
+	 */
+	if (!(si0->flags & SI_FL_FROM_SET)) {
+		namelen = sizeof(si0->addr.from);
+		if (getpeername(si0->fd, (struct sockaddr *)&si0->addr.from, &namelen) == 0)
+			si0->flags |= SI_FL_FROM_SET;
+	}
+	if (!(si0->flags & SI_FL_TO_SET)) {
+		namelen = sizeof(si0->addr.to);
+		if (getsockname(si0->fd, (struct sockaddr *)&si0->addr.to, &namelen) == 0)
+			si0->flags |= SI_FL_TO_SET;
+	}
+
+	/* si1 from/to = sock/peer (reversed) */
+	if (!(si1->flags & SI_FL_FROM_SET)) {
+		namelen = sizeof(si1->addr.from);
+		if (getsockname(si1->fd, (struct sockaddr *)&si1->addr.from, &namelen) == 0)
+			si1->flags |= SI_FL_FROM_SET;
+	}
+	if (!(si1->flags & SI_FL_TO_SET)) {
+		namelen = sizeof(si1->addr.to);
+		if (getpeername(si1->fd, (struct sockaddr *)&si1->addr.to, &namelen) == 0)
+			si1->flags |= SI_FL_TO_SET;
+	}
+
+	/* same order as the fields of the "F" line */
+	socks[0] = &si0->addr.from; known[0] = !!(si0->flags & SI_FL_FROM_SET); /* inbound peer */
+	socks[1] = &si0->addr.to;   known[1] = !!(si0->flags & SI_FL_TO_SET);   /* inbound sock */
+	socks[2] = &si1->addr.from; known[2] = !!(si1->flags & SI_FL_FROM_SET); /* outbound sock */
+	socks[3] = &si1->addr.to;   known[3] = !!(si1->flags & SI_FL_TO_SET);   /* outbound peer */
+
+	for (i = 0; i < 4; i++) {
+		struct sockaddr_storage *sock = socks[i];
+		const void *sin_addr = NULL;
+		int port = 0;
+		size_t len;
+
+		/* default for unset, unsupported or unformattable addresses */
+		snprintf(addrs[i], sizeof(addrs[i]), "%s", "unknown");
+		if (!known[i])
+			continue;
+
+		switch (sock->ss_family) {
+		case AF_INET:
+			sin_addr = (const void *)&((struct sockaddr_in *)sock)->sin_addr;
+			port = ntohs(((struct sockaddr_in *)sock)->sin_port);
+			break;
+		case AF_INET6:
+			sin_addr = (const void *)&((struct sockaddr_in6 *)sock)->sin6_addr;
+			port = ntohs(((struct sockaddr_in6 *)sock)->sin6_port);
+			break;
+		case AF_UNIX:
+			snprintf(addrs[i], sizeof(addrs[i]), "unix:%d", s->listener->luid);
+			continue;
+		default:
+			continue;
+		}
+
+		if (!inet_ntop(sock->ss_family, sin_addr, addrs[i], sizeof(addrs[i]))) {
+			/* buffer contents are unspecified after a failure */
+			snprintf(addrs[i], sizeof(addrs[i]), "%s", "unknown");
+			continue;
+		}
+		len = strlen(addrs[i]);
+		snprintf(addrs[i] + len, sizeof(addrs[i]) - len, ":%d", port);
+	}
+
+	snprintf(trash, sizeof(trash), "F %u %s - %s | %s - %s\n", s->uniq_id,
+		 addrs[0],  /* inbound peer */
+		 addrs[1],  /* inbound sock */
+		 addrs[2],  /* outbound sock */
+		 addrs[3]   /* outbound peer */
+		 );
+	stats_event_listener_message_all(trash, s);
+}
+
+/* Called when s->si[1] of session <s> went from SI_ST_EST to SI_ST_CLO.
+ * Sends a "C <session_id>" line to the event listeners; nothing is done when
+ * no listener is registered.
+ */
+void stats_event_end_session(struct session *s)
+{
+	if (!LIST_ISEMPTY(&stats_event_listeners)) {
+		snprintf(trash, sizeof(trash), "C %u\n", s->uniq_id);
+		stats_event_listener_message_all(trash, s);
+	}
+}
+
/* print a line of text buffer (limited to 70 bytes) to <out>. The format is :
* <2 spaces> <offset=5 digits> <space or plus> <space> <70 chars max> <\n>
* which is 60 chars per line. Non-printable chars \t, \n, \r and \e are
@@ -3939,7 +4175,7 @@ struct si_applet http_stats_applet = {
static struct si_applet cli_applet = {
.name = "<CLI>", /* used for logging */
.fct = cli_io_handler,
- .release = NULL,
+ .release = cli_session_release,
};
static struct cfg_kw_list cfg_kws = {{ },{
diff --git a/src/session.c b/src/session.c
index e5b76eb..48c5435 100644
--- a/src/session.c
+++ b/src/session.c
@@ -708,6 +708,9 @@ static void sess_establish(struct session *s, struct
stream_interface *si)
rep->rto = s->be->timeout.server;
}
req->wex = TICK_ETERNITY;
+
+ if (unlikely(stats_event_enabled))
+ stats_event_new_session(s);
}
/* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE,
SI_ST_TAR.
@@ -2148,6 +2151,12 @@ struct task *process_session(struct task *t)
s->do_log(s);
}
+ if (unlikely(stats_event_enabled)) {
+ if (s->si[1].state == SI_ST_CLO &&
+ s->si[1].prev_state == SI_ST_EST)
+ stats_event_end_session(s);
+ }
+
/* the task MUST not be in the run queue anymore */
session_free(s);
task_delete(t);
--
1.7.5.3