Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 1ca80b6e2 -> c9262728d


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index cebc74a..e3a8391 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -31,14 +31,14 @@ except ImportError:
     from proton import PN_STATUS_MODIFIED as MODIFIED
 
 
-class RouterTest(TestCase):
+class TwoRouterTest(TestCase):
 
     inter_router_port = None
 
     @classmethod
     def setUpClass(cls):
         """Start a router and a messenger"""
-        super(RouterTest, cls).setUpClass()
+        super(TwoRouterTest, cls).setUpClass()
 
         def router(name, client_server, connection):
 
@@ -1067,13 +1067,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
-
     def test_15_attach_on_inter_router(self):
         test = AttachOnInterRouterTest(self.routers[0].addresses[5])
         test.run()
         self.assertEqual(None, test.error)
 
-
     def test_16_delivery_annotations(self):
         addr = "amqp:/delivery_annotations.1"
         M1 = self.messenger()
@@ -1167,6 +1165,11 @@ class RouterTest(TestCase):
         for M in receivers:
             M.stop()
 
+    def test_17_large_streaming_test(self):
+        test = LargeMessageStreamTest(self.routers[0].addresses[0], self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
 
 class Timeout(object):
     def __init__(self, parent):
@@ -1175,6 +1178,56 @@ class Timeout(object):
     def on_timer_task(self, event):
         self.parent.timeout()
 
+class LargeMessageStreamTest(MessagingHandler):
+    def __init__(self, address1, address2):
+        super(LargeMessageStreamTest, self).__init__()
+        self.address1 = address1
+        self.address2 = address2
+        self.dest = "LargeMessageStreamTest"
+        self.error = None
+        self.conn1 = None
+        self.conn2 = None
+        self.count = 10
+        self.n_sent = 0
+        self.timer = None
+        self.sender = None
+        self.receiver = None
+        self.n_received = 0
+        self.body = ""
+        for i in range(10000):
+            self.body += "0123456789101112131415"
+
+    def check_if_done(self):
+        if self.n_received == self.count:
+            self.timer.cancel()
+            self.conn1.close()
+            self.conn2.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
+        self.conn1.close()
+        self.conn2.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn1 = event.container.connect(self.address1)
+        self.conn2 = event.container.connect(self.address2)
+        self.sender = event.container.create_sender(self.conn1, self.dest)
+        self.receiver = event.container.create_receiver(self.conn2, self.dest)
+
+    def on_sendable(self, event):
+        if self.n_sent < self.count:
+            msg = Message(body=self.body)
+            # send(msg) calls the stream function which streams data from sender to the router
+            event.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        self.n_received += 1
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
 
 class ExcessDeliveriesReleasedTest(MessagingHandler):
     def __init__(self, address1, address2):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to