This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 3419ad4 DISPATCH-2275: fixup system tests broken by proton log output
changes
3419ad4 is described below
commit 3419ad4e2c6f5f290ae8f8cab4f1387f89d10bd8
Author: Kenneth Giusti <[email protected]>
AuthorDate: Fri Nov 5 13:27:30 2021 -0400
DISPATCH-2275: fixup system tests broken by proton log output changes
- skip test_04_scraper_tool (DISPATCH-2276)
- modify system_tests_protocol_settings to use proton api
- fix system_tests_link_routes to use proton codec
This closes #1430
---
tests/system_tests_link_routes.py | 350 +++++++++++++++++++++--------
tests/system_tests_protocol_settings.py | 313 +++++++++-----------------
tests/system_tests_topology_disposition.py | 2 +
3 files changed, 363 insertions(+), 302 deletions(-)
diff --git a/tests/system_tests_link_routes.py
b/tests/system_tests_link_routes.py
index f2a7d5a..2672ca5 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -21,13 +21,14 @@ from time import sleep, time
from threading import Event
from subprocess import PIPE, STDOUT
import socket
+from typing import Optional
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process,
TestTimeout, \
AsyncTestSender, AsyncTestReceiver, MgmtMsgProxy, unittest, QdManager
from test_broker import FakeBroker
from test_broker import FakeService
-from proton import Delivery, symbol
+from proton import Delivery, symbol, Data, Described
from proton import Message, Condition
from proton.handlers import MessagingHandler
from proton.reactor import AtMostOnce, Container, DynamicNodeProperties,
LinkOption, AtLeastOnce
@@ -1760,6 +1761,23 @@ class LinkRouteDrainTest(TestCase):
class EmptyTransferTest(TestCase):
+ """Verify empty transfer frames (no body) do not crash the router. See
+ DISPATCH-1988.
+ """
+
+ # various identifiers defined by AMQP 1.0
+ OPEN_DESCRIPTOR = 0x10
+ BEGIN_DESCRIPTOR = 0x11
+ ATTACH_DESCRIPTOR = 0x12
+ FLOW_DESCRIPTOR = 0x13
+ TRANSFER_DESCRIPTOR = 0x14
+ DISPO_DESCRIPTOR = 0x15
+ ACCEPTED_OUTCOME = 0x24
+ REJECTED_OUTCOME = 0x25
+ TARGET_DESCRIPTOR = 0x29
+ MA_SECTION_DESCRIPTOR = 0x73
+ BODY_SECTION_DESCRIPTOR = 0x77
+
@classmethod
def setUpClass(cls):
super(EmptyTransferTest, cls).setUpClass()
@@ -1769,7 +1787,6 @@ class EmptyTransferTest(TestCase):
('router', {'mode': 'standalone', 'id': 'QDR.A'}),
# the client will connect to this listener
('listener', {'role': 'normal',
- 'host': '0.0.0.0',
'port': cls.ROUTER_LISTEN_PORT,
'saslMechanisms': 'ANONYMOUS'}),
# to connect to the fake broker
@@ -1797,110 +1814,259 @@ class EmptyTransferTest(TestCase):
self.router.wait_connectors()
return fake_broker
+ def _find_frame(self, data: bytes, code: int) -> Optional[list]:
+ """Scan a byte sequence for performatives that match code.
+ Return the frame body (list) if match else None
+ """
+ while data:
+ # starts at frame header (8 bytes)
+ frame_len = int.from_bytes(data[:4], "big")
+ if frame_len == 0 or frame_len > len(data):
+ return None
+ desc = Data()
+ desc.decode(data[8:frame_len]) # skip frame header
+ data = data[frame_len:] # advance to next frame
+ desc.rewind()
+ if desc.next() is None:
+ return None
+ if not desc.is_described():
+ return None
+ py_desc = desc.get_py_described()
+ if py_desc.descriptor == code:
+ return py_desc.value
+ return None
+
+ def _send_frame(self, frame: Data, sock: socket.socket):
+ """Encode and send frame over sock
+ """
+ frame.rewind()
+ fbytes = frame.encode()
+ flen = len(fbytes) + 8
+ # AMQP FRAME HEADER: 4 byte length, DOFF, TYPE, CHANNEL
+ sock.sendall(flen.to_bytes(4, "big"))
+ sock.sendall(bytes([2, 0, 0, 0]))
+ sock.sendall(fbytes)
+
+ def _construct_transfer(self, delivery_id, tag, more=False, add_ma=False,
+ add_body=False) -> Data:
+ """Construct a Transfer frame in a proton Data object
+ """
+ t1_frame = Data()
+ t1_frame.put_described()
+ t1_frame.enter()
+ t1_frame.put_ulong(self.TRANSFER_DESCRIPTOR)
+ t1_frame.put_list()
+ t1_frame.enter()
+ t1_frame.put_uint(0) # handle
+ t1_frame.put_uint(delivery_id)
+ t1_frame.put_binary(tag)
+ t1_frame.put_uint(0) # msg format
+ t1_frame.put_bool(False) # settled
+ t1_frame.put_bool(more)
+ t1_frame.exit() # transfer list
+ t1_frame.exit() # transfer described type
+ if add_ma:
+ t1_frame.put_described()
+ t1_frame.enter()
+ t1_frame.put_ulong(self.MA_SECTION_DESCRIPTOR)
+ t1_frame.put_list()
+ t1_frame.enter()
+ t1_frame.put_ulong(9)
+ t1_frame.exit() # list
+ t1_frame.exit() # described
+ if add_body:
+ t1_frame.put_described()
+ t1_frame.enter()
+ t1_frame.put_ulong(self.BODY_SECTION_DESCRIPTOR)
+ t1_frame.put_string("I'm a small body!")
+ t1_frame.exit()
+ t1_frame.exit()
+
+ return t1_frame
+
+ def _get_outcome(self, dispo_frame: list) -> Optional[int]:
+ """Extract the outcome from a raw disposition frame"""
+ outcome = None
+ if len(dispo_frame) >= 5: # list[5] == state
+ if isinstance(dispo_frame[4], Described):
+ outcome = dispo_frame[4].descriptor
+ return outcome
+
+ def _read_socket(self, sock: socket.socket,
+ timeout: float = 1.0) -> bytes:
+ """Read all available data from the socket, waiting up to timeout
+ seconds (default 1.0) for each chunk of data to arrive
+ """
+ old_timeout = sock.gettimeout()
+ sock.settimeout(timeout)
+ data = b''
+ while True:
+ try:
+ incoming = sock.recv(4096)
+ if not incoming:
+ break
+ data += incoming
+ except OSError: # timeout
+ break
+ sock.settimeout(old_timeout)
+ return data
+
def test_DISPATCH_1988(self):
fake_broker = self._fake_broker(FakeBroker)
- AMQP_OPEN_BEGIN_ATTACH = bytearray(
- b'\x41\x4d\x51\x50\x00\x01\x00\x00\x00\x00\x00\x21\x02\x00\x00'
- b'\x00\x00\x53\x10\xd0\x00\x00\x00\x11\x00\x00\x00\x04\xa1\x06'
- b'\x2e\x2f\x73\x65\x6e\x64\x40\x40\x60\x7f\xff\x00\x00\x00\x21'
- b'\x02\x00\x00\x00\x00\x53\x11\xd0\x00\x00\x00\x11\x00\x00\x00'
- b'\x04\x40\x52\x00\x70\x7f\xff\xff\xff\x70\x7f\xff\xff\xff\x00'
- b'\x00\x00\x5b\x02\x00\x00\x00\x00\x53\x12\xd0\x00\x00\x00\x4b'
- b'\x00\x00\x00\x0b\xa1\x09\x6d\x79\x5f\x73\x65\x6e\x64\x65\x72'
- b'\x52\x00\x42\x50\x02\x50\x00\x00\x53\x28\xd0\x00\x00\x00\x0b'
- b'\x00\x00\x00\x05\x40\x52\x00\x40\x52\x00\x42\x00\x53\x29\xd0'
- b'\x00\x00\x00\x14\x00\x00\x00\x05\xa1\x08\x65\x78\x61\x6d\x70'
- b'\x6c\x65\x73\x52\x00\x40\x52\x00\x42\x40\x40\x52\x00\x53\x00')
+ self.router.wait_ready()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.settimeout(TIMEOUT)
# Connect to the router listening port and send an amqp, open,
# begin, attach. The attach is sent on the link
# routed address, "examples"
- s.connect(("0.0.0.0", EmptyTransferTest.ROUTER_LISTEN_PORT))
- s.sendall(AMQP_OPEN_BEGIN_ATTACH)
-
- # Give a second for the attach to propagate to the broker and
- # for the broker to send a response attach
- sleep(1)
- data = s.recv(2048)
- self.assertIn("examples", repr(data))
-
- # First send a message on link routed address "examples" with
- # message body of "message 0"
- # Verify the the sent message has been accepted.
- TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
- + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x01'
- + b'\xa0\x01\x01\x43\x42'
- + b'\x40\x40\x40\x40\x40\x42\x00\x53'
- + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
- + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
- + b'\x65\x20\x30')
- s.sendall(TRANSFER_1)
- sleep(0.5)
- data = s.recv(1024)
- # The delivery has been accepted.
- self.assertIn("x00S$E", repr(data))
+ s.connect(("127.0.0.1", EmptyTransferTest.ROUTER_LISTEN_PORT))
+
+ # send 'AMQP 1 0' preamble
+ s.sendall(b'\x41\x4d\x51\x50\x00\x01\x00\x00')
+
+ # send Open/Begin/Attach
+ open_frame = Data()
+ open_frame.put_described()
+ open_frame.enter()
+ open_frame.put_ulong(self.OPEN_DESCRIPTOR)
+ open_frame.put_list()
+ open_frame.enter()
+ open_frame.put_string("TestContainer")
+ open_frame.exit()
+ open_frame.exit()
+ open_frame.rewind()
+
+ begin_frame = Data()
+ begin_frame.put_described()
+ begin_frame.enter()
+ begin_frame.put_ulong(self.BEGIN_DESCRIPTOR)
+ begin_frame.put_list()
+ begin_frame.enter()
+ begin_frame.put_null()
+ begin_frame.put_uint(0) # next out id
+ begin_frame.put_uint(0xfffff) # in/out window
+ begin_frame.put_uint(0xfffff)
+ begin_frame.exit()
+ begin_frame.exit()
+ begin_frame.rewind()
+
+ attach_frame = Data()
+ attach_frame.put_described()
+ attach_frame.enter()
+ attach_frame.put_ulong(self.ATTACH_DESCRIPTOR)
+ attach_frame.put_list()
+ attach_frame.enter()
+ attach_frame.put_string("test-link-name")
+ attach_frame.put_uint(0) # handle
+ attach_frame.put_bool(False) # sender
+ attach_frame.put_null()
+ attach_frame.put_null()
+ attach_frame.put_null()
+ # target:
+ attach_frame.put_described()
+ attach_frame.enter()
+ attach_frame.put_ulong(self.TARGET_DESCRIPTOR)
+ attach_frame.put_list()
+ attach_frame.enter()
+ attach_frame.put_string("examples/foo")
+ attach_frame.exit() # target list
+ attach_frame.exit() # target descriptor
+ attach_frame.exit() # attach list
+ attach_frame.exit() # attach descriptor
+ attach_frame.rewind()
+
+ for frame in [open_frame, begin_frame, attach_frame]:
+ self._send_frame(frame, s)
+
+ # Give time for the attach to propagate to the broker and
+ # for the broker to send a response attach and flow:
+ data = self._read_socket(s, timeout=2.0)
+ self.assertTrue(len(data) > 8)
+ self.assertEqual(data[:8], b'AMQP\x00\x01\x00\x00')
+ # expect that the connection was accepted: check for a flow frame:
+ flow_frame = self._find_frame(data[8:], self.FLOW_DESCRIPTOR)
+ self.assertIsNotNone(flow_frame, "no flow frame received: %s" % data)
+
+ # First send a message on link routed address "examples" with a small
+ # message body. Verify that the sent message has been accepted.
+ t1_frame = self._construct_transfer(0, b'\x01', add_ma=True,
add_body=True)
+ self._send_frame(t1_frame, s)
+
+ # We expect to get a disposition frame that accepted the message
+ data = self._read_socket(s)
+ self.assertTrue(len(data) > 0)
+ dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+ self.assertIsNotNone(dispo_frame,
+ "expected a disposition (none arrived!): %s"
+ % data)
+
+ outcome = self._get_outcome(dispo_frame)
+ self.assertEqual(self.ACCEPTED_OUTCOME, outcome,
+ "Transfer not accepted (unexpected!) actual=%s"
+ % outcome)
# Test case 1
- # Send an empty transfer frame to the router and you should
- # receive a rejected disposition from the router.
- # Without the fix for DISPATCH_1988,
- # upon sending this EMPTY_TRANSFER
- # the router crashes with the following assert
+ #
+ # Send an empty transfer frame to the router and you should receive a
+ # rejected disposition from the router. Without the fix for
+ # DISPATCH_1988, upon sending this EMPTY_TRANSFER the router crashes
+ # with the following assert
+ #
# qpid-dispatch/src/message.c:1260: qd_message_add_fanout: Assertion
`content->pending && qd_buffer_size(content->pending) > 0' failed.
- # This is the empty transfer frame that is sent to the router.
- # [0x614000030050]: AMQP:FRAME:0 <- @transfer(20) [handle=0,
delivery-id=0, delivery-tag=b"\x01", message-format=0, settled=false,
batchable=false]
- EMPTY_TRANSFER = bytearray(b'\x00\x00\x00\x1c\x02\x00\x00\x00'
- + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52'
- + b'\x02\xa0\x01\x02\x43\x42'
- + b'\x42\x40\x40\x40\x40\x42')
- s.sendall(EMPTY_TRANSFER)
- sleep(1)
- data = s.recv(1024)
- # The delivery has been rejected.
- self.assertIn("x00S%E", repr(data))
-
- # Let's send another transfer to make sure that the
- # router has not crashed.
- TRANSFER_1 = bytearray(b'\x00\x00\x00\x31\x02\x00\x00\x00'
- + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x03'
- + b'\xa0\x01\x03\x43\x42'
- + b'\x40\x40\x40\x40\x40\x42\x00\x53'
- + b'\x73\xc0\x02\x01\x44\x00\x53\x77'
- + b'\xa1\x09\x6d\x65\x73\x73\x61\x67'
- + b'\x65\x20\x30')
- s.sendall(TRANSFER_1)
- sleep(0.5)
- data = s.recv(1024)
- # The delivery has been accepted.
- self.assertIn("x00S$E", repr(data))
+
+ t2_frame = self._construct_transfer(1, b'\x02')
+ self._send_frame(t2_frame, s)
+
+ data = self._read_socket(s)
+ self.assertTrue(len(data) > 0)
+ dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+ self.assertIsNotNone(dispo_frame,
+ "expected a disposition (none arrived!): %s"
+ % data)
+ outcome = self._get_outcome(dispo_frame)
+ self.assertEqual(self.REJECTED_OUTCOME, outcome,
+ "Transfer not rejected (unexpected!) actual=%s"
+ % outcome)
# Test case 2
- # Now, send two empty transfer frames, first transfer has
- # more=true and the next transfer has more=false.
- # This will again be rejected by the router.
- # The following are the two transfer frames that will be
- # sent to the router.
- #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0,
delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled =
false, more = true, batchable = false]
- #[0x614000020050]: AMQP:FRAME: 0 <- @ transfer(20)[handle = 0,
delivery - id = 4, delivery - tag = b"\x04", message - format = 0, settled =
false, more = false, batchable = false]
- EMPTY_TRANSFER_MORE_TRUE = bytearray(
- b'\x00\x00\x00\x1c\x02\x00\x00\x00'
- + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
- + b'\xa0\x01\x04\x43\x42'
- + b'\x41\x40\x40\x40\x40\x42')
- EMPTY_TRANSFER_MORE_FALSE = bytearray(
- b'\x00\x00\x00\x1c\x02\x00\x00\x00'
- + b'\x00\x53\x14\xc0\x0f\x0b\x43\x52\x04'
- + b'\xa0\x01\x04\x43\x42'
- + b'\x42\x40\x40\x40\x40\x42')
- s.sendall(EMPTY_TRANSFER_MORE_TRUE)
- s.sendall(EMPTY_TRANSFER_MORE_FALSE)
- sleep(0.5)
- data = s.recv(1024)
- # The delivery has been rejected.
- self.assertIn("x00S%E", repr(data))
+ # Now, send two empty transfer frames, first transfer has more=true and
+ # the next transfer has more=false. This will again be rejected by the
+ # router.
+
+ t3_frame = self._construct_transfer(2, b'\x03', more=True)
+ self._send_frame(t3_frame, s)
+ t4_frame = self._construct_transfer(2, b'\x03')
+ self._send_frame(t4_frame, s)
+
+ data = self._read_socket(s)
+ self.assertTrue(len(data) > 0)
+ dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+ self.assertIsNotNone(dispo_frame,
+ "expected a disposition (none arrived!): %s"
+ % data)
+ outcome = self._get_outcome(dispo_frame)
+ self.assertEqual(self.REJECTED_OUTCOME, outcome,
+ "Transfer not rejected (unexpected!) actual: %s"
+ % outcome)
+
+ # Now send a good transfer and ensure the router accepts it
+ t5_frame = self._construct_transfer(3, b'\x04', add_ma=True,
add_body=True)
+ self._send_frame(t5_frame, s)
+
+ data = self._read_socket(s)
+ self.assertTrue(len(data) > 0)
+ dispo_frame = self._find_frame(data, self.DISPO_DESCRIPTOR)
+ self.assertIsNotNone(dispo_frame,
+ "expected a disposition (none arrived!): %s"
+ % data)
+ outcome = self._get_outcome(dispo_frame)
+ self.assertEqual(self.ACCEPTED_OUTCOME, outcome,
+ "Transfer not accepted (unexpected!) actual: %s"
+ % outcome)
s.close()
+ fake_broker.join()
class ConnectionLinkRouteTest(TestCase):
diff --git a/tests/system_tests_protocol_settings.py
b/tests/system_tests_protocol_settings.py
index 70495ce..faaa889 100644
--- a/tests/system_tests_protocol_settings.py
+++ b/tests/system_tests_protocol_settings.py
@@ -17,17 +17,16 @@
# under the License.
#
-import sys
from system_test import TestCase, Qdrouterd, main_module
-from system_test import unittest
-from proton.utils import BlockingConnection
+from system_test import unittest, TIMEOUT, TestTimeout
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
class MaxFrameMaxSessionFramesTest(TestCase):
"""System tests setting proton negotiated size max-frame-size and
incoming-window"""
@classmethod
def setUpClass(cls):
- """Start a router"""
super(MaxFrameMaxSessionFramesTest, cls).setUpClass()
name = "MaxFrameMaxSessionFrames"
config = Qdrouterd.Config([
@@ -39,29 +38,17 @@ class MaxFrameMaxSessionFramesTest(TestCase):
cls.router.wait_ready()
cls.address = cls.router.addresses[0]
- def test_max_frame_max_session_frames__max_sessions_default(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
-
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # max-frame is from the config
- self.assertIn(' max-frame-size=2048,', open_lines[0])
- # channel-max is default
- self.assertIn(" channel-max=32767", open_lines[0])
- begin_lines = [s for s in log_lines if "-> @begin" in s]
- # incoming-window is from the config
- self.assertIn(" incoming-window=10,", begin_lines[0])
+ def test_max_frame_max_session_frames_max_sessions_default(self):
+ sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+ sniffer.run()
+ self.assertEqual(2048, sniffer.remote_max_frame)
+ self.assertEqual(32767, sniffer.remote_channel_max)
class MaxSessionsTest(TestCase):
"""System tests setting proton channel-max"""
@classmethod
def setUpClass(cls):
- """Start a router and a messenger"""
super(MaxSessionsTest, cls).setUpClass()
name = "MaxSessions"
config = Qdrouterd.Config([
@@ -74,22 +61,15 @@ class MaxSessionsTest(TestCase):
cls.address = cls.router.addresses[0]
def test_max_sessions(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # channel-max is 9
- self.assertIn(" channel-max=9", open_lines[0])
+ sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+ sniffer.run()
+ self.assertEqual(9, sniffer.remote_channel_max)
class MaxSessionsZeroTest(TestCase):
"""System tests setting proton channel-max"""
@classmethod
def setUpClass(cls):
- """Start a router and a messenger"""
super(MaxSessionsZeroTest, cls).setUpClass()
name = "MaxSessionsZero"
config = Qdrouterd.Config([
@@ -102,22 +82,15 @@ class MaxSessionsZeroTest(TestCase):
cls.address = cls.router.addresses[0]
def test_max_sessions_zero(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # channel-max is 0. Should get proton default 32767
- self.assertIn(" channel-max=32767", open_lines[0])
+ sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+ sniffer.run()
+ self.assertEqual(32767, sniffer.remote_channel_max)
class MaxSessionsLargeTest(TestCase):
"""System tests setting proton channel-max"""
@classmethod
def setUpClass(cls):
- """Start a router and a messenger"""
super(MaxSessionsLargeTest, cls).setUpClass()
name = "MaxSessionsLarge"
config = Qdrouterd.Config([
@@ -130,15 +103,9 @@ class MaxSessionsLargeTest(TestCase):
cls.address = cls.router.addresses[0]
def test_max_sessions_large(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # channel-max is 0. Should get proton default 32767
- self.assertIn(" channel-max=32767", open_lines[0])
+ sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+ sniffer.run()
+ self.assertEqual(32767, sniffer.remote_channel_max)
class MaxFrameSmallTest(TestCase):
@@ -158,22 +125,15 @@ class MaxFrameSmallTest(TestCase):
cls.address = cls.router.addresses[0]
def test_max_frame_small(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # if frame size <= 512 proton set min of 512
- self.assertIn(" max-frame-size=512", open_lines[0])
+ sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+ sniffer.run()
+ self.assertEqual(512, sniffer.remote_max_frame)
class MaxFrameDefaultTest(TestCase):
"""System tests setting proton max-frame-size"""
@classmethod
def setUpClass(cls):
- """Start a router and a messenger"""
super(MaxFrameDefaultTest, cls).setUpClass()
name = "MaxFrameDefault"
config = Qdrouterd.Config([
@@ -186,91 +146,9 @@ class MaxFrameDefaultTest(TestCase):
cls.address = cls.router.addresses[0]
def test_max_frame_default(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # if frame size not set then a default is used
- self.assertIn(" max-frame-size=16384", open_lines[0])
-
-
-class MaxSessionFramesDefaultTest(TestCase):
- """System tests setting proton max-frame-size"""
- @classmethod
- def setUpClass(cls):
- """Start a router and a messenger"""
- super(MaxSessionFramesDefaultTest, cls).setUpClass()
- name = "MaxSessionFramesDefault"
- config = Qdrouterd.Config([
- ('router', {'mode': 'standalone', 'id': 'QDR'}),
-
- ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port()}),
- ])
- cls.router = cls.tester.qdrouterd(name, config)
- cls.router.wait_ready()
- cls.address = cls.router.addresses[0]
-
- def test_max_session_frames_default(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # if frame size not set then a default is used
- self.assertIn(" max-frame-size=16384", open_lines[0])
- begin_lines = [s for s in log_lines if "-> @begin" in s]
- # incoming-window should be 2^31-1 (64-bit) or
- # (2^31-1) / max-frame-size (32-bit)
- is_64bits = sys.maxsize > 2 ** 32
- expected = " incoming-window=2147483647," if is_64bits else \
- (" incoming-window=%d," % int(2147483647 / 16384))
- #self.assertIn(expected, begin_lines[0], "Expected:'%s' not found
in '%s'" % (expected, begin_lines[0]))
- self.assertIn(expected, begin_lines[0])
-
-
-class MaxFrameMaxSessionFramesZeroTest(TestCase):
- """
- System tests setting proton negotiated size max-frame-size and
incoming-window
- when they are both zero. Frame size is bumped up to the minimum and
capacity is
- bumped up to have an incoming window of 1
- """
- @classmethod
- def setUpClass(cls):
- """Start a router"""
- super(MaxFrameMaxSessionFramesZeroTest, cls).setUpClass()
- name = "MaxFrameMaxSessionFramesZero"
- config = Qdrouterd.Config([
- ('router', {'mode': 'standalone', 'id': 'QDR'}),
-
- ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(),
'maxFrameSize': '0', 'maxSessionFrames': '0'}),
- ])
- cls.router = cls.tester.qdrouterd(name, config)
- cls.router.wait_ready()
- cls.address = cls.router.addresses[0]
-
- def test_max_frame_max_session_zero(self):
- # Set up a connection to get the Open and a receiver to get a Begin
frame in the log
- bc = BlockingConnection(self.router.addresses[0])
- bc.create_receiver("xxx")
- bc.close()
-
- with open(self.router.logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "-> @open" in s]
- # max-frame gets set to protocol min
- self.assertIn(' max-frame-size=512,', open_lines[0])
- begin_lines = [s for s in log_lines if "-> @begin" in s]
- # incoming-window should be 2^31-1 (64-bit) or
- # (2^31-1) / max-frame-size (32-bit)
- is_64bits = sys.maxsize > 2 ** 32
- expected = " incoming-window=2147483647," if is_64bits else \
- (" incoming-window=%d," % int(2147483647 / 512))
- self.assertIn(expected, begin_lines[0])
+ sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx")
+ sniffer.run()
+ self.assertEqual(16384, sniffer.remote_max_frame)
class ConnectorSettingsDefaultTest(TestCase):
@@ -279,19 +157,12 @@ class ConnectorSettingsDefaultTest(TestCase):
is common code. This test makes sure that defaults in the connector
config make it to the wire.
"""
- inter_router_port = None
-
- @staticmethod
- def ssl_config(client_server, connection):
- return [] # Over-ridden by RouterTestSsl
-
@classmethod
def setUpClass(cls):
- """Start two routers"""
super(ConnectorSettingsDefaultTest, cls).setUpClass()
def router(name, client_server, connection):
- config = cls.ssl_config(client_server, connection) + [
+ config = [
('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
('listener', {'port': cls.tester.get_port()}),
@@ -300,55 +171,34 @@ class ConnectorSettingsDefaultTest(TestCase):
config = Qdrouterd.Config(config)
- cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
- inter_router_port = cls.tester.get_port()
+ cls.connector_port = cls.tester.get_port()
router('A', 'server',
- ('listener', {'role': 'inter-router', 'port':
inter_router_port}))
- router('B', 'client',
- ('connector', {'name': 'connectorToA', 'role': 'inter-router',
'port': inter_router_port}))
-
- cls.routers[0].wait_router_connected('QDR.B')
- cls.routers[1].wait_router_connected('QDR.A')
+ ('connector', {'name': 'testconnector', 'role':
'route-container', 'port': cls.connector_port}))
def test_connector_default(self):
- with open(self.routers[0].logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "<- @open" in s]
- # defaults
- self.assertIn(' max-frame-size=16384,', open_lines[0])
- self.assertIn(' channel-max=32767,', open_lines[0])
- begin_lines = [s for s in log_lines if "<- @begin" in s]
- # incoming-window should be 2^31-1 (64-bit) or
- # (2^31-1) / max-frame-size (32-bit)
- is_64bits = sys.maxsize > 2 ** 32
- expected = " incoming-window=2147483647," if is_64bits else \
- (" incoming-window=%d," % int(2147483647 / 16384))
- self.assertIn(expected, begin_lines[0])
+ sniffer =
ConnectorSettingsSniffer(self.routers[0].connector_addresses[0])
+ sniffer.run()
+ self.assertEqual(16384, sniffer.remote_max_frame)
+ self.assertEqual(32767, sniffer.remote_channel_max)
class ConnectorSettingsNondefaultTest(TestCase):
"""
The internal logic for protocol settings in listener and connector
is common code. This test makes sure that settings in the connector
- config make it to the wire. The listener tests test the setting logic.
+ config make it to the wire.
"""
- inter_router_port = None
-
- @staticmethod
- def ssl_config(client_server, connection):
- return [] # Over-ridden by RouterTestSsl
-
@classmethod
def setUpClass(cls):
- """Start two routers"""
super(ConnectorSettingsNondefaultTest, cls).setUpClass()
def router(name, client_server, connection):
- config = cls.ssl_config(client_server, connection) + [
+ config = [
('router', {'mode': 'interior', 'id': 'QDR.%s' % name}),
('listener', {'port': cls.tester.get_port()}),
@@ -357,41 +207,84 @@ class ConnectorSettingsNondefaultTest(TestCase):
config = Qdrouterd.Config(config)
- cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+ cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
cls.routers = []
- inter_router_port = cls.tester.get_port()
+ cls.connector_port = cls.tester.get_port()
router('A', 'server',
- ('listener', {'role': 'inter-router', 'port':
inter_router_port}))
- router('B', 'client',
- ('connector', {'name': 'connectorToA', 'role': 'inter-router',
'port': inter_router_port,
+ ('connector', {'name': 'testconnector', 'role':
'route-container', 'port': cls.connector_port,
'maxFrameSize': '2048', 'maxSessionFrames':
'10', 'maxSessions': '20'}))
- cls.routers[0].wait_router_connected('QDR.B')
- cls.routers[1].wait_router_connected('QDR.A')
-
def test_connector_default(self):
- with open(self.routers[0].logfile_path, 'r') as router_log:
- log_lines = router_log.read().split("\n")
- open_lines = [s for s in log_lines if "<- @open" in s]
- # nondefaults
- self.assertIn(' max-frame-size=2048,', open_lines[0])
- self.assertIn(' channel-max=19,', open_lines[0])
- begin_lines = [s for s in log_lines if "<- @begin" in s]
- # nondefaults
- SEARCH_STRING = " incoming-window=10,"
- # Sometimes, the begin frame we are looking for might not be
- # the first in the list. We just need to make sure that
- # it is somewhere in the list
- line_found = False
- for begin_line in begin_lines:
- if SEARCH_STRING in begin_line:
- line_found = True
- break
-
- self.assertTrue(line_found)
+ sniffer =
ConnectorSettingsSniffer(self.routers[0].connector_addresses[0])
+ sniffer.run()
+ self.assertEqual(2048, sniffer.remote_max_frame)
+ self.assertEqual(19, sniffer.remote_channel_max)
+
+
+class ProtocolSettingsSniffer(MessagingHandler):
+ """Create a Connection/Session/Link and capture the various protocol
+ settings sent by the peer.
+ """
+ def __init__(self, router_addr, source_addr="xxx", **kwargs):
+ super(ProtocolSettingsSniffer, self).__init__(**kwargs)
+ self.router_addr = router_addr
+ self.source_addr = source_addr
+ self.conn = None
+ self.remote_max_frame = None
+ self.remote_channel_max = None
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
+ self.conn = event.container.connect(self.router_addr)
+ self.receiver = event.container.create_receiver(self.conn,
self.source_addr)
+
+ def timeout(self):
+ self.error = "Timeout Expired - could not connect to router"
+ self.conn.close()
+
+ def on_link_opened(self, event):
+ tport = event.transport
+ self.remote_max_frame = tport.remote_max_frame_size
+ self.remote_channel_max = tport.remote_channel_max
+ self.conn.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
+
+
+class ConnectorSettingsSniffer(MessagingHandler):
+ """Similar to ProtocolSettingsSniffer, but for router-initiated connections
+ """
+ def __init__(self, url, **kwargs):
+ super(ConnectorSettingsSniffer, self).__init__(**kwargs)
+ self.listener_addr = url
+ self.acceptor = None
+ self.remote_max_frame = None
+ self.remote_channel_max = None
+
+ def on_start(self, event):
+ self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
+ self.acceptor = event.container.listen(self.listener_addr)
+
+ def timeout(self):
+ self.error = "Timeout Expired - router not connecting"
+ if self.acceptor:
+ self.acceptor.close()
+
+ def on_connection_opened(self, event):
+ tport = event.transport
+ self.remote_max_frame = tport.remote_max_frame_size
+ self.remote_channel_max = tport.remote_channel_max
+ event.connection.close()
+ self.acceptor.close()
+ self.timer.cancel()
+
+ def run(self):
+ Container(self).run()
if __name__ == '__main__':
diff --git a/tests/system_tests_topology_disposition.py
b/tests/system_tests_topology_disposition.py
index dc5360b..b95a51d 100644
--- a/tests/system_tests_topology_disposition.py
+++ b/tests/system_tests_topology_disposition.py
@@ -23,6 +23,7 @@ import time
import unittest
from subprocess import PIPE, STDOUT
+import proton
from proton import Message, Timeout
from proton.handlers import MessagingHandler
from proton.reactor import Container
@@ -405,6 +406,7 @@ class TopologyDispositionTests (TestCase):
self.assertIsNone(error)
self.assertIsNone(error)
+ @unittest.skipIf(proton.VERSION > (0, 36, 0), "see DISPATCH-2276")
def test_04_scraper_tool(self):
name = 'test_04'
error = str(None)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]