Hi Aman,

On Fri, Apr 06, 2012 at 02:24:16AM -0700, Aman Gupta wrote:
> On Thu, Apr 5, 2012 at 1:51 AM, Willy Tarreau <[email protected]> wrote:
> > On Mon, Apr 02, 2012 at 06:57:56PM -0700, Aman Gupta wrote:
> >> diff --git a/include/proto/dumpstats.h b/include/proto/dumpstats.h
> >> index eb44a36..d328881 100644
> >> --- a/include/proto/dumpstats.h
> >> +++ b/include/proto/dumpstats.h
> >> @@ -55,6 +55,8 @@
> >>  #define STAT_CLI_O_TAB  8   /* dump tables */
> >>  #define STAT_CLI_O_CLR  9   /* clear tables */
> >>
> 
> >> +#define STAT_CLI_EVENTS 8   /* event stream */
> >
> > This one should apparently be 10, otherwise it conflicts with "dump tables"
> > above. Replacing these defines with an enum would probably fix the issue
> > once and for all, BTW.
> 
> Oops, missed this in the forward port.

OK fixed.

> >
> >> +/* Callback to release a cli session.
> >> + */
> >> +static void cli_session_release(struct stream_interface *si)
> >> +{
> >> +     /* remove if registered as event listener */
> >> +     stats_event_listener_remove(si);
> >
> > Are you sure you don't need to check if (si->applet.st0 == STAT_CLI_EVENTS) 
> > here ?
> 
> Yep, you're right.

Fixed too.

I have noted two other issues:
  - the Close event is not logged in HTTP mode; I haven't found out why yet

  - I managed to get a segfault when killing the socat attached to the
    stats socket. I found that it was due to some missing calls to
    stats_event_listener_remove() before switching the st0 state to
    STAT_CLI_END. After adding them, the issue is gone.

For now I'm holding the patch in another branch, hoping to find the
time to fix the remaining issues soon enough to get it merged quickly.

I'm attaching the updated patch for reference.

Cheers,
Willy

>From 552228f8d6ca3a85b70048a765e274ee9c1d1f44 Mon Sep 17 00:00:00 2001
From: Aman Gupta <[email protected]>
Date: Mon, 2 Apr 2012 18:57:56 -0700
Subject: [PATCH] MAJOR: Add `debug sess` command to unix socket stats interface

This is used to display accepted, connected and closed sessions
in real time on the stats socket.
---
 doc/configuration.txt            |   22 ++++
 include/proto/dumpstats.h        |    5 +
 include/types/stream_interface.h |    5 +
 src/dumpstats.c                  |  247 +++++++++++++++++++++++++++++++++++++-
 src/session.c                    |    9 ++
 5 files changed, 285 insertions(+), 3 deletions(-)

diff --git a/doc/configuration.txt b/doc/configuration.txt
index 782bda9..35e52ae 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -10158,6 +10158,28 @@ shutdown sessions <backend>/<server>
   maintenance mode, for instance. Such terminated sessions are reported with a
   'K' flag in the logs.
 
+debug sess [proxy:<proxy_name>[:<server_name>]]
+
+  Dump a stream of events about sessions as they are added and removed.
+  The possible event formats are "Forward" and "Close":
+
+    "F <session_id> <in_peer> - <in_sock> | <out_sock> - <out_peer>\n"
+    "C <session_id>\n"
+
+  Streaming will continue until a new command is received or the
+  connection is closed. If <proxy_name> or <server_name> is specified, limit to
+  events concerning only the proxy and server specified.
+
+  This command is restricted and can only be issued on sockets configured
+  for levels "operator" or "admin".
+
+  Example:
+    >>> $ echo "set timeout cli 3600; debug sess" | socat stdio,ignoreeof /tmp/sock1
+        F 1 127.0.0.1:50869 - 127.0.0.1:9418 | 127.0.0.1:50870 - 127.0.0.1:6000
+        C 1
+        F 2 127.0.0.1:50874 - 127.0.0.1:9418 | 127.0.0.1:50875 - 127.0.0.1:6000
+        C 2
+
 /*
  * Local variables:
  *  fill-column: 79
diff --git a/include/proto/dumpstats.h b/include/proto/dumpstats.h
index 319ab48..bf091ed 100644
--- a/include/proto/dumpstats.h
+++ b/include/proto/dumpstats.h
@@ -55,9 +55,14 @@
 #define STAT_CLI_O_TAB  8   /* dump tables */
 #define STAT_CLI_O_CLR  9   /* clear tables */
 
+#define STAT_CLI_EVENTS 10  /* events stream (debug sess) */
+
 extern struct si_applet http_stats_applet;
+extern int stats_event_enabled;
 
 void stats_io_handler(struct stream_interface *si);
+void stats_event_new_session(struct session *s);
+void stats_event_end_session(struct session *s);
 
 
 #endif /* _PROTO_DUMPSTATS_H */
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index dae36ef..403e264 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -169,6 +169,11 @@ struct stream_interface {
                                int bol;                /* pointer to beginning 
of current line */
                        } errors;
                        struct {
+                               struct list list;       /* list of stats 
streams in the STAT_CLI_EVENTS state */
+                               struct proxy *px;       /* if not NULL, only 
send events associated with this proxy */
+                               struct server *srv;     /* if not NULL, only 
send events associated with this server */
+                       } events;
+                       struct {
                                void *target;           /* table we want to 
dump, or NULL for all */
                                struct proxy *proxy;    /* table being 
currently dumped (first if NULL) */
                                struct stksess *entry;  /* last entry we were 
trying to dump (or first if NULL) */
diff --git a/src/dumpstats.c b/src/dumpstats.c
index 88a5285..8ec0e71 100644
--- a/src/dumpstats.c
+++ b/src/dumpstats.c
@@ -86,6 +86,7 @@ static const char stats_sock_usage_msg[] =
        "  disable        : put a server or frontend in maintenance mode\n"
        "  enable         : re-enable a server or frontend which is in 
maintenance mode\n"
        "  shutdown       : kill a session or a frontend (eg:to release 
listening ports)\n"
+       "  debug sess     : stream events about proxied sessions\n"
        "";
 
 static const char stats_permission_denied_msg[] =
@@ -116,6 +117,77 @@ enum {
 
 extern const char *stat_status_codes[];
 
+/* Keep track of sessions that want streaming events (STAT_CLI_EVENTS).
+ */
+int stats_event_enabled = 0;
+static struct list stats_event_listeners = 
LIST_HEAD_INIT(stats_event_listeners);
+
+/* Add a session to the list of event listeners.
+ */
+static inline void stats_event_listener_add(struct stream_interface *si)
+{
+       LIST_ADDQ(&stats_event_listeners, &si->applet.ctx.events.list);
+       stats_event_enabled = 1;
+}
+
+/* Remove a session from the list of listeners, but only if it is a
+ * registered listener. This enables us to invoke the method on all
+ * disconnecting stats sockets to ensure they are cleaned up, regardless
+ * of how many times they switch between streaming and other commands.
+ */
+static inline void stats_event_listener_remove(struct stream_interface *si)
+{
+       int found = 0;
+       struct stream_interface *curr;
+       list_for_each_entry(curr, &stats_event_listeners, 
applet.ctx.events.list) {
+               if (curr == si) {
+                       found = 1;
+                       break;
+               }
+       }
+
+       if (found) {
+               si->applet.ctx.events.px = NULL;
+               si->applet.ctx.events.srv = NULL;
+               LIST_DEL(&si->applet.ctx.events.list);
+       }
+
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               stats_event_enabled = 0;
+
+       /* Re-initialize stats output */
+       memset(&si->applet.ctx.stats, 0, sizeof(si->applet.ctx.stats));
+}
+
+/* Send a message to all registered event listeners.
+ */
+static inline void stats_event_listener_message_all(char *msg, struct session 
*s)
+{
+       struct stream_interface *curr;
+
+       list_for_each_entry(curr, &stats_event_listeners, 
applet.ctx.events.list) {
+               struct proxy *px;
+               struct server *srv;
+
+               if (!(curr->flags & SI_FL_DONT_WAKE) && curr->owner) {
+                       /* filter by proxy and server if required */
+                       if ((px = curr->applet.ctx.events.px)) {
+                               if (s->be != px && s->fe != px)
+                                       continue; /* ignore */
+                               if ((srv = curr->applet.ctx.events.srv)) {
+                                       if (target_srv(&s->target) != srv)
+                                               continue; /* ignore */
+                               }
+                       }
+
+                       if (buffer_feed(curr->ib, msg) == -1) {
+                               curr->ib->flags |= BF_SEND_DONTWAIT;
+                               task_wakeup(curr->owner, TASK_WOKEN_MSG);
+                       }
+               }
+       }
+}
+
 /* This function is called from the session-level accept() in order to 
instanciate
  * a new stats socket. It returns a positive value upon success, 0 if the 
connection
  * needs to be closed and ignored, or a negative value upon critical failure.
@@ -774,7 +846,54 @@ static int stats_sock_parse_request(struct 
stream_interface *si, char *line)
                args[arg] = line;
 
        si->applet.ctx.stats.flags = 0;
-       if (strcmp(args[0], "show") == 0) {
+       if (strcmp(args[0], "debug") == 0) {
+               if (strcmp(args[1], "sess") == 0) {
+                       if (s->listener->perm.ux.level < ACCESS_LVL_OPER) {
+                               si->applet.ctx.cli.msg = 
stats_permission_denied_msg;
+                               si->applet.st0 = STAT_CLI_PRINT;
+                               return 1;
+                       }
+                       if (*args[2] && !strncmp(args[2], "proxy", 5)) {
+                               struct proxy *px = NULL;
+                               struct server *srv = NULL;
+                               char *px_name = args[2] + 6, *srv_name;
+
+                               if ((srv_name = strchr(px_name, ':'))) {
+                                       *srv_name = 0;
+                                       srv_name += 1;
+                               }
+
+                               px = findproxy(px_name, PR_CAP_FE|PR_CAP_BE);
+                               if (!px) {
+                                       si->applet.ctx.cli.msg = "Invalid proxy 
filter for event stream.";
+                                       si->applet.st0 = STAT_CLI_PRINT;
+                                       return 1;
+                               }
+
+                               if (srv_name && *srv_name) {
+                                       srv = findserver(px, srv_name);
+                                       if (!srv) {
+                                               si->applet.ctx.cli.msg = 
"Invalid server filter for event stream.";
+                                               si->applet.st0 = STAT_CLI_PRINT;
+                                               return 1;
+                                       }
+                               }
+
+                               si->applet.ctx.events.srv = srv;
+                               si->applet.ctx.events.px = px;
+                       } else {
+                               si->applet.ctx.events.srv = NULL;
+                               si->applet.ctx.events.px = NULL;
+                       }
+
+                       stats_event_listener_add(si);
+                       si->applet.st0 = STAT_CLI_EVENTS;
+               }
+               else { /* not "sess" */
+                       return 0;
+               }
+       }
+       else if (strcmp(args[0], "show") == 0) {
                if (strcmp(args[1], "stat") == 0) {
                        if (*args[2] && *args[3] && *args[4]) {
                                si->applet.ctx.stats.flags |= STAT_BOUND;
@@ -1348,6 +1467,15 @@ static int stats_sock_parse_request(struct 
stream_interface *si, char *line)
        return 1;
 }
 
+/* Callback to release a cli session.
+ */
+static void cli_session_release(struct stream_interface *si)
+{
+       /* remove if registered as event listener */
+       if (si->applet.st0 == STAT_CLI_EVENTS)
+               stats_event_listener_remove(si);
+}
+
 /* This I/O handler runs as an applet embedded in a stream interface. It is
  * used to processes I/O from/to the stats unix socket. The system relies on a
  * state machine handling requests and various responses. We read a request,
@@ -1379,7 +1507,7 @@ static void cli_io_handler(struct stream_interface *si)
                        si->shutw(si);
                        break;
                }
-               else if (si->applet.st0 == STAT_CLI_GETREQ) {
+               else if (si->applet.st0 == STAT_CLI_GETREQ || si->applet.st0 == 
STAT_CLI_EVENTS) {
                        /* ensure we have some output room left in the event we
                         * would want to return some info right after parsing.
                         */
@@ -1390,6 +1518,8 @@ static void cli_io_handler(struct stream_interface *si)
                        if (reql <= 0) { /* closed or EOL not found */
                                if (reql == 0)
                                        break;
+                               if (si->applet.st0 == STAT_CLI_EVENTS)
+                                       stats_event_listener_remove(si);
                                si->applet.st0 = STAT_CLI_END;
                                continue;
                        }
@@ -1410,6 +1540,8 @@ static void cli_io_handler(struct stream_interface *si)
                         */
                        len = reql - 1;
                        if (trash[len] != '\n') {
+                               if (si->applet.st0 == STAT_CLI_EVENTS)
+                                       stats_event_listener_remove(si);
                                si->applet.st0 = STAT_CLI_END;
                                continue;
                        }
@@ -1419,7 +1551,10 @@ static void cli_io_handler(struct stream_interface *si)
 
                        trash[len] = '\0';
 
+                       if (si->applet.st0 == STAT_CLI_EVENTS)
+                               stats_event_listener_remove(si);
                        si->applet.st0 = STAT_CLI_PROMPT;
+
                        if (len) {
                                if (strcmp(trash, "quit") == 0) {
                                        si->applet.st0 = STAT_CLI_END;
@@ -3753,6 +3888,112 @@ static int stats_table_request(struct stream_interface 
*si, bool show)
        return 1;
 }
 
+/* Called whenever a new session is successfully established (reaches
+ * SI_ST_EST). If there are any stats sockets listening in the
+ * STAT_CLI_EVENTS state, they will be notified of this session's unique
+ * id, along with the sockname and peername of both sides of the session.
+ */
+void stats_event_new_session(struct session *s)
+{
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               return;
+
+       char addrs[4][INET6_ADDRSTRLEN + strlen(":65535") + 1] = {"","","",""};
+       int i;
+
+       struct stream_interface *si0 = &s->si[0], *si1 = &s->si[1];
+       socklen_t namelen;
+
+       /* si0 from/to = peer/sock */
+       if (!(si0->flags & SI_FL_FROM_SET)) {
+               namelen = sizeof(si0->addr.from);
+               getpeername(si0->fd, (struct sockaddr *)&si0->addr.from, 
&namelen);
+               si0->flags |= SI_FL_FROM_SET;
+       }
+       if (!(si0->flags & SI_FL_TO_SET)) {
+               namelen = sizeof(si0->addr.to);
+               getsockname(si0->fd, (struct sockaddr *)&si0->addr.to, 
&namelen);
+               si0->flags |= SI_FL_TO_SET;
+       }
+
+       /* si1 from/to = sock/peer (reversed) */
+       if (!(si1->flags & SI_FL_FROM_SET)) {
+               namelen = sizeof(si1->addr.from);
+               getsockname(si1->fd, (struct sockaddr *)&si1->addr.from, 
&namelen);
+               si1->flags |= SI_FL_FROM_SET;
+       }
+       if (!(si1->flags & SI_FL_TO_SET)) {
+               namelen = sizeof(si1->addr.to);
+               getpeername(si1->fd, (struct sockaddr *)&si1->addr.to, 
&namelen);
+               si1->flags |= SI_FL_TO_SET;
+       }
+
+       for(i = 0; i < 4; i++) {
+               struct sockaddr_storage *sock;
+               const void *sin_addr = NULL;
+               int port = 0;
+
+               switch (i) {
+               case 0: // inbound peer
+                       sock = &si0->addr.from;
+                       break;
+               case 1: // inbound sock
+                       sock = &si0->addr.to;
+                       break;
+               case 2: // outbound sock
+                       sock = &si1->addr.from;
+                       break;
+               case 3: // outbound peer
+                       sock = &si1->addr.to;
+                       break;
+               }
+
+               switch (sock->ss_family) {
+               case AF_INET:
+                       sin_addr = (const void *)&((struct sockaddr_in 
*)sock)->sin_addr;
+                       port = ntohs(((struct sockaddr_in *)sock)->sin_port);
+                       break;
+               case AF_INET6:
+                       sin_addr = (const void *)&((struct sockaddr_in6 
*)sock)->sin6_addr;
+                       port = ntohs(((struct sockaddr_in6 *)sock)->sin6_port);
+                       break;
+               }
+
+               switch (sock->ss_family) {
+               case AF_INET:
+               case AF_INET6:
+                       inet_ntop(sock->ss_family, sin_addr, addrs[i], 
sizeof(addrs[i]));
+                       snprintf(addrs[i]+strlen(addrs[i]), 
sizeof(addrs[i])-strlen(addrs[i])-1, ":%d", port);
+                       break;
+               case AF_UNIX:
+                       sprintf(addrs[i], "unix:%d", s->listener->luid);
+                       break;
+               default:
+                       sprintf(addrs[i], "%s", "unknown");
+               }
+       }
+
+       snprintf(trash, sizeof(trash), "F %u %s - %s | %s - %s\n", s->uniq_id,
+         addrs[0], // inbound peer
+         addrs[1], // inbound sock
+         addrs[2], // outbound sock
+         addrs[3]  // outbound peer
+       );
+       stats_event_listener_message_all(trash, s);
+}
+
+/* Called when the session argument's s->si[1].state goes from SI_ST_EST
+ * to SI_ST_CLO. All stats listeners are notified of this destroy event.
+ */
+void stats_event_end_session(struct session *s)
+{
+       if (LIST_ISEMPTY(&stats_event_listeners))
+               return;
+
+       snprintf(trash, sizeof(trash), "C %u\n", s->uniq_id);
+       stats_event_listener_message_all(trash, s);
+}
+
 /* print a line of text buffer (limited to 70 bytes) to <out>. The format is :
  * <2 spaces> <offset=5 digits> <space or plus> <space> <70 chars max> <\n>
  * which is 60 chars per line. Non-printable chars \t, \n, \r and \e are
@@ -3967,7 +4208,7 @@ struct si_applet http_stats_applet = {
 static struct si_applet cli_applet = {
        .name = "<CLI>", /* used for logging */
        .fct = cli_io_handler,
-       .release = NULL,
+       .release = cli_session_release,
 };
 
 static struct cfg_kw_list cfg_kws = {{ },{
diff --git a/src/session.c b/src/session.c
index b90b254..adbd11e 100644
--- a/src/session.c
+++ b/src/session.c
@@ -708,6 +708,9 @@ static void sess_establish(struct session *s, struct 
stream_interface *si)
                rep->rto = s->be->timeout.server;
        }
        req->wex = TICK_ETERNITY;
+
+       if (unlikely(stats_event_enabled))
+               stats_event_new_session(s);
 }
 
 /* Update stream interface status for input states SI_ST_ASS, SI_ST_QUE, 
SI_ST_TAR.
@@ -2203,6 +2206,12 @@ struct task *process_session(struct task *t)
                s->do_log(s);
        }
 
+       if (unlikely(stats_event_enabled)) {
+               if (s->si[1].state      == SI_ST_CLO &&
+                   s->si[1].prev_state == SI_ST_EST)
+                       stats_event_end_session(s);
+       }
+
        /* the task MUST not be in the run queue anymore */
        session_free(s);
        task_delete(t);
-- 
1.7.2.1.45.g54fbc

Reply via email to