Repository: qpid-dispatch Updated Branches: refs/heads/0.6.x b765c1b84 -> db4a04cfe
DISPATCH-341 - From Ganesh Murthy - Added some drain tests Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/28a40255 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/28a40255 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/28a40255 Branch: refs/heads/0.6.x Commit: 28a40255c7c10fec71d8742d410b8eff67168b99 Parents: b765c1b Author: Ted Ross <[email protected]> Authored: Fri Jun 3 10:15:10 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Mon Jun 13 17:22:01 2016 -0400 ---------------------------------------------------------------------- tests/CMakeLists.txt | 1 + tests/system_tests_drain.py | 55 +++++++++++++++++ tests/system_tests_drain_support.py | 101 +++++++++++++++++++++++++++++++ tests/system_tests_link_routes.py | 15 ++++- 4 files changed, 170 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eef7900..1c1be1c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -72,6 +72,7 @@ add_test(router_policy_test ${TEST_WRAP} -m unittest -v router_policy_test) foreach(py_test_module # system_tests_broker system_tests_link_routes + system_tests_drain system_tests_management system_tests_one_router system_tests_policy http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/system_tests_drain.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py new file mode 100644 index 0000000..ba503a1 --- /dev/null +++ b/tests/system_tests_drain.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest + +from system_test import TestCase, Qdrouterd, main_module +from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler + +class DrainSupportTest(TestCase): + + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(DrainSupportTest, cls).setUpClass() + name = "test-router" + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + + # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages + ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}), + + ]) + + cls.router = cls.tester.qdrouterd(name, config) + cls.router.wait_ready() + cls.address = cls.router.addresses[0] + + def test_drain_support_all_messages(self): + drain_support = DrainMessagesHandler(self.address) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + + def test_drain_support_one_message(self): + drain_support = DrainOneMessageHandler(self.address) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + +if __name__ == '__main__': + unittest.main(main_module()) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/system_tests_drain_support.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py new file mode 100644 index 0000000..6192a7c --- /dev/null +++ b/tests/system_tests_drain_support.py @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from proton.handlers import MessagingHandler +from proton.reactor import Container +from proton import Message + +class DrainMessagesHandler(MessagingHandler): + def __init__(self, address): + # prefetch is set to zero so that proton does not automatically issue 10 credits. + super(DrainMessagesHandler, self).__init__(prefetch=0) + self.conn = None + self.sender = None + self.receiver = None + self.sent_count = 0 + self.received_count = 0 + self.address = address + self.drain_successful = False + + def on_start(self, event): + self.conn = event.container.connect(self.address) + + # Create a sender and a receiver. They are both listening on the same address + self.receiver = event.container.create_receiver(self.conn, "org.apache.dev") + self.sender = event.container.create_sender(self.conn, "org.apache.dev") + self.receiver.flow(1) + + def on_sendable(self, event): + if self.sent_count < 10: + msg = Message(body="Hello World") + dlv = event.sender.send(msg) + dlv.settle() + self.sent_count += 1 + + def on_message(self, event): + if event.receiver == self.receiver: + if "Hello World" == event.message.body: + self.received_count += 1 + + if self.received_count < 4: + event.receiver.flow(1) + elif self.received_count == 4: + # We are issuing a drain of 20. This means that we will receive all the 10 messages + # that the sender is sending. The router will also send back a response flow frame with + # drain=True but I don't have any way of making sure that the response frame reached the + # receiver + event.receiver.drain(20) + + # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more + # messages. That along with 10 messages received indicates that the drain worked and we can + # declare that the test is successful + if self.received_count == 10 and event.link.credit == 0: + self.drain_successful = True + self.receiver.close() + self.sender.close() + self.conn.close() + + def run(self): + Container(self).run() + +class DrainOneMessageHandler(DrainMessagesHandler): + def __init__(self, address): + super(DrainOneMessageHandler, self).__init__(address) + + def on_message(self, event): + if event.receiver == self.receiver: + if "Hello World" == event.message.body: + self.received_count += 1 + + if self.received_count < 4: + event.receiver.flow(1) + elif self.received_count == 4: + # We are issuing a drain of 1 after we receive the 4th message. + # This means that going forward, we will receive only one more message. + event.receiver.drain(1) + + # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more + # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1) + # indicates that the drain worked and we can declare that the test is successful + if self.received_count == 5 and event.link.credit == 0: + self.drain_successful = True + self.receiver.close() + self.sender.close() + self.conn.close() + http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 8f2ee8d..37317f9 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -28,6 +28,8 @@ from proton.handlers import MessagingHandler from proton.reactor import AtMostOnce, Container from proton.utils import BlockingConnection, LinkDetached +from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler + from qpid_dispatch.management.client import Node class LinkRoutePatternTest(TestCase): @@ -443,6 +445,16 @@ class LinkRoutePatternTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_www_drain_support_all_messages(self): + drain_support = DrainMessagesHandler(self.routers[2].addresses[1]) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + + def test_www_drain_support_one_message(self): + drain_support = DrainOneMessageHandler(self.routers[2].addresses[1]) + drain_support.run() + self.assertTrue(drain_support.drain_successful) + class DeliveryTagsTest(MessagingHandler): def __init__(self, sender_address, listening_address, qdstat_address): @@ -565,7 +577,6 @@ class CloseWithUnsettledTest(MessagingHandler): def run(self): Container(self).run() - - if __name__ == '__main__': unittest.main(main_module()) + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
