DISPATCH-341 - Added two tests for message-routed drain. Added timeouts for tests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/789b73e2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/789b73e2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/789b73e2 Branch: refs/heads/0.6.x Commit: 789b73e2fdf72911182f194c60a8a89b05dd94ea Parents: 28a4025 Author: Ted Ross <[email protected]> Authored: Fri Jun 3 10:52:24 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon Jun 13 17:22:06 2016 -0400 ---------------------------------------------------------------------- tests/system_tests_drain.py | 17 ++++- tests/system_tests_drain_support.py | 105 ++++++++++++++++++++++++++++++- 2 files changed, 116 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/789b73e2/tests/system_tests_drain.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py index ba503a1..9747995 100644 --- a/tests/system_tests_drain.py +++ b/tests/system_tests_drain.py @@ -21,6 +21,7 @@ import unittest from system_test import TestCase, Qdrouterd, main_module from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler +from system_tests_drain_support import DrainNoMessagesHandler, DrainNoMoreMessagesHandler class DrainSupportTest(TestCase): @@ -44,12 +45,22 @@ class DrainSupportTest(TestCase): def test_drain_support_all_messages(self): drain_support = DrainMessagesHandler(self.address) drain_support.run() - self.assertTrue(drain_support.drain_successful) + self.assertEqual(drain_support.error, None) def test_drain_support_one_message(self): drain_support = DrainOneMessageHandler(self.address) drain_support.run() - self.assertTrue(drain_support.drain_successful) + self.assertEqual(drain_support.error, None) + + def test_drain_support_no_messages(self): + drain_support = DrainNoMessagesHandler(self.address) + drain_support.run() + self.assertEqual(drain_support.error, None) + + def test_drain_support_no_more_messages(self): + drain_support = DrainNoMoreMessagesHandler(self.address) + drain_support.run() + self.assertEqual(drain_support.error, None) if __name__ == '__main__': - unittest.main(main_module()) \ No newline at end of file + unittest.main(main_module()) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/789b73e2/tests/system_tests_drain_support.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py index 6192a7c..f11b8b8 100644 --- a/tests/system_tests_drain_support.py +++ b/tests/system_tests_drain_support.py @@ -21,6 +21,14 @@ from proton.handlers import MessagingHandler from proton.reactor import Container from proton import Message +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + class DrainMessagesHandler(MessagingHandler): def __init__(self, address): # prefetch is set to zero so that proton does not automatically issue 10 credits. @@ -31,9 +39,14 @@ class DrainMessagesHandler(MessagingHandler): self.sent_count = 0 self.received_count = 0 self.address = address - self.drain_successful = False + self.error = "Unexpected Exit" + + def timeout(self): + self.error = "Timeout Expired" + self.conn.close() def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) self.conn = event.container.connect(self.address) # Create a sender and a receiver. They are both listening on the same address @@ -66,7 +79,8 @@ class DrainMessagesHandler(MessagingHandler): # messages. That along with 10 messages received indicates that the drain worked and we can # declare that the test is successful if self.received_count == 10 and event.link.credit == 0: - self.drain_successful = True + self.error = None + self.timer.cancel() self.receiver.close() self.sender.close() self.conn.close() @@ -74,6 +88,7 @@ class DrainMessagesHandler(MessagingHandler): def run(self): Container(self).run() + class DrainOneMessageHandler(DrainMessagesHandler): def __init__(self, address): super(DrainOneMessageHandler, self).__init__(address) @@ -94,8 +109,92 @@ class DrainOneMessageHandler(DrainMessagesHandler): # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1) # indicates that the drain worked and we can declare that the test is successful if self.received_count == 5 and event.link.credit == 0: - self.drain_successful = True + self.error = None + self.timer.cancel() self.receiver.close() self.sender.close() self.conn.close() + +class DrainNoMessagesHandler(MessagingHandler): + def __init__(self, address): + # prefetch is set to zero so that proton does not automatically issue 10 credits. + super(DrainNoMessagesHandler, self).__init__(prefetch=0) + self.conn = None + self.sender = None + self.receiver = None + self.address = address + self.error = "Unexpected Exit" + + def timeout(self): + self.error = "Timeout Expired" + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.conn = event.container.connect(self.address) + + # Create a sender and a receiver. They are both listening on the same address + self.receiver = event.container.create_receiver(self.conn, "org.apache.dev") + self.sender = event.container.create_sender(self.conn, "org.apache.dev") + self.receiver.flow(1) + + def on_sendable(self, event): + self.receiver.drain(1) + + def on_drained(self, event): + if sender.credit == 0: + self.error = None + self.timer.cancel() + self.conn.close() + + def run(self): + Container(self).run() + + +class DrainNoMoreMessagesHandler(MessagingHandler): + def __init__(self, address): + # prefetch is set to zero so that proton does not automatically issue 10 credits. + super(DrainNoMoreMessagesHandler, self).__init__(prefetch=0) + self.conn = None + self.sender = None + self.receiver = None + self.address = address + self.sent = 0 + self.rcvd = 0 + self.error = "Unexpected Exit" + + def timeout(self): + self.error = "Timeout Expired: sent=%d rcvd=%d" % (self.sent, self.rcvd) + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(5, Timeout(self)) + self.conn = event.container.connect(self.address) + + # Create a sender and a receiver. They are both listening on the same address + self.receiver = event.container.create_receiver(self.conn, "org.apache.dev") + self.sender = event.container.create_sender(self.conn, "org.apache.dev") + self.receiver.flow(1) + + def on_sendable(self, event): + if self.sent == 0: + msg = Message(body="Hello World") + event.sender.send(msg) + self.sent += 1 + + def on_message(self, event): + self.rcvd += 1 + + def on_settled(self, event): + self.receiver.drain(1) + + def on_drained(self, event): + if sender.credit == 0: + self.error = None + self.timer.cancel() + self.conn.close() + + def run(self): + Container(self).run() + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
