Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 0f092ac43 -> 936c46b19


DISPATCH-1069 - (mostly from gmurthy) - Re-established the deferred deletion of 
proton links and sessions to prevent double-frees and memory growth.
This closes #344


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/936c46b1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/936c46b1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/936c46b1

Branch: refs/heads/master
Commit: 936c46b193c07a7fcfc89e71c490ad092e7cffa5
Parents: 0f092ac
Author: Ted Ross <[email protected]>
Authored: Thu Jul 19 15:33:30 2018 -0400
Committer: Ted Ross <[email protected]>
Committed: Thu Jul 19 15:33:30 2018 -0400

----------------------------------------------------------------------
 src/container.c      | 98 +++++++++++++++++++++++++++++++++++++++--------
 src/router_node.c    |  4 ++
 src/server.c         | 41 +++++++++++++++-----
 src/server_private.h | 58 ++++++++++++++--------------
 4 files changed, 149 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 52231b4..d4fbe51 100644
--- a/src/container.c
+++ b/src/container.c
@@ -88,6 +88,8 @@ struct qd_container_t {
     qd_link_list_t        links;
 };
 
+ALLOC_DEFINE(qd_pn_free_link_session_t);
+
 static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link)
 {
     qd_node_t *node = container->default_node;
@@ -332,18 +334,86 @@ static void writable_handler(qd_container_t *container, 
pn_connection_t *conn, q
 }
 
 
-void qd_container_handle_event(qd_container_t *container, pn_event_t *event)
+/**
+ * Returns true if the free_link already exists in free_link_list, false 
otherwise
+ */
+static bool link_exists(qd_pn_free_link_session_list_t *free_list, pn_link_t 
*free_link)
 {
-    pn_connection_t *conn = pn_event_connection(event);
+    qd_pn_free_link_session_t *free_item = DEQ_HEAD(*free_list);
+    while(free_item) {
+        if (free_item->pn_link == free_link)
+            return true;
+        free_item = DEQ_NEXT(free_item);
+    }
+    return false;
+}
 
-    if (!conn)
-        return;
+/**
+ * Returns true if the free_session already exists in free_session_list, false 
otherwise
+*/
+static bool session_exists(qd_pn_free_link_session_list_t *free_list, 
pn_session_t *free_session)
+{
+    qd_pn_free_link_session_t *free_item = DEQ_HEAD(*free_list);
+    while(free_item) {
+        if (free_item->pn_session == free_session)
+            return true;
+        free_item = DEQ_NEXT(free_item);
+    }
+    return false;
+}
+
+static void add_session_to_free_list(qd_pn_free_link_session_list_t 
*free_link_session_list, pn_session_t *ssn)
+{
+    if (!session_exists(free_link_session_list, ssn)) {
+        qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
+        DEQ_ITEM_INIT(to_free);
+        to_free->pn_session = ssn;
+        to_free->pn_link = 0;
+        DEQ_INSERT_TAIL(*free_link_session_list, to_free);
+    }
+}
 
-    qd_connection_t *qd_conn = pn_connection_get_context(conn);
-    pn_session_t    *ssn = NULL;
-    pn_link_t       *pn_link = NULL;
-    qd_link_t       *qd_link = NULL;
-    pn_delivery_t   *delivery = NULL;
+static void add_link_to_free_list(qd_pn_free_link_session_list_t 
*free_link_session_list, pn_link_t *pn_link)
+{
+    if (!link_exists(free_link_session_list, pn_link)) {
+        qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t();
+        DEQ_ITEM_INIT(to_free);
+        to_free->pn_link = pn_link;
+        to_free->pn_session = 0;
+        DEQ_INSERT_TAIL(*free_link_session_list, to_free);
+    }
+
+}
+
+
+/*
+ * The need for these lists may indicate a router bug, where the router is
+ * using links/sessions after they are freed. Investigate and simplify if
+ * possible.
+*/
+void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t 
*qd_conn)
+{
+    qd_pn_free_link_session_t *to_free = 
DEQ_HEAD(qd_conn->free_link_session_list);
+
+    while(to_free) {
+        if (to_free->pn_link)
+            pn_link_free(to_free->pn_link);
+        if (to_free->pn_session)
+            pn_session_free(to_free->pn_session);
+        DEQ_REMOVE_HEAD(qd_conn->free_link_session_list);
+        free_qd_pn_free_link_session_t(to_free);
+        to_free = DEQ_HEAD(qd_conn->free_link_session_list);
+    }
+}
+
+
+void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
+                               pn_connection_t *conn, qd_connection_t *qd_conn)
+{
+    pn_session_t  *ssn = NULL;
+    pn_link_t     *pn_link = NULL;
+    qd_link_t     *qd_link = NULL;
+    pn_delivery_t *delivery = NULL;
 
     switch (pn_event_type(event)) {
 
@@ -406,7 +476,7 @@ void qd_container_handle_event(qd_container_t *container, 
pn_event_t *event)
         }
 
         if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
-            pn_session_free(ssn);
+            add_session_to_free_list(&qd_conn->free_link_session_list, ssn);
         }
         break;
 
@@ -458,7 +528,7 @@ void qd_container_handle_event(qd_container_t *container, 
pn_event_t *event)
                 pn_session_close(ssn);
             }
             else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | 
PN_REMOTE_CLOSED)) {
-                pn_session_free(ssn);
+                add_session_to_free_list(&qd_conn->free_link_session_list, 
ssn);
             }
         }
         break;
@@ -517,8 +587,7 @@ void qd_container_handle_event(qd_container_t *container, 
pn_event_t *event)
                 }
 
                 if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
-                    pn_link_set_context(pn_link, NULL);
-                    pn_link_free(pn_link);
+                    add_link_to_free_list(&qd_conn->free_link_session_list, 
pn_link);
                 }
                 if (node) {
                     node->ntype->link_detach_handler(node->context, qd_link, 
dt);
@@ -531,8 +600,7 @@ void qd_container_handle_event(qd_container_t *container, 
pn_event_t *event)
     case PN_LINK_LOCAL_CLOSE:
         pn_link = pn_event_link(event);
         if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
-            pn_link_set_context(pn_link, NULL);
-            pn_link_free(pn_link);
+            add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
         }
         break;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 71a545d..585dd6f 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -295,6 +295,10 @@ static void AMQP_rx_handler(void* context, qd_link_t *link)
     qd_router_t    *router       = (qd_router_t*) context;
     pn_link_t      *pn_link      = qd_link_pn(link);
     assert(pn_link);
+
+    if (!pn_link)
+        return;
+
     pn_delivery_t  *pnd          = pn_link_current(pn_link);
     if (!pnd)
         return;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index b698fab..8fc9511 100644
--- a/src/server.c
+++ b/src/server.c
@@ -522,6 +522,7 @@ qd_connection_t *qd_server_connection(qd_server_t *server, 
qd_server_config_t *c
     pn_connection_set_context(ctx->pn_conn, ctx);
     DEQ_ITEM_INIT(ctx);
     DEQ_INIT(ctx->deferred_calls);
+    DEQ_INIT(ctx->free_link_session_list);
     sys_mutex_lock(server->lock);
     ctx->connection_id = server->next_connection_id++;
     DEQ_INSERT_TAIL(server->conn_list, ctx);
@@ -692,7 +693,8 @@ static void invoke_deferred_calls(qd_connection_t *conn, 
bool discard)
     sys_mutex_unlock(conn->deferred_call_lock);
 }
 
-void qd_container_handle_event(qd_container_t *container, pn_event_t *event);
+void qd_container_handle_event(qd_container_t *container, pn_event_t *event, 
pn_connection_t *pn_conn, qd_connection_t *qd_conn);
+void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t 
*qd_conn);
 
 static void handle_listener(pn_event_t *e, qd_server_t *qd_server) {
     qd_log_source_t *log = qd_server->log_source;
@@ -856,13 +858,12 @@ static void qd_increment_conn_index(qd_connection_t *ctx)
 /* Events involving a connection or listener are serialized by the proactor so
  * only one event per connection / listener will be processed at a time.
  */
-static bool handle(qd_server_t *qd_server, pn_event_t *e) {
-    pn_connection_t *pn_conn = pn_event_connection(e);
+static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t 
*pn_conn, qd_connection_t *ctx)
+{
     if (pn_conn && qdr_is_authentication_service_connection(pn_conn)) {
         qdr_handle_authentication_service_connection_event(e);
         return true;
     }
-    qd_connection_t *ctx = pn_conn ? (qd_connection_t*) 
pn_connection_get_context(pn_conn) : NULL;
 
     switch (pn_event_type(e)) {
 
@@ -943,8 +944,8 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e) {
         break;
     } // Switch event type
 
-    /* TODO aconway 2017-04-18: fold the container handler into the server */
-    qd_container_handle_event(qd_server->container, e);
+    if (ctx)
+        qd_container_handle_event(qd_server->container, e, pn_conn, ctx);
 
     /* Free the connection after all other processing is complete */
     if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) {
@@ -961,9 +962,29 @@ static void *thread_run(void *arg)
     while (running) {
         pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor);
         pn_event_t * e;
+        qd_connection_t *qd_conn = 0;
+        pn_connection_t *pn_conn = 0;
+
         while (running && (e = pn_event_batch_next(events))) {
-            running = handle(qd_server, e);
+            pn_connection_t *conn = pn_event_connection(e);
+
+            if (!pn_conn)
+                pn_conn = conn;
+            assert(pn_conn == conn);
+
+            if (!qd_conn)
+                qd_conn = !!pn_conn ? (qd_connection_t*) 
pn_connection_get_context(pn_conn) : 0;
+
+            running = handle(qd_server, e, conn, qd_conn);
         }
+
+        //
+        // Notify the container that the batch is complete so it can do 
after-batch
+        // processing.
+        //
+        if (qd_conn)
+            qd_conn_event_batch_complete(qd_server->container, qd_conn);
+
         pn_proactor_done(qd_server->proactor, events);
     }
     return NULL;
@@ -1200,7 +1221,7 @@ void qd_server_free(qd_server_t *qd_server)
 
 void qd_server_set_container(qd_dispatch_t *qd, qd_container_t *container)
 {
-    qd->server->container              = container;
+    qd->server->container = container;
 }
 
 
@@ -1473,7 +1494,9 @@ const char* qd_connection_remote_ip(const qd_connection_t 
*c) {
 
 /* Expose event handling for HTTP connections */
 void qd_connection_handle(qd_connection_t *c, pn_event_t *e) {
-    handle(c->server, e);
+    pn_connection_t *pn_conn = pn_event_connection(e);
+    qd_connection_t *qd_conn = !!pn_conn ? (qd_connection_t*) 
pn_connection_get_context(pn_conn) : 0;
+    handle(c->server, e, pn_conn, qd_conn);
 }
 
 bool qd_connection_strip_annotations_in(const qd_connection_t *c) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index d80a1fa..25cc901 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -133,34 +133,35 @@ DEQ_DECLARE(qd_connector_t, qd_connector_list_t);
  */
 struct qd_connection_t {
     DEQ_LINKS(qd_connection_t);
-    char                     *name;
-    qd_server_t              *server;
-    bool                      opened; // An open callback was invoked for this 
connection
-    bool                      closed;
-    int                       enqueued;
-    qd_timer_t               *timer;   // Timer for initial-setup
-    pn_connection_t          *pn_conn;
-    pn_session_t             *pn_sess;
-    pn_ssl_t                 *ssl;
-    qd_listener_t            *listener;
-    qd_connector_t           *connector;
-    void                     *context; // context from listener or connector
-    void                     *user_context;
-    void                     *link_context; // Context shared by this 
connection's links
-    uint64_t                  connection_id; // A unique identifier for the 
qd_connection_t. The underlying pn_connection already has one but it is long 
and clunky.
-    const char               *user_id; // A unique identifier for the user on 
the connection. This is currently populated  from the client ssl cert. See 
ssl_uid_format in server.h for more info
-    bool                      free_user_id;
-    qd_policy_settings_t     *policy_settings;
-    int                       n_sessions;
-    int                       n_senders;
-    int                       n_receivers;
-    void                     *open_container;
-    qd_deferred_call_list_t   deferred_calls;
-    sys_mutex_t              *deferred_call_lock;
-    bool                      policy_counted;
-    char                     *role;  //The specified role of the connection, 
e.g. "normal", "inter-router", "route-container" etc.
-    bool                      strip_annotations_in;
-    bool                      strip_annotations_out;
+    char                            *name;
+    qd_server_t                     *server;
+    bool                            opened; // An open callback was invoked 
for this connection
+    bool                            closed;
+    int                             enqueued;
+    qd_timer_t                      *timer;   // Timer for initial-setup
+    pn_connection_t                 *pn_conn;
+    pn_session_t                    *pn_sess;
+    pn_ssl_t                        *ssl;
+    qd_listener_t                   *listener;
+    qd_connector_t                  *connector;
+    void                            *context; // context from listener or 
connector
+    void                            *user_context;
+    void                            *link_context; // Context shared by this 
connection's links
+    uint64_t                        connection_id; // A unique identifier for 
the qd_connection_t. The underlying pn_connection already has one but it is 
long and clunky.
+    const char                      *user_id; // A unique identifier for the 
user on the connection. This is currently populated  from the client ssl cert. 
See ssl_uid_format in server.h for more info
+    bool                            free_user_id;
+    qd_policy_settings_t            *policy_settings;
+    int                             n_sessions;
+    int                             n_senders;
+    int                             n_receivers;
+    void                            *open_container;
+    qd_deferred_call_list_t         deferred_calls;
+    sys_mutex_t                     *deferred_call_lock;
+    bool                            policy_counted;
+    char                            *role;  //The specified role of the 
connection, e.g. "normal", "inter-router", "route-container" etc.
+    qd_pn_free_link_session_list_t  free_link_session_list;
+    bool                            strip_annotations_in;
+    bool                            strip_annotations_out;
     void (*wake)(qd_connection_t*); /* Wake method, different for HTTP vs. 
proactor */
     char rhost[NI_MAXHOST];     /* Remote host numeric IP for incoming 
connections */
     char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming 
connections */
@@ -172,5 +173,6 @@ ALLOC_DECLARE(qd_listener_t);
 ALLOC_DECLARE(qd_deferred_call_t);
 ALLOC_DECLARE(qd_connector_t);
 ALLOC_DECLARE(qd_connection_t);
+ALLOC_DECLARE(qd_pn_free_link_session_t);
 
 #endif


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to