Repository: qpid-dispatch Updated Branches: refs/heads/master 21a32cad7 -> 3086de102
DISPATCH-209 - From Mick Goulish - Added anonymous sender for 3-router test. This closes #126 Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3086de10 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3086de10 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3086de10 Branch: refs/heads/master Commit: 3086de102ff125fc9a218c15140db4f858210da9 Parents: 21a32ca Author: Ted Ross <[email protected]> Authored: Mon Dec 19 09:21:52 2016 -0500 Committer: Ted Ross <[email protected]> Committed: Mon Dec 19 09:21:52 2016 -0500 ---------------------------------------------------------------------- tests/system_tests_three_routers.py | 84 +++++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3086de10/tests/system_tests_three_routers.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py index def31ff..2414c92 100644 --- a/tests/system_tests_three_routers.py +++ b/tests/system_tests_three_routers.py @@ -23,6 +23,7 @@ from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SS from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process from proton.handlers import MessagingHandler from proton.reactor import Container, AtMostOnce, AtLeastOnce +import time # PROTON-828: @@ -65,6 +66,7 @@ class RouterTest(TestCase): inter_router_port_1 = cls.tester.get_port() inter_router_port_2 = cls.tester.get_port() + # A <--- B <--- C router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port_1}) ) router('B', ('listener', {'role': 'inter-router', 'port': inter_router_port_2}), @@ -83,6 +85,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_02_anonymous_sender(self): + test = AnonymousSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0]) + test.run() + self.assertEqual(None, test.error) + class Timeout(object): def __init__(self, parent): @@ -112,7 +119,8 @@ class TargetedSenderTest(MessagingHandler): self.conn2.close() def on_start(self, event): - self.timer = event.reactor.schedule(10, Timeout(self)) + # receiver <--- A <--- B <---- C <--- sender + self.timer = event.reactor.schedule(5, Timeout(self)) self.conn1 = event.container.connect(self.address1) self.conn2 = event.container.connect(self.address2) self.sender = event.container.create_sender(self.conn1, self.dest) @@ -136,10 +144,82 @@ class TargetedSenderTest(MessagingHandler): self.conn2.close() self.timer.cancel() - def run(self): Container(self).run() + + +class AnonymousSenderTest(MessagingHandler): + def __init__(self, address1, address2): + super(AnonymousSenderTest, self).__init__(prefetch=0) + self.address1 = address1 + self.address2 = address2 + self.dest = "closest.Anonymous" + self.error = None + self.sender = None + self.receiver = None + self.n_expected = 10 + self.n_sent = 0 + self.n_received = 0 + self.n_accepted = 0 + + def timeout(self): + self.error = "Timeout Expired %d messages received." % self.n_received + self.conn1.close() + self.conn2.close() + + # The problem with using an anonymous sender in a router + # network is that it takes finite time for endpoint information + # to propagate around the network. It is possible for me to + # start sending before my router knows how to route my messages, + # which will cause them to get released, and my test will hang, + # doomed to wait eternally for the tenth message to be received. + # To fix this, we will detect released messages here, and decrement + # the sent message counter, forcing a resend for each drop. + # And also pause for a moment, since we know that the network is + # not yet ready. + def on_released(self, event): + self.n_sent -= 1 + time.sleep(0.1) + + def on_link_opened(self, event): + if event.receiver: + # This sender has no destination addr, so we will have to + # address each message individually. + # Also -- Create the sender here, when we know that the + # receiver link has opened, because then we are at least + # close to being able to send. (See comment above.) + self.sender = event.container.create_sender(self.conn1, None) + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.conn1 = event.container.connect(self.address1) + self.conn2 = event.container.connect(self.address2) + self.receiver = event.container.create_receiver(self.conn2, self.dest) + self.receiver.flow(self.n_expected) + + def on_sendable(self, event): + if self.n_sent < self.n_expected: + # Add the destination addr to each message. + msg = Message(body=self.n_sent, address=self.dest) + event.sender.send(msg) + self.n_sent += 1 + + def on_accepted(self, event): + self.n_accepted += 1 + + def on_message(self, event): + self.n_received += 1 + if self.n_received == self.n_expected: + self.receiver.close() + self.conn1.close() + self.conn2.close() + self.timer.cancel() + def run(self): + Container(self).run() + + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
