Repository: qpid-dispatch
Updated Branches:
refs/heads/master a8a218d03 -> cc8f70a6c
DISPATCH-449 - Fixed leak of link resources for routed links.
Fixed leak of pn_session_t for outbound links.
Added timeout to the delivery-tag test for routed links.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/cc8f70a6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/cc8f70a6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/cc8f70a6
Branch: refs/heads/master
Commit: cc8f70a6caf5c2177ff325078c766af09144549d
Parents: a8a218d
Author: Ted Ross <[email protected]>
Authored: Thu Aug 4 13:06:09 2016 -0400
Committer: Ted Ross <[email protected]>
Committed: Thu Aug 4 13:08:00 2016 -0400
----------------------------------------------------------------------
src/container.c | 11 +-------
src/router_core/connections.c | 9 ++++++
src/router_node.c | 7 +++++
tests/system_tests_link_routes.py | 51 ++++++++++++++++++++--------------
4 files changed, 47 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index 83d5a93..1c9fec8 100644
--- a/src/container.c
+++ b/src/container.c
@@ -315,17 +315,9 @@ static int close_handler(qd_container_t *container, void*
conn_context, pn_conne
node->ntype->link_detach_handler(node->context, link, QD_LOST);
}
}
- pn_link_close(pn_link);
pn_link = pn_link_next(pn_link, 0);
}
- // teardown all sessions
- pn_session_t *ssn = pn_session_head(conn, 0);
- while (ssn) {
- pn_session_close(ssn);
- ssn = pn_session_next(ssn, 0);
- }
-
// close the connection
pn_connection_close(conn);
notify_closed(container, qd_conn, conn_context);
@@ -787,10 +779,9 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn,
qd_direction_t dir, c
link->close_sess_with_link = true;
//
- // Keep the borrowed references
+ // Keep the borrowed link reference
//
pn_incref(link->pn_link);
- pn_incref(link->pn_sess);
pn_link_set_context(link->pn_link, link);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2b95b41..229571e 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1268,6 +1268,15 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core,
qdr_action_t *action, b
//
if (link->connected_link) {
qdr_link_outbound_detach_CT(core, link->connected_link, error,
QDR_CONDITION_NONE);
+
+ //
+ // If the link is completely detached, release its resources
+ //
+ if (link->detach_count == 2) {
+ qdr_link_cleanup_CT(core, conn, link);
+ free_qdr_link_t(link);
+ }
+
return;
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index c02b016..3dcf960 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -753,6 +753,13 @@ static void CORE_link_detach(void *context, qdr_link_t
*link, qdr_error_t *error
qd_link_close(qlink);
//
+ // This is the last event for this link that we are going to send into Proton.
+ // Remove the core->proton linkage. Note that the proton->core linkage may still
+ // be intact and needed.
+ //
+ qdr_link_set_context(link, 0);
+
+ //
// If this is the second detach, free the qd_link
//
if (!first)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/cc8f70a6/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py
b/tests/system_tests_link_routes.py
index 0156edd..7f1d117 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -476,9 +476,7 @@ class LinkRoutePatternTest(TestCase):
qdstat_address = self.routers[2].addresses[0]
test = DeliveryTagsTest(sender_address, listening_address,
qdstat_address)
test.run()
- self.assertTrue(test.wait_completed)
- self.assertTrue(test.message_received)
- self.assertTrue(test.delivery_tag_verified)
+ self.assertEqual(None, test.error)
def test_close_with_unsettled(self):
test = CloseWithUnsettledTest(self.routers[1].addresses[0],
self.routers[1].addresses[1])
@@ -511,25 +509,42 @@ class LinkRoutePatternTest(TestCase):
self.assertEqual(None, test.error)
+class Timeout(object):
+ def __init__(self, parent):
+ self.parent = parent
+
+ def on_timer_task(self, event):
+ self.parent.timeout()
+
+
class DeliveryTagsTest(MessagingHandler):
def __init__(self, sender_address, listening_address, qdstat_address):
super(DeliveryTagsTest, self).__init__()
self.sender_address = sender_address
self.listening_address = listening_address
self.sender = None
- self.wait_completed = False
- self.message_received = False
self.receiver_connection = None
self.sender_connection = None
self.qdstat_address = qdstat_address
self.id = '1235'
self.times = 1
+ self.sent = 0
+ self.rcvd = 0
self.delivery_tag_verified = False
# The delivery tag we are going to send in the transfer frame
# We will later make sure that the same delivery tag shows up on the receiving end in the link routed case.
self.delivery_tag = '92319'
+ self.error = None
+
+ def timeout(self):
+ self.error = "Timeout expired: sent=%d rcvd=%d" % (self.sent,
self.rcvd)
+ if self.receiver_connection:
+ self.receiver_connection.close()
+ if self.sender_connection:
+ self.sender_connection.close()
def on_start(self, event):
+ self.timer = event.reactor.schedule(5, Timeout(self))
self.receiver_connection =
event.container.connect(self.listening_address)
def on_connection_remote_open(self, event):
@@ -552,10 +567,10 @@ class DeliveryTagsTest(MessagingHandler):
out =
local_node.read(type='org.apache.qpid.dispatch.router.address',
name='Dpulp.task').remoteCount
if out == 1:
continue_loop = False
- i+=1
- sleep(0.25)
+ else:
+ i += 1
+ sleep(0.25)
- self.wait_completed = True
self.sender_connection =
event.container.connect(self.sender_address)
self.sender =
event.container.create_sender(self.sender_connection, "pulp.task",
options=AtMostOnce())
@@ -563,31 +578,25 @@ class DeliveryTagsTest(MessagingHandler):
if self.times == 1:
msg = Message(body="Hello World")
self.sender.send(msg, tag=self.delivery_tag)
- self.sender_connection.close()
- self.times +=1
+ self.times += 1
+ self.sent += 1
def on_message(self, event):
if "Hello World" == event.message.body:
- self.message_received = True
+ self.rcvd += 1
# If the tag on the delivery is the same as the tag we sent with the initial transfer, it means
# that the router has propagated the delivery tag successfully because of link routing.
- if self.delivery_tag == event.delivery.tag:
- self.delivery_tag_verified = True
+ if self.delivery_tag != event.delivery.tag:
+ self.error = "Delivery-tag: expected:%r got:%r" %
(self.delivery_tag, event.delivery.tag)
self.receiver_connection.close()
+ self.sender_connection.close()
+ self.timer.cancel()
def run(self):
Container(self).run()
-class Timeout(object):
- def __init__(self, parent):
- self.parent = parent
-
- def on_timer_task(self, event):
- self.parent.timeout()
-
-
class CloseWithUnsettledTest(MessagingHandler):
##
## This test sends a message across an attach-routed link. While the
message
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]