Repository: qpid-dispatch
Updated Branches:
  refs/heads/0.6.x b765c1b84 -> db4a04cfe


DISPATCH-341 - From Ganesh Murthy - Added some drain tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/28a40255
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/28a40255
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/28a40255

Branch: refs/heads/0.6.x
Commit: 28a40255c7c10fec71d8742d410b8eff67168b99
Parents: b765c1b
Author: Ted Ross <[email protected]>
Authored: Fri Jun 3 10:15:10 2016 -0400
Committer: Ted Ross <[email protected]>
Committed: Mon Jun 13 17:22:01 2016 -0400

----------------------------------------------------------------------
 tests/CMakeLists.txt                |   1 +
 tests/system_tests_drain.py         |  55 +++++++++++++++++
 tests/system_tests_drain_support.py | 101 +++++++++++++++++++++++++++++++
 tests/system_tests_link_routes.py   |  15 ++++-
 4 files changed, 170 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index eef7900..1c1be1c 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -72,6 +72,7 @@ add_test(router_policy_test    ${TEST_WRAP} -m unittest -v router_policy_test)
 foreach(py_test_module
 #   system_tests_broker
     system_tests_link_routes
+    system_tests_drain
     system_tests_management
     system_tests_one_router
     system_tests_policy

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/system_tests_drain.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain.py b/tests/system_tests_drain.py
new file mode 100644
index 0000000..ba503a1
--- /dev/null
+++ b/tests/system_tests_drain.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+from system_test import TestCase, Qdrouterd, main_module
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+
+class DrainSupportTest(TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router and a messenger"""
+        super(DrainSupportTest, cls).setUpClass()
+        name = "test-router"
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR'}),
+
+            # Setting the linkCapacity to 10 will allow the sender to send a burst of 10 messages
+            ('listener', {'port': cls.tester.get_port(), 'linkCapacity': 10}),
+
+        ])
+
+        cls.router = cls.tester.qdrouterd(name, config)
+        cls.router.wait_ready()
+        cls.address = cls.router.addresses[0]
+
+    def test_drain_support_all_messages(self):
+        drain_support = DrainMessagesHandler(self.address)
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
+    def test_drain_support_one_message(self):
+        drain_support = DrainOneMessageHandler(self.address)
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
+if __name__ == '__main__':
+    unittest.main(main_module())
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/system_tests_drain_support.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_drain_support.py b/tests/system_tests_drain_support.py
new file mode 100644
index 0000000..6192a7c
--- /dev/null
+++ b/tests/system_tests_drain_support.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from proton import Message
+
+class DrainMessagesHandler(MessagingHandler):
+    def __init__(self, address):
+        # prefetch is set to zero so that proton does not automatically issue 10 credits.
+        super(DrainMessagesHandler, self).__init__(prefetch=0)
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.sent_count = 0
+        self.received_count = 0
+        self.address = address
+        self.drain_successful = False
+
+    def on_start(self, event):
+        self.conn = event.container.connect(self.address)
+
+        # Create a sender and a receiver. They are both listening on the same address
+        self.receiver = event.container.create_receiver(self.conn, "org.apache.dev")
+        self.sender = event.container.create_sender(self.conn, "org.apache.dev")
+        self.receiver.flow(1)
+
+    def on_sendable(self, event):
+        if self.sent_count < 10:
+            msg = Message(body="Hello World")
+            dlv = event.sender.send(msg)
+            dlv.settle()
+            self.sent_count += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            if "Hello World" == event.message.body:
+                self.received_count += 1
+
+            if self.received_count < 4:
+                event.receiver.flow(1)
+            elif self.received_count == 4:
+                # We are issuing a drain of 20. This means that we will receive all the 10 messages
+                # that the sender is sending. The router will also send back a response flow frame with
+                # drain=True but I don't have any way of making sure that the response frame reached the
+                # receiver
+                event.receiver.drain(20)
+
+            # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+            # messages. That along with 10 messages received indicates that the drain worked and we can
+            # declare that the test is successful
+            if self.received_count == 10 and event.link.credit == 0:
+                self.drain_successful = True
+                self.receiver.close()
+                self.sender.close()
+                self.conn.close()
+
+    def run(self):
+        Container(self).run()
+
+class DrainOneMessageHandler(DrainMessagesHandler):
+    def __init__(self, address):
+        super(DrainOneMessageHandler, self).__init__(address)
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            if "Hello World" == event.message.body:
+                self.received_count += 1
+
+            if self.received_count < 4:
+                event.receiver.flow(1)
+            elif self.received_count == 4:
+                # We are issuing a drain of 1 after we receive the 4th message.
+                # This means that going forward, we will receive only one more message.
+                event.receiver.drain(1)
+
+            # The fact that the event.link.credit is 0 means that the receiver will not be receiving any more
+            # messages. That along with 5 messages received (4 earlier messages and 1 extra message for drain=1)
+            # indicates that the drain worked and we can declare that the test is successful
+            if self.received_count == 5 and event.link.credit == 0:
+                self.drain_successful = True
+                self.receiver.close()
+                self.sender.close()
+                self.conn.close()
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/28a40255/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 8f2ee8d..37317f9 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -28,6 +28,8 @@ from proton.handlers import MessagingHandler
 from proton.reactor import AtMostOnce, Container
 from proton.utils import BlockingConnection, LinkDetached
 
+from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler
+
 from qpid_dispatch.management.client import Node
 
 class LinkRoutePatternTest(TestCase):
@@ -443,6 +445,16 @@ class LinkRoutePatternTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_www_drain_support_all_messages(self):
+        drain_support = DrainMessagesHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
+    def test_www_drain_support_one_message(self):
+        drain_support = DrainOneMessageHandler(self.routers[2].addresses[1])
+        drain_support.run()
+        self.assertTrue(drain_support.drain_successful)
+
 
 class DeliveryTagsTest(MessagingHandler):
     def __init__(self, sender_address, listening_address, qdstat_address):
@@ -565,7 +577,6 @@ class CloseWithUnsettledTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
-
-
 if __name__ == '__main__':
     unittest.main(main_module())
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to