This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new e401cb1  DISPATCH-2108: Aggregate statistics for TCP listeners and 
connectors
e401cb1 is described below

commit e401cb19bc243bd240eb61cae9a1bef9fdd2e549
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Wed Jun 16 15:32:09 2021 -0400

    DISPATCH-2108: Aggregate statistics for TCP listeners and connectors
    
    This commit adds counters
    
      * bytesIn
      * bytesOut
      * connectionsOpened
      * connectionsClosed
    
    to tcpListener and tcpConnector entities. Individual bytesIn and bytesOut
    for tcpConnections are tracked in the connection entity. The new aggregated
    statistics are running totals for all connections created through the
    listener and connector entities.
    
    Internally the qd_bridge_config_t structure was renamed to qd_tcp_bridge_t
    as the struct is no longer just configuration. A qdr_tcp_connection_t no
    longer holds the actual structure in 'config'. Instead it holds a pointer
    to the repurposed structure in 'bridge'.
    
    qd_tcp_bridge_t is now a pooled object. Its lifetime is controlled by
    a ref count. A bridge is created synchronously with a listener or a
    connector and those entities hold the first reference. TCP connections
    hold a reference to the bridge and target that bridge for ongoing
    statistics accumulation.
    
    With ref counting, a connection may be created, say, through a listener,
    and the listener may then be deleted before the connection is closed. The
    ref count keeps the bridge object alive so that the connection statistics
    are still aggregated. Only when the bridge object loses all of its
    references is the bridge finally disposed. Bridge objects dump their final
    statistics to the TCP_ADAPTOR log at INFO level.
    
    This commit also creates LOCK and UNLOCK macros to aid developers trying
    to read the code.
    
    A self test is added to make sure the entity attributes are present and
    hold reasonable values.
    
    This closes #2144
---
 python/qpid_dispatch/management/qdrouter.json |  41 +++++-
 src/adaptors/tcp_adaptor.c                    | 192 +++++++++++++++++++-------
 src/adaptors/tcp_adaptor.h                    |  34 +++--
 tests/system_tests_tcp_adaptor.py             |  41 ++++++
 4 files changed, 244 insertions(+), 64 deletions(-)

diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index 0441752..2619ad0 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1279,6 +1279,26 @@
                     "required": false,
                     "description": "Used to identify where connection is 
handled.",
                     "create": true
+                },
+                "connectionsOpened": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of connections opened by this 
listener."
+                },
+                "connectionsClosed": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of connections closed by this 
listener."
+                },
+                "bytesIn": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of bytes sent from clients to 
servers on all connections to this listener."
+                },
+                "bytesOut": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of bytes sent from servers to 
clients on all connections to this listener."
                 }
             }
         },
@@ -1302,13 +1322,32 @@
                     "description": "Port number or symbolic service name.",
                     "type": "string",
                     "create": true
-
                 },
                 "siteId": {
                     "type": "string",
                     "required": false,
                     "description": "Used to identify origin of connections.",
                     "create": true
+                },
+                "connectionsOpened": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of connections opened by this 
connector."
+                },
+                "connectionsClosed": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of connections closed by this 
connector."
+                },
+                "bytesIn": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of bytes sent from servers to 
clients on all connections created by this connector."
+                },
+                "bytesOut": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The number of bytes sent from clients to 
servers on all connections created by this connector."
                 }
             }
         },
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index b4cf3fc..af5faa6 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -51,9 +51,13 @@ const size_t TCP_BUFFER_SIZE = 16384*2;
 
 ALLOC_DEFINE(qd_tcp_listener_t);
 ALLOC_DEFINE(qd_tcp_connector_t);
+ALLOC_DEFINE(qd_tcp_bridge_t);
 
 #define WRITE_BUFFERS 12
 
+#define LOCK   sys_mutex_lock
+#define UNLOCK sys_mutex_unlock
+
 typedef struct qdr_tcp_connection_t qdr_tcp_connection_t;
 
 struct qdr_tcp_connection_t {
@@ -81,7 +85,7 @@ struct qdr_tcp_connection_t {
     bool                  read_eos_seen;
     qdr_delivery_t       *initial_delivery;
     qd_timer_t           *activate_timer;
-    qd_bridge_config_t    config;
+    qd_tcp_bridge_t      *bridge;         // config and stats
     qd_server_t          *server;
     char                 *remote_address;
     char                 *global_id;
@@ -122,6 +126,7 @@ typedef struct qdr_tcp_adaptor_t {
     qd_tcp_listener_list_t    listeners;
     qd_tcp_connector_list_t   connectors;
     qdr_tcp_connection_list_t connections;
+    qd_bridge_config_list_t   bridges;
     qd_log_source_t          *log_source;
 } qdr_tcp_adaptor_t;
 
@@ -132,6 +137,7 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, 
qdr_action_t *action, bo
 
 static void handle_disconnected(qdr_tcp_connection_t* conn);
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
+static void free_bridge_config(qd_tcp_bridge_t *config);
 static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
 
 static void allocate_tcp_buffer(pn_raw_buffer_t *buffer)
@@ -214,7 +220,7 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t 
context)
     }
 
     // prevent the tc from being deleted while running:
-    sys_mutex_lock(tc->activation_lock);
+    LOCK(tc->activation_lock);
 
     if (tc->pn_raw_conn) {
         sys_atomic_set(&tc->q2_restart, 1);
@@ -224,7 +230,7 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t 
context)
         pn_raw_connection_wake(tc->pn_raw_conn);
     }
 
-    sys_mutex_unlock(tc->activation_lock);
+    UNLOCK(tc->activation_lock);
 }
 
 // Extract buffers and their bytes from raw connection.
@@ -250,8 +256,12 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t 
*conn, qd_buffer_list_t
     conn->read_pending = false;
     if (result > 0) {
         // account for any incoming bytes just read
+
         conn->last_in_time = tcp_adaptor->core->uptime_ticks;
         conn->bytes_in      += result;
+        LOCK(conn->bridge->stats_lock);
+        conn->bridge->bytes_in += result;
+        UNLOCK(conn->bridge->stats_lock);
         conn->bytes_unacked += result;
         if (conn->bytes_unacked >= TCP_MAX_CAPACITY) {
             qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
@@ -313,13 +323,13 @@ static int handle_incoming(qdr_tcp_connection_t *conn, 
const char *msg)
         qd_compose_insert_null(props);                      // message-id
         qd_compose_insert_null(props);                      // user-id
         if (conn->ingress) {
-            qd_compose_insert_string(props, conn->config.address); // to
+            qd_compose_insert_string(props, conn->bridge->address); // to
             qd_compose_insert_string(props, conn->global_id);      // subject
             qd_compose_insert_string(props, conn->reply_to);       // reply-to
             qd_log(log, QD_LOG_DEBUG,
                    "[C%"PRIu64"][L%"PRIu64"] Initiating listener (ingress) 
stream incoming link for %s connection to: %s reply: %s",
                    conn->conn_id, conn->incoming_id, 
qdr_tcp_connection_role_name(conn),
-                   conn->config.address, conn->reply_to);
+                   conn->bridge->address, conn->reply_to);
         } else {
             qd_compose_insert_string(props, conn->reply_to);  // to
             qd_compose_insert_string(props, conn->global_id); // subject
@@ -433,6 +443,10 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* 
tc)
     free(tc->write_buffer.bytes);
     free(tc->read_buffer.bytes);
     //proactor will free the socket
+    LOCK(tc->bridge->stats_lock);
+    tc->bridge->connections_closed += 1;
+    UNLOCK(tc->bridge->stats_lock);
+    free_bridge_config(tc->bridge);
     free_qdr_tcp_connection_t(tc);
 }
 
@@ -638,10 +652,7 @@ static void handle_outgoing(qdr_tcp_connection_t *conn)
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                    "[C%"PRIu64"] handle_outgoing calling 
pn_raw_connection_write_close(). rcv_complete:%s, send_complete:%s",
                     conn->conn_id, qd_message_receive_complete(msg) ? "T" : 
"F", qd_message_send_complete(msg) ? "T" : "F");
-            sys_mutex_lock(conn->activation_lock);
             SET_ATOMIC_FLAG(&conn->raw_closed_write);
-
-            sys_mutex_unlock(conn->activation_lock);
             pn_raw_connection_write_close(conn->pn_raw_conn);
         }
     }
@@ -693,7 +704,7 @@ static void 
qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
     allocate_tcp_write_buffer(&tc->write_buffer);
     allocate_tcp_buffer(&tc->read_buffer);
     tc->remote_address = get_address_string(tc->pn_raw_conn);
-    tc->global_id = get_global_id(tc->config.site_id, tc->remote_address);
+    tc->global_id = get_global_id(tc->bridge->site_id, tc->remote_address);
 
     //
     // The qdr_connection_info() function makes its own copy of the passed in 
tcp_conn_properties.
@@ -740,7 +751,7 @@ static void 
qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
     qdr_terminus_t *dynamic_source = qdr_terminus(0);
     qdr_terminus_set_dynamic(dynamic_source);
     qdr_terminus_t *target = qdr_terminus(0);
-    qdr_terminus_set_address(target, tc->config.address);
+    qdr_terminus_set_address(target, tc->bridge->address);
 
     tc->outgoing = qdr_link_first_attach(conn,
                                          QD_OUTGOING,
@@ -779,7 +790,7 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
             qdr_tcp_connection_ingress_accept(conn);
             qd_log(log, QD_LOG_INFO,
                    "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Listener ingress 
accepted to %s from %s (global_id=%s)",
-                   conn->conn_id, conn->config.host_port, 
conn->remote_address, conn->global_id);
+                   conn->conn_id, conn->bridge->host_port, 
conn->remote_address, conn->global_id);
             break;
         } else {
             conn->remote_address = get_address_string(conn->pn_raw_conn);
@@ -799,9 +810,9 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_READ %s",
                conn->conn_id, conn->incoming_id, 
qdr_tcp_connection_role_name(conn));
         SET_ATOMIC_FLAG(&conn->raw_closed_read);
-        sys_mutex_lock(conn->activation_lock);
+        LOCK(conn->activation_lock);
         conn->q2_blocked = false;
-        sys_mutex_unlock(conn->activation_lock);
+        UNLOCK(conn->activation_lock);
         handle_incoming(conn, "PNRC_CLOSED_READ");
         break;
     }
@@ -816,9 +827,9 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         qd_log(log, QD_LOG_INFO,
                "[C%"PRIu64"] PN_RAW_CONNECTION_DISCONNECTED %s",
                conn->conn_id, qdr_tcp_connection_role_name(conn));
-        sys_mutex_lock(conn->activation_lock);
+        LOCK(conn->activation_lock);
         conn->pn_raw_conn = 0;
-        sys_mutex_unlock(conn->activation_lock);
+        UNLOCK(conn->activation_lock);
         handle_disconnected(conn);
         break;
     }
@@ -846,9 +857,9 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
                "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE %s",
                conn->conn_id, qdr_tcp_connection_role_name(conn));
         if (sys_atomic_set(&conn->q2_restart, 0)) {
-            sys_mutex_lock(conn->activation_lock);
+            LOCK(conn->activation_lock);
             conn->q2_blocked = false;
-            sys_mutex_unlock(conn->activation_lock);
+            UNLOCK(conn->activation_lock);
             // note: unit tests grep for this log!
             qd_log(log, QD_LOG_TRACE,
                    "[C%"PRIu64"] %s client link unblocked from Q2 limit",
@@ -886,6 +897,9 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
             conn->write_buffer.context = 0;
             conn->last_out_time = tcp_adaptor->core->uptime_ticks;
             conn->bytes_out += written;
+            LOCK(conn->bridge->stats_lock);
+            conn->bridge->bytes_out += written;
+            UNLOCK(conn->bridge->stats_lock);
 
             if (written > 0) {
                 // Tell the upstream to open its receive window.  Note: this 
update
@@ -926,11 +940,17 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
     tc->ingress = true;
     tc->context.context = tc;
     tc->context.handler = &handle_connection_event;
-    tc->config = listener->config;
+    tc->bridge = listener->config;
+    sys_atomic_inc(&tc->bridge->ref_count);
     tc->server = listener->server;
     sys_atomic_init(&tc->q2_restart, 0);
     sys_atomic_init(&tc->raw_closed_read, 0);
     sys_atomic_init(&tc->raw_closed_write, 0);
+
+    LOCK(tc->bridge->stats_lock);
+    tc->bridge->connections_opened +=1;
+    UNLOCK(tc->bridge->stats_lock);
+
     tc->pn_raw_conn = pn_raw_connection();
     pn_raw_connection_set_context(tc->pn_raw_conn, tc);
     //the following call will cause a PN_RAW_CONNECTION_CONNECTED
@@ -945,7 +965,7 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
 
 static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
 {
-    const char *host = tc->egress_dispatcher ? "egress-dispatch" : 
tc->config.host_port;
+    const char *host = tc->egress_dispatcher ? "egress-dispatch" : 
tc->bridge->host_port;
     qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening 
server-side core connection %s", tc->conn_id, host);
 
     //
@@ -990,7 +1010,7 @@ static void 
qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
     qdr_connection_set_context(conn, tc);
 
     qdr_terminus_t *source = qdr_terminus(0);
-    qdr_terminus_set_address(source, tc->config.address);
+    qdr_terminus_set_address(source, tc->bridge->address);
 
     // This attach passes the ownership of the delivery from the core-side 
connection and link
     // to the adaptor-side outgoing connection and link.
@@ -1021,7 +1041,7 @@ static void 
qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
 }
 
 
-static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t 
*config, qd_server_t *server, qdr_delivery_t *initial_delivery)
+static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_tcp_bridge_t 
*config, qd_server_t *server, qdr_delivery_t *initial_delivery)
 {
     qdr_tcp_connection_t* tc = new_qdr_tcp_connection_t();
     ZERO(tc);
@@ -1037,13 +1057,18 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_egress(qd_bridge_config_t *confi
     tc->ingress = false;
     tc->context.context = tc;
     tc->context.handler = &handle_connection_event;
-    tc->config = *config;
+    tc->bridge = config;
+    sys_atomic_inc(&tc->bridge->ref_count);
     tc->server = server;
     sys_atomic_init(&tc->q2_restart, 0);
     sys_atomic_init(&tc->raw_closed_read, 0);
     sys_atomic_init(&tc->raw_closed_write, 0);
     tc->conn_id = qd_server_allocate_connection_id(tc->server);
 
+    LOCK(tc->bridge->stats_lock);
+    tc->bridge->connections_opened +=1;
+    UNLOCK(tc->bridge->stats_lock);
+
     //
     // If this is the egress dispatcher, set up the core connection now.
     // Otherwise, set up a physical raw connection and wait until we are
@@ -1057,32 +1082,54 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_egress(qd_bridge_config_t *confi
         allocate_tcp_buffer(&tc->read_buffer);
         qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
                "[C%"PRIu64"] call pn_proactor_raw_connect(). Egress connecting 
to: %s",
-               tc->conn_id, tc->config.host_port);
+               tc->conn_id, tc->bridge->host_port);
         tc->pn_raw_conn = pn_raw_connection();
         pn_raw_connection_set_context(tc->pn_raw_conn, tc);
-        pn_proactor_raw_connect(qd_server_proactor(tc->server), 
tc->pn_raw_conn, tc->config.host_port);
+        pn_proactor_raw_connect(qd_server_proactor(tc->server), 
tc->pn_raw_conn, tc->bridge->host_port);
     }
 
     return tc;
 }
 
-static void free_bridge_config(qd_bridge_config_t *config)
+
+static qd_tcp_bridge_t *qd_bridge_config()
+{
+    qd_tcp_bridge_t *bc = new_qd_tcp_bridge_t();
+    if (!bc) return 0;
+    ZERO(bc);
+    sys_atomic_init(&bc->ref_count, 1);
+    bc->stats_lock = sys_mutex();
+    return bc;
+}
+
+
+static void free_bridge_config(qd_tcp_bridge_t *config)
 {
     if (!config) return;
+    if (sys_atomic_dec(&config->ref_count) > 1) return;
+
+    qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
+           "Deleted TCP bridge configuation '%s' for address %s, %s, siteId 
%s. "
+           "Connections opened:%"PRIu64", closed:%"PRIu64". Bytes 
in:%"PRIu64", out:%"PRIu64,
+           config->name, config->address, config->host_port, config->site_id,
+           config->connections_opened, config->connections_closed, 
config->bytes_in, config->bytes_out);
     free(config->name);
     free(config->address);
     free(config->host);
     free(config->port);
     free(config->site_id);
     free(config->host_port);
+
+    sys_atomic_destroy(&config->ref_count);
+    sys_mutex_free(config->stats_lock);
+    free_qd_tcp_bridge_t(config);
 }
 
 #define CHECK() if (qd_error_code()) goto error
 
-static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t 
*config, qd_entity_t* entity, bool is_listener)
+static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_tcp_bridge_t 
*config, qd_entity_t* entity, bool is_listener)
 {
     qd_error_clear();
-    ZERO(config);
 
     config->name    = qd_entity_get_string(entity, "name");      CHECK();
     config->address = qd_entity_get_string(entity, "address");   CHECK();
@@ -1101,14 +1148,15 @@ static qd_error_t load_bridge_config(qd_dispatch_t *qd, 
qd_bridge_config_t *conf
     return qd_error_code();
 }
 
-static void log_tcp_bridge_config(qd_log_source_t *log, qd_bridge_config_t *c, 
const char *what) {
+static void log_tcp_bridge_config(qd_log_source_t *log, qd_tcp_bridge_t *c, 
const char *what) {
     qd_log(log, QD_LOG_INFO, "Configured %s for %s, %s:%s", what, c->address, 
c->host, c->port);
 }
 
 void qd_tcp_listener_decref(qd_tcp_listener_t* li)
 {
     if (li && sys_atomic_dec(&li->ref_count) == 1) {
-        free_bridge_config(&li->config);
+        sys_atomic_destroy(&li->ref_count);
+        free_bridge_config(li->config);
         free_qd_tcp_listener_t(li);
     }
 }
@@ -1117,7 +1165,7 @@ static void handle_listener_event(pn_event_t *e, 
qd_server_t *qd_server, void *c
     qd_log_source_t *log = tcp_adaptor->log_source;
 
     qd_tcp_listener_t *li = (qd_tcp_listener_t*) context;
-    const char *host_port = li->config.host_port;
+    const char *host_port = li->config->host_port;
 
     switch (pn_event_type(e)) {
 
@@ -1162,6 +1210,7 @@ static qd_tcp_listener_t *qd_tcp_listener(qd_server_t 
*server)
     li->server = server;
     li->context.context = li;
     li->context.handler = &handle_listener_event;
+    li->config = qd_bridge_config();
     return li;
 }
 
@@ -1171,12 +1220,12 @@ static bool tcp_listener_listen(qd_tcp_listener_t *li) {
    li->pn_listener = pn_listener();
     if (li->pn_listener) {
         pn_listener_set_context(li->pn_listener, &li->context);
-        pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, 
li->config.host_port, BACKLOG);
+        pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, 
li->config->host_port, BACKLOG);
         sys_atomic_inc(&li->ref_count); /* In use by proactor, 
PN_LISTENER_CLOSE will dec */
         /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN 
event */
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create 
listener for %s",
-               li->config.host_port);
+               li->config->host_port);
      }
     return li->pn_listener;
 }
@@ -1184,14 +1233,14 @@ static bool tcp_listener_listen(qd_tcp_listener_t *li) {
 qd_tcp_listener_t *qd_dispatch_configure_tcp_listener(qd_dispatch_t *qd, 
qd_entity_t *entity)
 {
     qd_tcp_listener_t *li = qd_tcp_listener(qd->server);
-    if (!li || load_bridge_config(qd, &li->config, entity, true) != 
QD_ERROR_NONE) {
+    if (!li || load_bridge_config(qd, li->config, entity, true) != 
QD_ERROR_NONE) {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp 
listener: %s", qd_error_message());
         qd_tcp_listener_decref(li);
         return 0;
     }
     DEQ_ITEM_INIT(li);
     DEQ_INSERT_TAIL(tcp_adaptor->listeners, li);
-    log_tcp_bridge_config(tcp_adaptor->log_source, &li->config, "TcpListener");
+    log_tcp_bridge_config(tcp_adaptor->log_source, li->config, "TcpListener");
     tcp_listener_listen(li);
     return li;
 }
@@ -1206,14 +1255,31 @@ void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, 
void *impl)
         DEQ_REMOVE(tcp_adaptor->listeners, li);
         qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
                "Deleted TcpListener for %s, %s:%s",
-               li->config.address, li->config.host, li->config.port);
+               li->config->address, li->config->host, li->config->port);
         qd_tcp_listener_decref(li);
     }
 }
 
 qd_error_t qd_entity_refresh_tcpListener(qd_entity_t* entity, void *impl)
 {
-    return QD_ERROR_NONE;
+    qd_tcp_listener_t *listener = (qd_tcp_listener_t*)impl;
+
+    LOCK(listener->config->stats_lock);
+    uint64_t bi = listener->config->bytes_in;
+    uint64_t bo = listener->config->bytes_out;
+    uint64_t co = listener->config->connections_opened;
+    uint64_t cc = listener->config->connections_closed;
+    UNLOCK(listener->config->stats_lock);
+
+
+    if (   qd_entity_set_long(entity, "bytesIn",           bi) == 0
+        && qd_entity_set_long(entity, "bytesOut",          bo) == 0
+        && qd_entity_set_long(entity, "connectionsOpened", co) == 0
+        && qd_entity_set_long(entity, "connectionsClosed", cc) == 0)
+    {
+        return QD_ERROR_NONE;
+    }
+    return qd_error_code();
 }
 
 static qd_tcp_connector_t *qd_tcp_connector(qd_server_t *server)
@@ -1222,14 +1288,17 @@ static qd_tcp_connector_t *qd_tcp_connector(qd_server_t 
*server)
     if (!c) return 0;
     ZERO(c);
     sys_atomic_init(&c->ref_count, 1);
-    c->server      = server;
+    c->server = server;
+    c->config = qd_bridge_config();
+
     return c;
 }
 
 void qd_tcp_connector_decref(qd_tcp_connector_t* c)
 {
     if (c && sys_atomic_dec(&c->ref_count) == 1) {
-        free_bridge_config(&c->config);
+        sys_atomic_destroy(&c->ref_count);
+        free_bridge_config(c->config);
         free_qd_tcp_connector_t(c);
     }
 }
@@ -1237,15 +1306,15 @@ void qd_tcp_connector_decref(qd_tcp_connector_t* c)
 qd_tcp_connector_t *qd_dispatch_configure_tcp_connector(qd_dispatch_t *qd, 
qd_entity_t *entity)
 {
     qd_tcp_connector_t *c = qd_tcp_connector(qd->server);
-    if (!c || load_bridge_config(qd, &c->config, entity, true) != 
QD_ERROR_NONE) {
+    if (!c || load_bridge_config(qd, c->config, entity, true) != 
QD_ERROR_NONE) {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp 
connector: %s", qd_error_message());
         qd_tcp_connector_decref(c);
         return 0;
     }
     DEQ_ITEM_INIT(c);
     DEQ_INSERT_TAIL(tcp_adaptor->connectors, c);
-    log_tcp_bridge_config(tcp_adaptor->log_source, &c->config, "TcpConnector");
-    c->dispatcher = qdr_tcp_connection_egress(&(c->config), c->server, NULL);
+    log_tcp_bridge_config(tcp_adaptor->log_source, c->config, "TcpConnector");
+    c->dispatcher = qdr_tcp_connection_egress(c->config, c->server, NULL);
     return c;
 }
 
@@ -1264,7 +1333,7 @@ void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, 
void *impl)
         //deliveries out to live connnections:
         qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
                "Deleted TcpConnector for %s, %s:%s",
-               ct->config.address, ct->config.host, ct->config.port);
+               ct->config->address, ct->config->host, ct->config->port);
         close_egress_dispatcher((qdr_tcp_connection_t*) ct->dispatcher);
         DEQ_REMOVE(tcp_adaptor->connectors, ct);
         qd_tcp_connector_decref(ct);
@@ -1273,7 +1342,24 @@ void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, 
void *impl)
 
 qd_error_t qd_entity_refresh_tcpConnector(qd_entity_t* entity, void *impl)
 {
-    return QD_ERROR_NONE;
+    qd_tcp_connector_t *connector = (qd_tcp_connector_t*)impl;
+
+    LOCK(connector->config->stats_lock);
+    uint64_t bi = connector->config->bytes_in;
+    uint64_t bo = connector->config->bytes_out;
+    uint64_t co = connector->config->connections_opened;
+    uint64_t cc = connector->config->connections_closed;
+    UNLOCK(connector->config->stats_lock);
+
+
+    if (   qd_entity_set_long(entity, "bytesIn",           bi) == 0
+        && qd_entity_set_long(entity, "bytesOut",          bo) == 0
+        && qd_entity_set_long(entity, "connectionsOpened", co) == 0
+        && qd_entity_set_long(entity, "connectionsClosed", cc) == 0)
+    {
+        return QD_ERROR_NONE;
+    }
+    return qd_error_code();
 }
 
 static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, 
qdr_link_t *link,
@@ -1442,7 +1528,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t 
*link, qdr_delivery_t
         if (tc->egress_dispatcher) {
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                    DLV_FMT" tcp_adaptor initiating egress connection", 
DLV_ARGS(delivery));
-            qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
+            qdr_tcp_connection_egress(tc->bridge, tc->server, delivery);
             return QD_DELIVERY_MOVED_TO_NEW_LINK;
         } else if (!tc->outstream) {
             tc->outstream = delivery;
@@ -1603,14 +1689,14 @@ static void qdr_tcp_activate(void *notused, 
qdr_connection_t *c)
     void *context = qdr_connection_get_context(c);
     if (context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
-        sys_mutex_lock(conn->activation_lock);
+        LOCK(conn->activation_lock);
         if (conn->pn_raw_conn && !(IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) 
&& IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))) {
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                    "[C%"PRIu64"] qdr_tcp_activate: call 
pn_raw_connection_wake()", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
-            sys_mutex_unlock(conn->activation_lock);
+            UNLOCK(conn->activation_lock);
         } else if (conn->activate_timer) {
-            sys_mutex_unlock(conn->activation_lock);
+            UNLOCK(conn->activation_lock);
             // On egress, the raw connection is only created once the
             // first part of the message encapsulating the
             // client->server half of the stream has been
@@ -1621,7 +1707,7 @@ static void qdr_tcp_activate(void *notused, 
qdr_connection_t *c)
                    "[C%"PRIu64"] qdr_tcp_activate: schedule activate_timer", 
conn->conn_id);
             qd_timer_schedule(conn->activate_timer, 0);
         } else {
-            sys_mutex_unlock(conn->activation_lock);
+            UNLOCK(conn->activation_lock);
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                    "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", 
conn->conn_id);
         }
@@ -1677,7 +1763,7 @@ static void qdr_tcp_adaptor_final(void *adaptor_context)
     qd_tcp_listener_t *tl = DEQ_HEAD(adaptor->listeners);
     while (tl) {
         qd_tcp_listener_t *next = DEQ_NEXT(tl);
-        free_bridge_config(&tl->config);
+        free_bridge_config(tl->config);
         free_qd_tcp_listener_t(tl);
         tl = next;
     }
@@ -1685,7 +1771,7 @@ static void qdr_tcp_adaptor_final(void *adaptor_context)
     qd_tcp_connector_t *tr = DEQ_HEAD(adaptor->connectors);
     while (tr) {
         qd_tcp_connector_t *next = DEQ_NEXT(tr);
-        free_bridge_config(&tr->config);
+        free_bridge_config(tr->config);
         free_qdr_tcp_connection((qdr_tcp_connection_t*) tr->dispatcher);
         free_qd_tcp_connector_t(tr);
         tr = next;
@@ -1758,7 +1844,7 @@ static void insert_column(qdr_core_t *core, 
qdr_tcp_connection_t *conn, int col,
     }
 
     case QDR_TCP_CONNECTION_ADDRESS:
-        qd_compose_insert_string(body, conn->config.address);
+        qd_compose_insert_string(body, conn->bridge->address);
         break;
 
     case QDR_TCP_CONNECTION_HOST:
@@ -1944,7 +2030,7 @@ static void qdr_add_tcp_connection_CT(qdr_core_t *core, 
qdr_action_t *action, bo
         DEQ_INSERT_TAIL(tcp_adaptor->connections, conn);
         conn->in_list = true;
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
qdr_add_tcp_connection_CT %s (%zu)",
-            conn->conn_id, conn->config.host_port, 
DEQ_SIZE(tcp_adaptor->connections));
+            conn->conn_id, conn->bridge->host_port, 
DEQ_SIZE(tcp_adaptor->connections));
     }
 }
 
@@ -1957,7 +2043,7 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, 
qdr_action_t *action, bo
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                    "[C%"PRIu64"] qdr_del_tcp_connection_CT %s deleted. 
bytes_in=%"PRIu64", bytes_out=%"PRId64", "
                    "opened_time=%"PRId64", last_in_time=%"PRId64", 
last_out_time=%"PRId64". Connections remaining %zu",
-                   conn->conn_id, conn->config.host_port,
+                   conn->conn_id, conn->bridge->host_port,
                    conn->bytes_in, conn->bytes_out, conn->opened_time, 
conn->last_in_time, conn->last_out_time,
                    DEQ_SIZE(tcp_adaptor->connections));
         }
diff --git a/src/adaptors/tcp_adaptor.h b/src/adaptors/tcp_adaptor.h
index 87151c8..07f96eb 100644
--- a/src/adaptors/tcp_adaptor.h
+++ b/src/adaptors/tcp_adaptor.h
@@ -36,25 +36,39 @@
 
 typedef struct qd_tcp_listener_t qd_tcp_listener_t;
 typedef struct qd_tcp_connector_t qd_tcp_connector_t;
-typedef struct qd_bridge_config_t qd_bridge_config_t;
+typedef struct qd_tcp_bridge_t qd_tcp_bridge_t;
 
-struct qd_bridge_config_t
+struct qd_tcp_bridge_t
 {
-    char *name;
-    char *address;
-    char *host;
-    char *port;
-    char *site_id;
-    char *host_port;
+    /* Created and referenced by each new listener or connector.
+     * Referenced by all connections created by listener or connector
+     */
+    sys_atomic_t  ref_count;
+    // static configuration defined at listener/connector creation
+    char         *name;
+    char         *address;
+    char         *host;
+    char         *port;
+    char         *site_id;
+    char         *host_port;
+    // run time statistics updated by connections
+    sys_mutex_t  *stats_lock;
+    uint64_t      connections_opened;
+    uint64_t      connections_closed;
+    uint64_t      bytes_in;
+    uint64_t      bytes_out;
 };
 
+DEQ_DECLARE(qd_tcp_bridge_t, qd_bridge_config_list_t);
+ALLOC_DECLARE(qd_tcp_bridge_t);
+
 struct qd_tcp_listener_t
 {
     qd_handler_context_t      context;
     /* May be referenced by connection_manager and pn_listener_t */
     sys_atomic_t              ref_count;
     qd_server_t              *server;
-    qd_bridge_config_t        config;
+    qd_tcp_bridge_t          *config;
     pn_listener_t            *pn_listener;
 
     DEQ_LINKS(qd_tcp_listener_t);
@@ -68,7 +82,7 @@ struct qd_tcp_connector_t
     /* May be referenced by connection_manager, timer and pn_connection_t */
     sys_atomic_t              ref_count;
     qd_server_t              *server;
-    qd_bridge_config_t        config;
+    qd_tcp_bridge_t          *config;
     void                     *dispatcher;
 
     DEQ_LINKS(qd_tcp_connector_t);
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 5da2b6c..4ee9cea 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -23,6 +23,7 @@ from __future__ import absolute_import
 from __future__ import print_function
 
 import io
+import json
 import os
 import sys
 import time
@@ -38,6 +39,7 @@ from system_test import TIMEOUT
 from system_test import unittest
 
 from subprocess import PIPE
+from subprocess import STDOUT
 
 # Tests in this file are organized by classes that inherit TestCase.
 # The first instance is TcpAdaptor(TestCase).
@@ -547,6 +549,19 @@ class TcpAdaptor(TestCase):
             cls.echo_server_NS_CONN_STALL.wait()
         super(TcpAdaptor, cls).tearDownClass()
 
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None):
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ') + ['--bus', address or str(self.router_dict['INTA'].addresses[0]),
+                                             '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            raise Exception(out if out else str(e))
+        return out
+
     class EchoPair():
         """
         For the concurrent tcp tests this class describes one of the client-
@@ -934,6 +949,32 @@ class TcpAdaptor(TestCase):
         self.ncat_runner(name, "EA1",  "EC2", self.logger)
         self.logger.log("TCP_TEST Stop %s SUCCESS" % name)
 
+    # connector/listener stats
+    def test_80_stats(self):
+        tname = "test_80 check stats in qdmanage"
+        self.logger.log(tname + " START")
+        # Verify listener stats
+        query_command = 'QUERY --type=tcpListener'
+        outputs = json.loads(self.run_qdmanage(query_command))
+        for output in outputs:
+            if output['name'].startswith("ES"):
+                # Check only echo server listeners
+                assert("connectionsOpened" in output)
+                assert(output["connectionsOpened"] > 0)
+                assert(output["connectionsOpened"] == output["connectionsClosed"])
+                assert(output["bytesIn"] == output["bytesOut"])
+        # Verify connector stats
+        query_command = 'QUERY --type=tcpConnector'
+        outputs = json.loads(self.run_qdmanage(query_command))
+        for output in outputs:
+            assert(output['address'].startswith("ES"))
+            assert("connectionsOpened" in output)
+            assert(output["connectionsOpened"] > 0)
+            # egress_dispatcher connection opens and should never close
+            assert(output["connectionsOpened"] == output["connectionsClosed"] + 1)
+            assert(output["bytesIn"] == output["bytesOut"])
+        self.logger.log(tname + " SUCCESS")
+
 
 class TcpAdaptorManagementTest(TestCase):
     """

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to