This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 3419ad4  DISPATCH-2275: fixup system tests broken by proton log output changes
3419ad4 is described below

commit 3419ad4e2c6f5f290ae8f8cab4f1387f89d10bd8
Author: Kenneth Giusti <[email protected]>
AuthorDate: Fri Nov 5 13:27:30 2021 -0400

    DISPATCH-2275: fixup system tests broken by proton log output changes
    
    - skip test_04_scraper_tool (DISPATCH-2276)
    - modify system_tests_protocol_settings to use proton api
    - fix system_tests_link_routes to use proton codec
    
    This closes #1430
---
 tests/system_tests_link_routes.py          | 350 +++++++++++++++++++++--------
 tests/system_tests_protocol_settings.py    | 313 +++++++++-----------------
 tests/system_tests_topology_disposition.py |   2 +
 3 files changed, 363 insertions(+), 302 deletions(-)

diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index f2a7d5a..2672ca5 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -21,13 +21,14 @@ from time import sleep, time
 from threading import Event
 from subprocess import PIPE, STDOUT
 import socket
+from typing import Optional
 
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process, TestTimeout, \
     AsyncTestSender, AsyncTestReceiver, MgmtMsgProxy, unittest, QdManager
 from test_broker import FakeBroker
 from test_broker import FakeService
 
-from proton import Delivery, symbol
+from proton import Delivery, symbol, Data, Described
 from proton import Message, Condition
 from proton.handlers import MessagingHandler
 from proton.reactor import AtMostOnce, Container, DynamicNodeProperties, LinkOption, AtLeastOnce
@@ -1760,6 +1761,23 @@ class LinkRouteDrainTest(TestCase):
 
 
 class EmptyTransferTest(TestCase):
+    """Verify empty transfer frames (no body) do not crash the router.  See
+    DISPATCH-1988.
+    """
+
+    # various identifiers defined by AMQP 1.0
+    OPEN_DESCRIPTOR = 0x10
+    BEGIN_DESCRIPTOR = 0x11
+    ATTACH_DESCRIPTOR = 0x12
+    FLOW_DESCRIPTOR = 0x13
+    TRANSFER_DESCRIPTOR = 0x14
+    DISPO_DESCRIPTOR = 0x15
+    ACCEPTED_OUTCOME = 0x24
+    REJECTED_OUTCOME = 0x25
+    TARGET_DESCRIPTOR = 0x29
+    MA_SECTION_DESCRIPTOR = 0x73
+    BODY_SECTION_DESCRIPTOR = 0x77
+
     @classmethod
     def setUpClass(cls):
         super(EmptyTransferTest, cls).setUpClass()
@@ -1769,7 +1787,6 @@ class EmptyTransferTest(TestCase):
             ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
             # the client will connect to this listener
             ('listener', {'role': 'normal',
-                          'host': '0.0.0.0',
                           'port': cls.ROUTER_LISTEN_PORT,
                           'saslMechanisms': 'ANONYMOUS'}),
             # to connect to the fake broker
@@ -1797,110 +1814,259 @@ class EmptyTransferTest(TestCase):
         self.router.wait_connectors()
         return fake_broker
 
+    def _find_frame(self, data: bytes, code: int) -> Optional[list]:
+        """Scan a byte sequence for performatives that match code.
+        Return the frame body (list) if match else None
+        """
+        while data:
+            # starts at frame header (8 bytes)
+            frame_len = int.from_bytes(data[:4], "big")
+            if frame_len == 0 or frame_len > len(data):
+                return None
+            desc = Data()
+            desc.decode(data[8:frame_len])  # skip frame header
+            data = data[frame_len:]    # advance to next frame
+            desc.rewind()
+            if desc.next() is None:
+                return None
+            if not desc.is_described():
+                return None
+            py_desc = desc.get_py_described()
+            if py_desc.descriptor == code:
+                return py_desc.value
+        return None
+
+    def _send_frame(self, frame: Data, sock: socket.socket):
+        """Encode and send frame over sock
+        """
+        frame.rewind()
+        fbytes = frame.encode()
+        flen = len(fbytes) + 8
+        # AMQP FRAME HEADER: 4 byte length, DOFF, TYPE, CHANNEL
+        sock.sendall(flen.to_bytes(4, "big"))
+        sock.sendall(bytes([2, 0, 0, 0]))
+        sock.sendall(fbytes)
+
+    def _construct_transfer(self, delivery_id, tag, more=False, add_ma=False,
+                            add_body=False) -> Data:
+        """Construct a Transfer frame in a proton Data object
+        """
+        t1_frame = Data()
+        t1_frame.put_described()
+        t1_frame.enter()
+        t1_frame.put_ulong(self.TRANSFER_DESCRIPTOR)
+        t1_frame.put_list()
+        t1_frame.enter()
+        t1_frame.put_uint(0)  # handle
+        t1_frame.put_uint(delivery_id)
+        t1_frame.put_binary(tag)
+        t1_frame.put_uint(0)           # msg format
+        t1_frame.put_bool(False)       # settled
+        t1_frame.put_bool(more)
+        t1_frame.exit()   # transfer list
+        t1_frame.exit()   # transfer described type
+        if add_ma:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.MA_SECTION_DESCRIPTOR)
+            t1_frame.put_list()
+            t1_frame.enter()
+            t1_frame.put_ulong(9)
+            t1_frame.exit()  # list
+            t1_frame.exit()  # described
+        if add_body:
+            t1_frame.put_described()
+            t1_frame.enter()
+            t1_frame.put_ulong(self.BODY_SECTION_DESCRIPTOR)
+            t1_frame.put_string("I'm a small body!")
+            t1_frame.exit()
+            t1_frame.exit()
+
+        return t1_frame
+
+    def _get_outcome(self, dispo_frame: list) -> Optional[int]:
+        """Extract the outcome from a raw disposition frame"""
+        outcome = None
+        if len(dispo_frame) >= 5:  # list[5] == state
+            if isinstance(dispo_frame[4], Described):
+                outcome = dispo_frame[4].descriptor
+        return outcome
+
+    def _read_socket(self, sock: socket.socket,
+                     timeout: float = 1.0) -> bytes:
+        """Read all available data from the socket, waiting up to `timeout`
+        seconds for data to arrive
+        """
+        old_timeout = sock.gettimeout()
+        sock.settimeout(timeout)
+        data = b''
+        while True:
+            try:
+                incoming = sock.recv(4096)
+                if not incoming:
+                    break
+                data += incoming
+            except OSError:  # timeout
+                break
+        sock.settimeout(old_timeout)
+        return data
+
     def test_DISPATCH_1988(self):
         fake_broker = self._fake_broker(FakeBroker)
-        AMQP_OPEN_BEGIN_ATTACH = bytearray(
-            b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00'
-            b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06'
-            b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21'
-            b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00'
-            b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00'
-            b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b'
-            b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72'
-            b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b'
-            b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0'
-            b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70'
-            b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00')
 
+        self.router.wait_ready()
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.settimeout(TIMEOUT)
         # Connect to the router listening port and send an amqp, open,
         # begin, attach. The attach is sent on the link
         # routed address, "examples"
-        s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT))
-        s.sendall(AMQP_OPEN_BEGIN_ATTACH)
-
-        # Give a second for the attach to propagate to the broker and
-        # for the broker to send a response attach
-        sleep(1)
-        data = s.recv(2048)
-        self.assertIn("examples", repr(data))
-
-        # First send a message on link routed address "examples" with
-        # message body of "message 0"
-        # Verify the the sent message has been accepted.
-        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
-                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01'
-                               + b'\xa0\x01\x01\x43\x42'
-                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
-                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
-                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
-                               + b'\x65\x20\x30')
-        s.sendall(TRANSFER_1)
-        sleep(0.5)
-        data = s.recv(1024)
-        # The delivery has been accepted.
-        self.assertIn("x00S$E", repr(data))
+        s.connect(("127.0.0.1", EmptyTransferTest.ROUTER_LISTEN_PORT))
+
+        # send 'AMQP 1 0' preamble
+        s.sendall(b'\x41\x4d\x51\x50\x00\x01\x00\x00')
+
+        # send Open/Begin/Attach
+        open_frame = Data()
+        open_frame.put_described()
+        open_frame.enter()
+        open_frame.put_ulong(self.OPEN_DESCRIPTOR)
+        open_frame.put_list()
+        open_frame.enter()
+        open_frame.put_string("TestContainer")
+        open_frame.exit()
+        open_frame.exit()
+        open_frame.rewind()
+
+        begin_frame = Data()
+        begin_frame.put_described()
+        begin_frame.enter()
+        begin_frame.put_ulong(self.BEGIN_DESCRIPTOR)
+        begin_frame.put_list()
+        begin_frame.enter()
+        begin_frame.put_null()
+        begin_frame.put_uint(0)   # next out id
+        begin_frame.put_uint(0xfffff)  # in/out window
+        begin_frame.put_uint(0xfffff)
+        begin_frame.exit()
+        begin_frame.exit()
+        begin_frame.rewind()
+
+        attach_frame = Data()
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.ATTACH_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("test-link-name")
+        attach_frame.put_uint(0)      # handle
+        attach_frame.put_bool(False)  # sender
+        attach_frame.put_null()
+        attach_frame.put_null()
+        attach_frame.put_null()
+        # target:
+        attach_frame.put_described()
+        attach_frame.enter()
+        attach_frame.put_ulong(self.TARGET_DESCRIPTOR)
+        attach_frame.put_list()
+        attach_frame.enter()
+        attach_frame.put_string("examples/foo")
+        attach_frame.exit()  # target list
+        attach_frame.exit()  # target descriptor
+        attach_frame.exit()    # attach list
+        attach_frame.exit()    # attach descriptor
+        attach_frame.rewind()
+
+        for frame in [open_frame, begin_frame, attach_frame]:
+            self._send_frame(frame, s)
+
+        # Give time for the attach to propagate to the broker and
+        # for the broker to send a response attach and flow:
+        data = self._read_socket(s, timeout=2.0)
+        self.assertTrue(len(data) > 8)
+        self.assertEqual(data[:8], b'AMQP\x00\x01\x00\x00')
+        # expect that the connection was accepted: check for a flow frame:
+        flow_frame = self._find_frame(data[8:], self.FLOW_DESCRIPTOR)
+        self.assertIsNotNone(flow_frame, "no flow frame received: %s" % data)
+
+        # First send a message on link routed address "examples" with a small
+        # message body. Verify that the sent message has been accepted.
+        t1_frame = self._construct_transfer(0, b'\x01', add_ma=True, add_body=True)
+        self._send_frame(t1_frame, s)
+
+        # We expect to get a disposition frame that accepted the message
+        data = self._read_socket(s)
+        self.assertTrue(len(data) > 0)
+        dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+        self.assertIsNotNone(dispo_frame,
+                             "expected a disposition (none arrived!): %s"
+                             % data)
+
+        outcome = self._get_outcome(dispo_frame)
+        self.assertEqual(self.ACCEPTED_OUTCOME, outcome,
+                         "Transfer not accepted (unexpected!) actual=%s"
+                         % outcome)
 
         # Test case 1
-        # Send an empty transfer frame to the router and you should
-        # receive a rejected disposition from the router.
-        # Without the fix for DISPATCH_1988,
-        # upon sending this EMPTY_TRANSFER
-        # the router crashes with the following assert
+        #
+        # Send an empty transfer frame to the router and you should receive a
+        # rejected disposition from the router.  Without the fix for
+        # DISPATCH_1988, upon sending this EMPTY_TRANSFER the router crashes
+        # with the following assert
+        #
         # qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion `content->pending && qd_buffer_size(content->pending) > 0' failed.
-        # This is the empty transfer frame that is sent to the router.
-        # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false, batchable=false]
-        EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00'
-                                   + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52'
-                                   + b'\x02\xa0\x01\x02\x43\x42'
-                                   + b'\x42\x40\x40\x40\x40\x42')
-        s.sendall(EMPTY_TRANSFER)
-        sleep(1)
-        data = s.recv(1024)
-        # The delivery has been rejected.
-        self.assertIn("x00S%E", repr(data))
-
-        # Let's send another transfer to make sure that the
-        # router has not crashed.
-        TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
-                               + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03'
-                               + b'\xa0\x01\x03\x43\x42'
-                               + b'\x40\x40\x40\x40\x40\x42\x00\x53'
-                               + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
-                               + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
-                               + b'\x65\x20\x30')
-        s.sendall(TRANSFER_1)
-        sleep(0.5)
-        data = s.recv(1024)
-        # The delivery has been accepted.
-        self.assertIn("x00S$E", repr(data))
+
+        t2_frame = self._construct_transfer(1, b'\x02')
+        self._send_frame(t2_frame, s)
+
+        data = self._read_socket(s)
+        self.assertTrue(len(data) > 0)
+        dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+        self.assertIsNotNone(dispo_frame,
+                             "expected a disposition (none arrived!): %s"
+                             % data)
+        outcome = self._get_outcome(dispo_frame)
+        self.assertEqual(self.REJECTED_OUTCOME, outcome,
+                         "Transfer not rejected (unexpected!) actual=%s"
+                         % outcome)
 
         # Test case 2
-        # Now, send two empty transfer frames, first transfer has
-        # more=true and the next transfer has more=false.
-        # This will again be rejected by the router.
-        # The following are the two transfer frames that will be
-        # sent to the router.
-        #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = true, batchable = false]
-        #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0, delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled = false, more = false, batchable = false]
-        EMPTY_TRANSFER_MORE_TRUE = bytearray(
-            b'\x00\x00\x00\x1c\x02\x00\x00\x00'
-            + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
-            + b'\xa0\x01\x04\x43\x42'
-            + b'\x41\x40\x40\x40\x40\x42')
-        EMPTY_TRANSFER_MORE_FALSE = bytearray(
-            b'\x00\x00\x00\x1c\x02\x00\x00\x00'
-            + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
-            + b'\xa0\x01\x04\x43\x42'
-            + b'\x42\x40\x40\x40\x40\x42')
-        s.sendall(EMPTY_TRANSFER_MORE_TRUE)
-        s.sendall(EMPTY_TRANSFER_MORE_FALSE)
-        sleep(0.5)
-        data = s.recv(1024)
-        # The delivery has been rejected.
-        self.assertIn("x00S%E", repr(data))
+        # Now, send two empty transfer frames, first transfer has more=true and
+        # the next transfer has more=false.  This will again be rejected by the
+        # router.
+
+        t3_frame = self._construct_transfer(2, b'\x03', more=True)
+        self._send_frame(t3_frame, s)
+        t4_frame = self._construct_transfer(2, b'\x03')
+        self._send_frame(t4_frame, s)
+
+        data = self._read_socket(s)
+        self.assertTrue(len(data) > 0)
+        dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+        self.assertIsNotNone(dispo_frame,
+                             "expected a disposition (none arrived!): %s"
+                             % data)
+        outcome = self._get_outcome(dispo_frame)
+        self.assertEqual(self.REJECTED_OUTCOME, outcome,
+                         "Transfer not rejected (unexpected!) actual: %s"
+                         % outcome)
+
+        # Now send a good transfer and ensure the router accepts it
+        t5_frame = self._construct_transfer(3, b'\x04', add_ma=True, add_body=True)
+        self._send_frame(t5_frame, s)
+
+        data = self._read_socket(s)
+        self.assertTrue(len(data) > 0)
+        dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+        self.assertIsNotNone(dispo_frame,
+                             "expected a disposition (none arrived!): %s"
+                             % data)
+        outcome = self._get_outcome(dispo_frame)
+        self.assertEqual(self.ACCEPTED_OUTCOME, outcome,
+                         "Transfer not accepted (unexpected!) actual: %s"
+                         % outcome)
 
         s.close()
+        fake_broker.join()
 
 
 class ConnectionLinkRouteTest(TestCase):
diff --git a/tests/system_tests_protocol_settings.py b/tests/system_tests_protocol_settings.py
index 70495ce..faaa889 100644
--- a/tests/system_tests_protocol_settings.py
+++ b/tests/system_tests_protocol_settings.py
@@ -17,17 +17,16 @@
 # under the License.
 #
 
-import sys
 from system_test import TestCase, Qdrouterd, main_module
-from system_test import unittest
-from proton.utils import BlockingConnection
+from system_test import unittest, TIMEOUT, TestTimeout
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
 
 
 class MaxFrameMaxSessionFramesTest(TestCase):
     """System tests setting proton negotiated size max-frame-size and incoming-window"""
     @classmethod
     def setUpClass(cls):
-        """Start a router"""
         super(MaxFrameMaxSessionFramesTest, cls).setUpClass()
         name = "MaxFrameMaxSessionFrames"
         config = Qdrouterd.Config([
@@ -39,29 +38,17 @@ class MaxFrameMaxSessionFramesTest(TestCase):
         cls.router.wait_ready()
         cls.address = cls.router.addresses[0]
 
-    def test_max_frame_max_session_frames__max_sessions_default(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # max-frame is from the config
-            self.assertIn(' max-frame-size=2048,', open_lines[0])
-            # channel-max is default
-            self.assertIn(" channel-max=32767", open_lines[0])
-            begin_lines = [s for s in log_lines if "-> @begin" in s]
-            # incoming-window is from the config
-            self.assertIn(" incoming-window=10,", begin_lines[0])
+    def test_max_frame_max_session_frames_max_sessions_default(self):
+        sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+        sniffer.run()
+        self.assertEqual(2048, sniffer.remote_max_frame)
+        self.assertEqual(32767, sniffer.remote_channel_max)
 
 
 class MaxSessionsTest(TestCase):
     """System tests setting proton channel-max"""
     @classmethod
     def setUpClass(cls):
-        """Start a router and a messenger"""
         super(MaxSessionsTest, cls).setUpClass()
         name = "MaxSessions"
         config = Qdrouterd.Config([
@@ -74,22 +61,15 @@ class MaxSessionsTest(TestCase):
         cls.address = cls.router.addresses[0]
 
     def test_max_sessions(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # channel-max is 9
-            self.assertIn(" channel-max=9", open_lines[0])
+        sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+        sniffer.run()
+        self.assertEqual(9, sniffer.remote_channel_max)
 
 
 class MaxSessionsZeroTest(TestCase):
     """System tests setting proton channel-max"""
     @classmethod
     def setUpClass(cls):
-        """Start a router and a messenger"""
         super(MaxSessionsZeroTest, cls).setUpClass()
         name = "MaxSessionsZero"
         config = Qdrouterd.Config([
@@ -102,22 +82,15 @@ class MaxSessionsZeroTest(TestCase):
         cls.address = cls.router.addresses[0]
 
     def test_max_sessions_zero(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # channel-max is 0. Should get proton default 32767
-            self.assertIn(" channel-max=32767", open_lines[0])
+        sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+        sniffer.run()
+        self.assertEqual(32767, sniffer.remote_channel_max)
 
 
 class MaxSessionsLargeTest(TestCase):
     """System tests setting proton channel-max"""
     @classmethod
     def setUpClass(cls):
-        """Start a router and a messenger"""
         super(MaxSessionsLargeTest, cls).setUpClass()
         name = "MaxSessionsLarge"
         config = Qdrouterd.Config([
@@ -130,15 +103,9 @@ class MaxSessionsLargeTest(TestCase):
         cls.address = cls.router.addresses[0]
 
     def test_max_sessions_large(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # channel-max is 0. Should get proton default 32767
-            self.assertIn(" channel-max=32767", open_lines[0])
+        sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+        sniffer.run()
+        self.assertEqual(32767, sniffer.remote_channel_max)
 
 
 class MaxFrameSmallTest(TestCase):
@@ -158,22 +125,15 @@ class MaxFrameSmallTest(TestCase):
         cls.address = cls.router.addresses[0]
 
     def test_max_frame_small(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # if frame size <= 512 proton set min of 512
-            self.assertIn(" max-frame-size=512", open_lines[0])
+        sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+        sniffer.run()
+        self.assertEqual(512, sniffer.remote_max_frame)
 
 
 class MaxFrameDefaultTest(TestCase):
     """System tests setting proton max-frame-size"""
     @classmethod
     def setUpClass(cls):
-        """Start a router and a messenger"""
         super(MaxFrameDefaultTest, cls).setUpClass()
         name = "MaxFrameDefault"
         config = Qdrouterd.Config([
@@ -186,91 +146,9 @@ class MaxFrameDefaultTest(TestCase):
         cls.address = cls.router.addresses[0]
 
     def test_max_frame_default(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # if frame size not set then a default is used
-            self.assertIn(" max-frame-size=16384", open_lines[0])
-
-
-class MaxSessionFramesDefaultTest(TestCase):
-    """System tests setting proton max-frame-size"""
-    @classmethod
-    def setUpClass(cls):
-        """Start a router and a messenger"""
-        super(MaxSessionFramesDefaultTest, cls).setUpClass()
-        name = "MaxSessionFramesDefault"
-        config = Qdrouterd.Config([
-            ('router', {'mode': 'standalone', 'id': 'QDR'}),
-
-            ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port()}),
-        ])
-        cls.router = cls.tester.qdrouterd(name, config)
-        cls.router.wait_ready()
-        cls.address = cls.router.addresses[0]
-
-    def test_max_session_frames_default(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # if frame size not set then a default is used
-            self.assertIn(" max-frame-size=16384", open_lines[0])
-            begin_lines = [s for s in log_lines if "-> @begin" in s]
-            # incoming-window should be 2^31-1 (64-bit) or
-            # (2^31-1) / max-frame-size (32-bit)
-            is_64bits = sys.maxsize > 2 ** 32
-            expected = " incoming-window=2147483647," if is_64bits else \
-                (" incoming-window=%d," % int(2147483647 / 16384))
-            #self.assertIn(expected, begin_lines[0], "Expected:'%s' not found in '%s'" % (expected, begin_lines[0]))
-            self.assertIn(expected, begin_lines[0])
-
-
-class MaxFrameMaxSessionFramesZeroTest(TestCase):
-    """
-    System tests setting proton negotiated size max-frame-size and incoming-window
-    when they are both zero. Frame size is bumped up to the minimum and capacity is
-    bumped up to have an incoming window of 1
-    """
-    @classmethod
-    def setUpClass(cls):
-        """Start a router"""
-        super(MaxFrameMaxSessionFramesZeroTest, cls).setUpClass()
-        name = "MaxFrameMaxSessionFramesZero"
-        config = Qdrouterd.Config([
-            ('router', {'mode': 'standalone', 'id': 'QDR'}),
-
-            ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '0', 'maxSessionFrames': '0'}),
-        ])
-        cls.router = cls.tester.qdrouterd(name, config)
-        cls.router.wait_ready()
-        cls.address = cls.router.addresses[0]
-
-    def test_max_frame_max_session_zero(self):
-        # Set up a connection to get the Open and a receiver to get a Begin frame in the log
-        bc = BlockingConnection(self.router.addresses[0])
-        bc.create_receiver("xxx")
-        bc.close()
-
-        with open(self.router.logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "-> @open" in s]
-            # max-frame gets set to protocol min
-            self.assertIn(' max-frame-size=512,', open_lines[0])
-            begin_lines = [s for s in log_lines if "-> @begin" in s]
-            # incoming-window should be 2^31-1 (64-bit) or
-            # (2^31-1) / max-frame-size (32-bit)
-            is_64bits = sys.maxsize > 2 ** 32
-            expected = " incoming-window=2147483647," if is_64bits else \
-                (" incoming-window=%d," % int(2147483647 / 512))
-            self.assertIn(expected, begin_lines[0])
+        sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+        sniffer.run()
+        self.assertEqual(16384, sniffer.remote_max_frame)
 
 
 class ConnectorSettingsDefaultTest(TestCase):
@@ -279,19 +157,12 @@ class ConnectorSettingsDefaultTest(TestCase):
     is common code. This test makes sure that defaults in the connector
     config make it to the wire.
     """
-    inter_router_port = None
-
-    @staticmethod
-    def ssl_config(client_server, connection):
-        return []  # Over-ridden by RouterTestSsl
-
     @classmethod
     def setUpClass(cls):
-        """Start two routers"""
         super(ConnectorSettingsDefaultTest, cls).setUpClass()
 
         def router(name, client_server, connection):
-            config = cls.ssl_config(client_server, connection) + [
+            config = [
                 ('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
 
                 ('listener', {'port': cls.tester.get_port()}),
@@ -300,55 +171,34 @@ class ConnectorSettingsDefaultTest(TestCase):
 
             config = Qdrouterd.Config(config)
 
-            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
 
         cls.routers = []
 
-        inter_router_port = cls.tester.get_port()
+        cls.connector_port = cls.tester.get_port()
 
         router('A', 'server',
-               ('listener', {'role': 'inter-router', 'port': inter_router_port}))
-        router('B', 'client',
-               ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}))
-
-        cls.routers[0].wait_router_connected('QDR.B')
-        cls.routers[1].wait_router_connected('QDR.A')
+               ('connector', {'name': 'testconnector', 'role': 'route-container', 'port': cls.connector_port}))
 
     def test_connector_default(self):
-        with open(self.routers[0].logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "<- @open" in s]
-            # defaults
-            self.assertIn(' max-frame-size=16384,', open_lines[0])
-            self.assertIn(' channel-max=32767,', open_lines[0])
-            begin_lines = [s for s in log_lines if "<- @begin" in s]
-            # incoming-window should be 2^31-1 (64-bit) or
-            # (2^31-1) / max-frame-size (32-bit)
-            is_64bits = sys.maxsize > 2 ** 32
-            expected = " incoming-window=2147483647," if is_64bits else \
-                (" incoming-window=%d," % int(2147483647 / 16384))
-            self.assertIn(expected, begin_lines[0])
+        sniffer = ConnectorSettingsSniffer(self.routers[0].connector_addresses[0])
+        sniffer.run()
+        self.assertEqual(16384, sniffer.remote_max_frame)
+        self.assertEqual(32767, sniffer.remote_channel_max)
 
 
 class ConnectorSettingsNondefaultTest(TestCase):
     """
     The internal logic for protocol settings in listener and connector
     is common code. This test makes sure that settings in the connector
-    config make it to the wire. The listener tests test the setting logic.
+    config make it to the wire.
     """
-    inter_router_port = None
-
-    @staticmethod
-    def ssl_config(client_server, connection):
-        return []  # Over-ridden by RouterTestSsl
-
     @classmethod
     def setUpClass(cls):
-        """Start two routers"""
         super(ConnectorSettingsNondefaultTest, cls).setUpClass()
 
         def router(name, client_server, connection):
-            config = cls.ssl_config(client_server, connection) + [
+            config = [
                 ('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
 
                 ('listener', {'port': cls.tester.get_port()}),
@@ -357,41 +207,84 @@ class ConnectorSettingsNondefaultTest(TestCase):
 
             config = Qdrouterd.Config(config)
 
-            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
 
         cls.routers = []
 
-        inter_router_port = cls.tester.get_port()
+        cls.connector_port = cls.tester.get_port()
 
         router('A', 'server',
-               ('listener', {'role': 'inter-router', 'port': inter_router_port}))
-        router('B', 'client',
-               ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port,
+               ('connector', {'name': 'testconnector', 'role': 'route-container', 'port': cls.connector_port,
                               'maxFrameSize': '2048', 'maxSessionFrames': '10', 'maxSessions': '20'}))
 
-        cls.routers[0].wait_router_connected('QDR.B')
-        cls.routers[1].wait_router_connected('QDR.A')
-
     def test_connector_default(self):
-        with open(self.routers[0].logfile_path, 'r') as router_log:
-            log_lines = router_log.read().split("\n")
-            open_lines = [s for s in log_lines if "<- @open" in s]
-            # nondefaults
-            self.assertIn(' max-frame-size=2048,', open_lines[0])
-            self.assertIn(' channel-max=19,', open_lines[0])
-            begin_lines = [s for s in log_lines if "<- @begin" in s]
-            # nondefaults
-            SEARCH_STRING = " incoming-window=10,"
-            # Sometimes, the begin frame we are looking for might not be
-            # the first in the list. We just need to make sure that
-            # it is somewhere in the list
-            line_found = False
-            for begin_line in begin_lines:
-                if SEARCH_STRING in begin_line:
-                    line_found = True
-                    break
-
-            self.assertTrue(line_found)
+        sniffer = ConnectorSettingsSniffer(self.routers[0].connector_addresses[0])
+        sniffer.run()
+        self.assertEqual(2048, sniffer.remote_max_frame)
+        self.assertEqual(19, sniffer.remote_channel_max)
+
+
+class ProtocolSettingsSniffer(MessagingHandler):
+    """Create a Connection/Session/Link and capture the various protocol
+    settings sent by the peer.
+    """
+    def __init__(self, router_addr, source_addr="xxx", **kwargs):
+        super(ProtocolSettingsSniffer, self).__init__(**kwargs)
+        self.router_addr = router_addr
+        self.source_addr = source_addr
+        self.conn = None
+        self.remote_max_frame = None
+        self.remote_channel_max = None
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
+        self.conn = event.container.connect(self.router_addr)
+        self.receiver = event.container.create_receiver(self.conn, self.source_addr)
+
+    def timeout(self):
+        self.error = "Timeout Expired - could not connect to router"
+        self.conn.close()
+
+    def on_link_opened(self, event):
+        tport = event.transport
+        self.remote_max_frame = tport.remote_max_frame_size
+        self.remote_channel_max = tport.remote_channel_max
+        self.conn.close()
+        self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+class ConnectorSettingsSniffer(MessagingHandler):
+    """Similar to ProtocolSettingsSniffer, but for router-initiated connections
+    """
+    def __init__(self, url, **kwargs):
+        super(ConnectorSettingsSniffer, self).__init__(**kwargs)
+        self.listener_addr = url
+        self.acceptor = None
+        self.remote_max_frame = None
+        self.remote_channel_max = None
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
+        self.acceptor = event.container.listen(self.listener_addr)
+
+    def timeout(self):
+        self.error = "Timeout Expired - router not connecting"
+        if self.acceptor:
+            self.acceptor.close()
+
+    def on_connection_opened(self, event):
+        tport = event.transport
+        self.remote_max_frame = tport.remote_max_frame_size
+        self.remote_channel_max = tport.remote_channel_max
+        event.connection.close()
+        self.acceptor.close()
+        self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
 
 
 if __name__ == '__main__':
diff --git a/tests/system_tests_topology_disposition.py b/tests/system_tests_topology_disposition.py
index dc5360b..b95a51d 100644
--- a/tests/system_tests_topology_disposition.py
+++ b/tests/system_tests_topology_disposition.py
@@ -23,6 +23,7 @@ import time
 import unittest
 from subprocess import PIPE, STDOUT
 
+import proton
 from proton import Message, Timeout
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
@@ -405,6 +406,7 @@ class TopologyDispositionTests (TestCase):
                 self.assertIsNone(error)
             self.assertIsNone(error)
 
+    @unittest.skipIf(proton.VERSION > (0, 36, 0), "see DISPATCH-2276")
     def test_04_scraper_tool(self):
         name = 'test_04'
         error = str(None)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to