Repository: qpid-dispatch Updated Branches: refs/heads/master aaf5f2cb7 -> a6c4b94b8
DISPATCH-1055: Fix credit propagation over link route after drain cycle Reset incremental credit given to core at end of drain cycle. Fix logic computing drain_changed flag. This closes #340 Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/a6c4b94b Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/a6c4b94b Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/a6c4b94b Branch: refs/heads/master Commit: a6c4b94b87f4b11981b1a549ddee6f614ee6e0a3 Parents: aaf5f2c Author: Chuck Rolke <[email protected]> Authored: Wed Jul 11 08:47:07 2018 -0400 Committer: Chuck Rolke <[email protected]> Committed: Wed Jul 11 08:47:07 2018 -0400 ---------------------------------------------------------------------- src/router_core/transfer.c | 14 ++-- tests/system_tests_drain.py | 49 ++++++++++- tests/system_tests_drain_support.py | 139 +++++++++++++++++++++++++++++++ 3 files changed, 193 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 5ee4ae3..a2374fb 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -233,10 +233,14 @@ void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mo // incrementally to the router core. i.e. convert absolute credit to // incremental credit. // - credit -= link->credit_to_core; - if (credit < 0) - credit = 0; - link->credit_to_core += credit; + if (link->drain_mode && !drain_mode) { + link->credit_to_core = 0; // credit calc reset when coming out of drain mode + } else { + credit -= link->credit_to_core; + if (credit < 0) + credit = 0; + link->credit_to_core += credit; + } action->args.connection.link = link; action->args.connection.credit = credit; @@ -1247,7 +1251,7 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo { assert(link->link_direction == QD_INCOMING); - bool drain_changed = link->drain_mode |= drain; + bool drain_changed = link->drain_mode ^ drain; link->drain_mode = drain; if (link->credit_pending > 0) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/tests/system_tests_drain.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py index b9fd547..1b38aba 100644 --- a/tests/system_tests_drain.py +++ b/tests/system_tests_drain.py @@ -26,27 +26,58 @@ import unittest2 as unittest from system_test import TestCase, Qdrouterd, main_module from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler +from system_tests_drain_support import DrainMessagesMoreHandler +from time import sleep class DrainSupportTest(TestCase): @classmethod def setUpClass(cls): - """Start a router and a messenger""" + """ + Set up two routers: + Router 'test-router' is the system under test. + Router 'broker' acts as a link route sink/source. + The link route uses prefix 'abc'. + """ super(DrainSupportTest, cls).setUpClass() + + test_listener_port = cls.tester.get_port() + broker_listener_port = cls.tester.get_port() + + # Configure and start 'broker' + bname = "broker" + bconfig = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'Broker'}), + ('listener', {'role': 'normal', + 'host': '0.0.0.0', 'port': broker_listener_port, 'linkCapacity': 100, 'saslMechanisms': 'ANONYMOUS'}), + ]) + cls.broker = cls.tester.qdrouterd(bname, bconfig, wait=True) + + # Configure and start test-router name = "test-router" config = Qdrouterd.Config([ ('router', {'mode': 'standalone', 'id': 'QDR'}), # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages - ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}), + ('listener', {'role': 'normal', + 'host': '0.0.0.0', 'port': test_listener_port, + 'linkCapacity': 10, 'saslMechanisms': 'ANONYMOUS'}), + # The DrainMessagesMoreHandler accepts a src/tgt address that may be link-routed. + # This defines the link route to 'broker' and the 'abc' prefix. + ('connector', {'name': 'broker1-conn', 'role': 'route-container', + 'host': '0.0.0.0', 'port': broker_listener_port, + 'saslMechanisms': 'ANONYMOUS'}), + ('linkRoute', {'prefix': 'abc', 'direction': 'out', 'connection': 'broker1-conn'}), + ('linkRoute', {'prefix': 'abc', 'direction': 'in', 'connection': 'broker1-conn'}), ]) - cls.router = cls.tester.qdrouterd(name, config) - cls.router.wait_ready() + cls.router = cls.tester.qdrouterd(name, config, wait=False) cls.address = cls.router.addresses[0] + sleep(4) # starting router with wait=True hangs. sleep for now + def test_drain_support_1_all_messages(self): drain_support = DrainMessagesHandler(self.address) drain_support.run() @@ -67,6 +98,16 @@ class DrainSupportTest(TestCase): drain_support.run() self.assertEqual(drain_support.error, None) + def test_drain_support_5_drain_then_more_messages_local(self): + drain_support = DrainMessagesMoreHandler(self.address, "org.apache.dev") + drain_support.run() + self.assertEqual(drain_support.error, None) + + def test_drain_support_5_drain_then_more_messages_routed(self): + drain_support = DrainMessagesMoreHandler(self.address, "abc") + drain_support.run() + self.assertEqual(drain_support.error, None) + if __name__ == '__main__': unittest.main(main_module()) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/a6c4b94b/tests/system_tests_drain_support.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py index 930a486..785cb16 100644 --- a/tests/system_tests_drain_support.py +++ b/tests/system_tests_drain_support.py @@ -212,5 +212,144 @@ class DrainNoMoreMessagesHandler(MessagingHandler): Container(self).run() +class DrainMessagesMoreHandler(MessagingHandler): + """ + Make sure the clients can send/receive after going through a drain cycle. + + Send phase + 1. Sender sending first 10 messages + 2. Sender paused waiting for drain to finish + 3. Sender is sending second 10 messages + 4. Sender is done. + + Receive phase + 1. Receiver receiving first four messages; At #4 receiver issues drain 4,20 + 2. Reciever receives messages 5..10. + When 10 messages have been received and link credit =0 the drain is done + Receiver issues 10 credits + 3. Receiver recieves messages 11..20. + 4. Receiver is done + + At issue in DISPATCH-1055 is that the 10 credits issued in Receive step 2 + are never propagated across a link route to the 'broker'. + + This code is instantiated with and without the link route to demonstrate that + it works properly when the 'test-router' is handling the drain by itself + and that it fails only on the link route. + """ + def __init__(self, address, route_name): + # prefetch is set to zero so that proton does not automatically issue 10 credits. + super(DrainMessagesMoreHandler, self).__init__(prefetch=0) + self.conn = None + self.sender = None + self.receiver = None + self.sent_count = 0 + self.received_count = 0 + self.address = address + self.error = "Unexpected Exit" + self.send_phase = 1 + self.recv_phase = 1 + self.route_name = route_name + self.verbose_printing = False + + def show_state(self): + return str("send_phase:" + str(self.send_phase) + + ", sent_count:" + str(self.sent_count) + + ", recv_phase:" + str(self.recv_phase) + + ", receive_count:" + str(self.received_count) + + ", receiver_credit:" + str(self.receiver.credit) + + ", sender_credit:" + str(self.sender.credit)) + + def printme(self, str): + if (self.verbose_printing): + print (str + " " + self.show_state()) + + def timeout(self): + self.error = "Timeout Expired: sent: %d rcvd: %d" % (self.sent_count, self.received_count) + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + self.conn = event.container.connect(self.address) + + # Create a sender and a receiver. They are both listening on the same address + self.receiver = event.container.create_receiver(self.conn, source=self.route_name) + self.sender = event.container.create_sender(self.conn, target=self.route_name) + self.receiver.flow(1) + + def on_link_flow(self, event): + if event.link.is_sender and event.link.credit \ + and event.link.state & Endpoint.LOCAL_ACTIVE \ + and event.link.state & Endpoint.REMOTE_ACTIVE : + self.on_sendable(event) + + # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more + # messages. That along with 10 messages received indicates that the drain worked. + if self.send_phase == 2 and self.received_count == 10 and event.link.credit == 0: + self.printme ("sender transitions to phase 3 - drain completed, send new flow now") + self.receiver.flow(10) + self.send_phase = 3 + + if event.link.is_sender and event.link.credit \ + and event.link.state & Endpoint.LOCAL_ACTIVE \ + and event.link.state & Endpoint.REMOTE_ACTIVE : + self.on_sendable(event) + self.printme (("sender " if event.link.is_sender else "receiver ") + "exit on_link_flow:") + + def on_sendable(self, event): + if event.link.is_sender and self.send_phase == 1 and self.sent_count < 10: + msg = Message(body="Hello World", properties={'seq': self.sent_count}) + dlv = event.sender.send(msg) + dlv.settle() + self.sent_count += 1 + if self.sent_count == 10: + self.printme ("sender transitions to phase 2 - wait for drain to finish") + self.send_phase = 2 + elif event.link.is_sender and self.send_phase == 3 and self.sent_count < 20: + msg = Message(body="Hello World", properties={'seq': self.sent_count}) + dlv = event.sender.send(msg) + dlv.settle() + self.sent_count += 1 + if self.sent_count == 20: + self.printme ("sender transitions to phase 4 - done sending") + self.send_phase = 4 + self.printme (("sender " if event.link.is_sender else "receiver ") + "exit on_sendable:") + + def on_message(self, event): + if event.receiver == self.receiver: + if "Hello World" == event.message.body: + self.received_count += 1 + + if self.recv_phase == 1 and self.received_count < 4: + event.receiver.flow(1) + elif self.recv_phase == 1 and self.received_count == 4: + # We are issuing a drain of 20. This means that we will receive all the 10 messages + # that the sender is sending. The router will also send back a response flow frame with + # drain=True but I don't have any way of making sure that the response frame reached the + # receiver + self.printme ("receiver transitions to phase 2 - sending drain now") + event.receiver.drain(20) + self.recv_phase = 2 + elif self.recv_phase == 2 and self.received_count == 10: + self.printme ("receiver transitions to phase 3") + self.recv_phase = 3 + msg = Message(body="Hello World", properties={'seq': self.sent_count}) + dlv = self.sender.send(msg) + dlv.settle() + self.sent_count += 1 + elif self.recv_phase == 3 and self.received_count == 20: + self.printme ("receiver transitions to phase 4 - test is completed successfully") + self.recv_phase = 4 + self.error = None + self.timer.cancel() + self.receiver.close() + self.sender.close() + self.conn.close() + self.printme ("exit on_message:") + + def run(self): + Container(self).run() + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
