Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 375c3edcd -> 00b47f15b
DISPATCH-295 - Ensure that unsettled messages are properly tracked across routed links. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/00b47f15 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/00b47f15 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/00b47f15 Branch: refs/heads/master Commit: 00b47f15b150d9e4f0838795404779b22609e2b2 Parents: 375c3ed Author: Ted Ross <[email protected]> Authored: Mon May 2 11:04:31 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon May 2 11:04:31 2016 -0400 ---------------------------------------------------------------------- src/router_core/transfer.c | 10 ++++++ tests/system_tests_link_routes.py | 64 ++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00b47f15/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index acb0b1b..ffe11c9 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -410,11 +410,21 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis // if (link->connected_link) { qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg); + + // + // Copy the delivery tag. For link-routing, the delivery tag must be preserved. 
+ // peer->tag_length = action->args.connection.tag_length; memcpy(peer->tag, action->args.connection.tag, peer->tag_length); + qdr_forward_deliver_CT(core, link->connected_link, peer); qd_message_free(dlv->msg); dlv->msg = 0; + link->total_deliveries++; + if (!dlv->settled) { + DEQ_INSERT_TAIL(link->unsettled, dlv); + dlv->where = QDR_DELIVERY_IN_UNSETTLED; + } return; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/00b47f15/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 9475460..6f49c12 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -447,6 +447,12 @@ class LinkRoutePatternTest(TestCase): self.assertTrue(test.message_received) self.assertTrue(test.delivery_tag_verified) + def test_close_with_unsettled(self): + test = CloseWithUnsettledTest(self.routers[1].addresses[0], self.routers[1].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + class DeliveryTagsTest(MessagingHandler): def __init__(self, sender_address, listening_address, qdstat_address): super(DeliveryTagsTest, self).__init__() @@ -512,5 +518,63 @@ class DeliveryTagsTest(MessagingHandler): def run(self): Container(self).run() + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class CloseWithUnsettledTest(MessagingHandler): + ## + ## This test sends a message across an attach-routed link. While the message + ## is unsettled, the client link is closed. The test is ensuring that the + ## router does not crash during the closing of the links. 
+ ## + def __init__(self, normal_addr, route_addr): + super(CloseWithUnsettledTest, self).__init__(prefetch=0, auto_accept=False) + self.normal_addr = normal_addr + self.route_addr = route_addr + self.dest = "pulp.task.CWUtest" + self.error = None + + def timeout(self): + self.error = "Timeout Expired - Check for cores" + self.conn_normal.close() + self.conn_route.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.conn_route = event.container.connect(self.route_addr) + + def on_connection_opened(self, event): + if event.connection == self.conn_route: + self.conn_normal = event.container.connect(self.normal_addr) + elif event.connection == self.conn_normal: + self.sender = event.container.create_sender(self.conn_normal, self.dest) + + def on_connection_closed(self, event): + self.conn_route.close() + self.timer.cancel() + + def on_link_opened(self, event): + if event.receiver: + self.receiver = event.receiver + self.receiver.flow(1) + + def on_sendable(self, event): + msg = Message(body="CloseWithUnsettled") + event.sender.send(msg) + + def on_message(self, event): + self.conn_normal.close() + + def run(self): + Container(self).run() + + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
