Repository: qpid-dispatch
Updated Branches:
  refs/heads/master a8a218d03 -> cc8f70a6c


DISPATCH-449 - Fixed leak of link resources for routed links.
               Fixed leak of pn_session_t for outbound links.
               Added timeout to the delivery-tag test for routed links.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/cc8f70a6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/cc8f70a6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/cc8f70a6

Branch: refs/heads/master
Commit: cc8f70a6caf5c2177ff325078c766af09144549d
Parents: a8a218d
Author: Ted Ross <[email protected]>
Authored: Thu Aug 4 13:06:09 2016 -0400
Committer: Ted Ross <[email protected]>
Committed: Thu Aug 4 13:08:00 2016 -0400

----------------------------------------------------------------------
 src/container.c                   | 11 +-------
 src/router_core/connections.c     |  9 ++++++
 src/router_node.c                 |  7 +++++
 tests/system_tests_link_routes.py | 51 ++++++++++++++++++++--------------
 4 files changed, 47 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 83d5a93..1c9fec8 100644
--- a/src/container.c
+++ b/src/container.c
@@ -315,17 +315,9 @@ static int close_handler(qd_container_t *container, void* conn_context, pn_conne
                 node->ntype->link_detach_handler(node->context, link, QD_LOST);
             }
         }
-        pn_link_close(pn_link);
         pn_link = pn_link_next(pn_link, 0);
     }
 
-    // teardown all sessions
-    pn_session_t *ssn = pn_session_head(conn, 0);
-    while (ssn) {
-        pn_session_close(ssn);
-        ssn = pn_session_next(ssn, 0);
-    }
-
     // close the connection
     pn_connection_close(conn);
     notify_closed(container, qd_conn, conn_context);
@@ -787,10 +779,9 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c
     link->close_sess_with_link = true;
 
     //
-    // Keep the borrowed references
+    // Keep the borrowed link reference
     //
     pn_incref(link->pn_link);
-    pn_incref(link->pn_sess);
 
     pn_link_set_context(link->pn_link, link);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2b95b41..229571e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1268,6 +1268,15 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b
     //
     if (link->connected_link) {
         qdr_link_outbound_detach_CT(core, link->connected_link, error, QDR_CONDITION_NONE);
+
+        //
+        // If the link is completely detached, release its resources
+        //
+        if (link->detach_count == 2) {
+            qdr_link_cleanup_CT(core, conn, link);
+            free_qdr_link_t(link);
+        }
+
         return;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c02b016..3dcf960 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -753,6 +753,13 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
     qd_link_close(qlink);
 
     //
+    // This is the last event for this link that we are going to send into Proton.
+    // Remove the core->proton linkage.  Note that the proton->core linkage may still
+    // be intact and needed.
+    //
+    qdr_link_set_context(link, 0);
+
+    //
     // If this is the second detach, free the qd_link
     //
     if (!first)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 0156edd..7f1d117 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -476,9 +476,7 @@ class LinkRoutePatternTest(TestCase):
         qdstat_address = self.routers[2].addresses[0]
         test = DeliveryTagsTest(sender_address, listening_address, qdstat_address)
         test.run()
-        self.assertTrue(test.wait_completed)
-        self.assertTrue(test.message_received)
-        self.assertTrue(test.delivery_tag_verified)
+        self.assertEqual(None, test.error)
 
     def test_close_with_unsettled(self):
         test = CloseWithUnsettledTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
@@ -511,25 +509,42 @@ class LinkRoutePatternTest(TestCase):
         self.assertEqual(None, test.error)
 
 
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
 class DeliveryTagsTest(MessagingHandler):
     def __init__(self, sender_address, listening_address, qdstat_address):
         super(DeliveryTagsTest, self).__init__()
         self.sender_address = sender_address
         self.listening_address = listening_address
         self.sender = None
-        self.wait_completed = False
-        self.message_received = False
         self.receiver_connection = None
         self.sender_connection = None
         self.qdstat_address = qdstat_address
         self.id = '1235'
         self.times = 1
+        self.sent = 0
+        self.rcvd = 0
         self.delivery_tag_verified = False
         # The delivery tag we are going to send in the transfer frame
         # We will later make sure that the same delivery tag shows up on the receiving end in the link routed case.
         self.delivery_tag = '92319'
+        self.error = None
+
+    def timeout(self):
+        self.error = "Timeout expired: sent=%d rcvd=%d" % (self.sent, self.rcvd)
+        if self.receiver_connection:
+            self.receiver_connection.close()
+        if self.sender_connection:
+            self.sender_connection.close()
 
     def on_start(self, event):
+        self.timer               = event.reactor.schedule(5, Timeout(self))
         self.receiver_connection = event.container.connect(self.listening_address)
 
     def on_connection_remote_open(self, event):
@@ -552,10 +567,10 @@ class DeliveryTagsTest(MessagingHandler):
                 out = local_node.read(type='org.apache.qpid.dispatch.router.address', name='Dpulp.task').remoteCount
                 if out == 1:
                     continue_loop = False
-                i+=1
-                sleep(0.25)
+                else:
+                    i += 1
+                    sleep(0.25)
 
-            self.wait_completed = True
             self.sender_connection = event.container.connect(self.sender_address)
             self.sender = event.container.create_sender(self.sender_connection, "pulp.task", options=AtMostOnce())
 
@@ -563,31 +578,25 @@ class DeliveryTagsTest(MessagingHandler):
         if self.times == 1:
             msg = Message(body="Hello World")
             self.sender.send(msg, tag=self.delivery_tag)
-            self.sender_connection.close()
-            self.times +=1
+            self.times += 1
+            self.sent += 1
 
     def on_message(self, event):
         if "Hello World" == event.message.body:
-            self.message_received = True
+            self.rcvd += 1
 
         # If the tag on the delivery is the same as the tag we sent with the initial transfer, it means
         # that the router has propagated the delivery tag successfully because of link routing.
-        if self.delivery_tag == event.delivery.tag:
-            self.delivery_tag_verified = True
+        if self.delivery_tag != event.delivery.tag:
+            self.error = "Delivery-tag: expected:%r got:%r" % (self.delivery_tag, event.delivery.tag)
         self.receiver_connection.close()
+        self.sender_connection.close()
+        self.timer.cancel()
 
     def run(self):
         Container(self).run()
 
 
-class Timeout(object):
-    def __init__(self, parent):
-        self.parent = parent
-
-    def on_timer_task(self, event):
-        self.parent.timeout()
-
-
 class CloseWithUnsettledTest(MessagingHandler):
     ##
     ## This test sends a message across an attach-routed link.  While the message


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to