This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new b12d49a DISPATCH-2227: simplify the
system_tests_distribution.test_09_closest_linear
b12d49a is described below
commit b12d49a18f5bf055487e1809f854024c935b36a0
Author: Kenneth Giusti <[email protected]>
AuthorDate: Fri Sep 17 14:59:58 2021 -0400
DISPATCH-2227: simplify the system_tests_distribution.test_09_closest_linear
This closes #1370
---
tests/system_tests_distribution.py | 420 +++++++++++++++++++------------------
1 file changed, 211 insertions(+), 209 deletions(-)
diff --git a/tests/system_tests_distribution.py
b/tests/system_tests_distribution.py
index 4e200f7..e6126a7 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -17,11 +17,11 @@
# under the License.
#
-import sys
+from time import sleep
-from proton import Message
+from proton import Message, Delivery
from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout
-from system_test import unittest
+from system_test import unittest, Logger
from proton.handlers import MessagingHandler
from proton.reactor import Container, LinkOption, ApplicationEvent,
EventInjector
@@ -44,6 +44,10 @@ class AddressCheckResponse(object):
def __getattr__(self, key):
return self.attrs[key]
+ def __str__(self):
+ return "Address Check Response: status=%s desc=%s attrs=%s" % \
+ (self.status_code, self.status_description, self.attrs)
+
class AddressChecker (object):
"""
@@ -1643,9 +1647,10 @@ class DistributionTests (TestCase):
next_link_sequence = 1
-def link_name():
+def link_name(suffix=None):
global next_link_sequence
- name = "link-name.%d" % next_link_sequence
+ suffix = suffix or "name"
+ name = "link-%s.%d" % (suffix, next_link_sequence)
next_link_sequence += 1
return name
@@ -2172,26 +2177,50 @@ class LinkAttachRouting (MessagingHandler):
class ClosestTest (MessagingHandler):
"""
- Test whether distance-based message routing works in a
- 3-router network. The network may be linear or mesh,
- depending on which routers the caller gives us.
+ Test whether distance-based message routing works in a 3-router
+ network. The network may be linear or mesh, depending on which routers the
+ caller gives us.
(Illustration is a linear network.)
sender -----> Router_1 -----> Router_2 -----> Router_3
| | |
v v v
- rcvr_1_a rcvr_2_a rcvr_3_a
- rcvr_1_b rcvr_2_b rcvr_3_b
+ rcvr_1 rcvr_2 rcvr_3
+
+ With a linear network of 3 routers, set up a sender on router_1, and then 1
+ receiver each on all 3 routers. Requirement: router 2 is closer than
+ router 3 by one hop.
+
+    Once the closest receiver has received the required number of
+    messages it is closed. None of the other receivers should have
+    received any messages, as they were not the closest receiver.
+
+    Repeat until the receivers on all three routers have received messages.
+
+ The test is set up in phases to ensure there are no races between fast/slow
+ clients and routers:
+
+ Phase 1: bring up all connections and create receivers, wait until all
+    receivers have finished link setup.
- With a linear network of 3 routers, set up a sender on
- router_1, and then 2 receivers each on all 3 routers.
+ Phase 2: poll routers until the subscriber count shows all receivers are
+ ready.
+ Phase 3: start the sender, wait until on_sendable triggers
+
+ Phase 4: send test messages, verify distribution.
+
+ Once a batch of messages has completely arrived at the current closest
+    receiver, close that receiver. Note that it can take a few seconds for
+    the loss of that link to propagate to all three routers. During that time any
+ sent messages may fail with outcome RELEASED - that is expected.
"""
def __init__(self, test_name, router_1, router_2, router_3, addr_suffix,
print_debug=False):
super(ClosestTest, self).__init__(prefetch=0)
+ self.test_name = test_name
self.error = None
self.router_1 = router_1
self.router_2 = router_2
@@ -2199,58 +2228,45 @@ class ClosestTest (MessagingHandler):
self.addr_suffix = addr_suffix
self.dest = "closest/" + addr_suffix
- # This n_expected is actually the minimum number of messages
- # I will send. The real number will be higher because some
- # will be released when I close some receivers.
- self.n_expected = 300
- self.one_third = self.n_expected / 3
-
+ # after send_batch sent messages are accepted, verify the closest
+ # receivers have consumed the batch.
+ self.send_batch = 4
+ self.n_sent = 0
self.n_received = 0
+ self.sender = None
- self.count_1_a = 0
- self.count_1_b = 0
- self.count_2_a = 0
- self.count_2_b = 0
- self.count_3_a = 0
- self.count_3_b = 0
+ self.rx_opened = 0
+ self.rx_count_1 = 0
+ self.rx_count_2 = 0
+ self.rx_count_3 = 0
+ self.closest_rx = None
- self.addr_check_timer = None
+ # for checking the number of subscribers to the destination address
+ self.addr_checker = None
self.addr_check_receiver = None
self.addr_check_sender = None
- self.bailed = False
- self.test_name = test_name
-
- self.sender = None
- self.n_sent_1 = 0
- self.n_sent_2 = 0
- self.n_sent_3 = 0
-
- self.recv_1_a_closed = False
- self.recv_1_b_closed = False
- self.recv_2_a_closed = False
- self.recv_2_b_closed = False
- self.first_check = True
- self.send_on_sendable = True
+ self._logger = Logger(title=test_name, print_to_console=print_debug)
- self.print_debug = print_debug
+ def _new_message(self):
+        """Create a test message whose body names the expected rx (for log tracing)
+ """
+ return Message(body="%s: Hello, %s." % (self.test_name,
self.closest_rx.name),
+ address=self.dest)
def timeout(self):
- sys.stdout.flush()
- self.bail("Timeout Expired ")
-
- def address_check_timeout(self):
- self.addr_check()
+ self.bail("Timeout Expired")
- def bail(self, text):
+ def bail(self, error_text=None):
self.timer.cancel()
- self.error = text
+ self.error = error_text
self.send_cnx.close()
self.cnx_1.close()
self.cnx_2.close()
self.cnx_3.close()
- if self.addr_check_timer:
- self.addr_check_timer.cancel()
+ if error_text:
+ self._logger.log("Test failed: %s" % error_text)
+ self._logger.dump()
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
@@ -2259,185 +2275,170 @@ class ClosestTest (MessagingHandler):
self.cnx_2 = event.container.connect(self.router_2)
self.cnx_3 = event.container.connect(self.router_3)
- # Warning!
- # The two receiver-links on each router must be given
- # explicit distinct names, or we will in fact only get
- # one link. And then wonder why receiver 2 on each
- # router isn't getting any messages.
- self.recv_1_a = event.container.create_receiver(self.cnx_1,
self.dest, name=link_name())
- self.recv_1_b = event.container.create_receiver(self.cnx_1,
self.dest, name=link_name())
-
- self.recv_2_a = event.container.create_receiver(self.cnx_2,
self.dest, name=link_name())
- self.recv_2_b = event.container.create_receiver(self.cnx_2,
self.dest, name=link_name())
-
- self.recv_3_a = event.container.create_receiver(self.cnx_3,
self.dest, name=link_name())
- self.recv_3_b = event.container.create_receiver(self.cnx_3,
self.dest, name=link_name())
+ self.recv_1 = event.container.create_receiver(self.cnx_1, self.dest,
+ name=link_name("rx1"))
+ self.recv_2 = event.container.create_receiver(self.cnx_2, self.dest,
+ name=link_name("rx2"))
+ self.recv_3 = event.container.create_receiver(self.cnx_3, self.dest,
+ name=link_name("rx3"))
- self.recv_1_a.flow(self.n_expected)
- self.recv_2_a.flow(self.n_expected)
- self.recv_3_a.flow(self.n_expected)
+ # grant way more flow than necessary so we can consume any mis-routed
+ # message (that is a bug)
+ self.recv_1.flow(100)
+ self.recv_2.flow(100)
+ self.recv_3.flow(100)
- self.recv_1_b.flow(self.n_expected)
- self.recv_2_b.flow(self.n_expected)
- self.recv_3_b.flow(self.n_expected)
-
- self.addr_check_receiver = event.container.create_receiver(self.cnx_1,
dynamic=True)
- self.addr_check_receiver.flow(100)
- self.addr_check_sender = event.container.create_sender(self.cnx_1,
"$management")
-
- self.m_sent_1 = False
- self.m_sent_2 = False
- self.m_sent_3 = False
+ self.closest_rx = self.recv_1
def on_link_opened(self, event):
- if event.receiver == self.addr_check_receiver:
- # my addr-check link has opened: make the addr_checker with the
given address.
- self.addr_checker =
AddressChecker(self.addr_check_receiver.remote_source.address)
+ self._logger.log("Link opened: %s" % event.link.name)
+ if event.link.is_receiver:
+ if event.receiver == self.addr_check_receiver:
+ # Phase 2: address checker address available:
+ self._logger.log("Address check receiver link opened")
+ assert self.addr_checker is None
+ self.addr_checker =
AddressChecker(self.addr_check_receiver.remote_source.address)
+ self.addr_check_sender =
event.container.create_sender(self.cnx_1,
+
"$management",
+
name=link_name("check_tx"))
+ elif self.rx_opened != 3:
+ # Phase 1: wait for receivers to come up
+ assert event.receiver in [self.recv_1, self.recv_2,
self.recv_3]
+ self._logger.log("Test receiver link opened: %s" %
event.link.name)
+ self.rx_opened += 1
+ if self.rx_opened == 3:
+ # All test receivers links open, start Phase 2:
+ self._logger.log("Opening address check receiver")
+ self.addr_check_receiver =
event.container.create_receiver(self.cnx_1,
+
dynamic=True,
+
name=link_name("check_rx"))
+ self.addr_check_receiver.flow(100)
+
+ elif event.sender == self.addr_check_sender:
+ # fire off the first address query
self.addr_check()
def on_message(self, event):
+ self._logger.log("on_message %s" % event.receiver.name)
if event.receiver == self.addr_check_receiver:
# This is a response to one of my address-readiness checking
messages.
response =
self.addr_checker.parse_address_query_response(event.message)
- if self.first_check:
- if response.status_code == 200 and response.subscriberCount ==
2 and response.remoteCount == 2:
- # now we know that we have two subscribers on attached
router, and two remote
- # routers that know about the address. The network is
ready.
- # Now we can make the sender without getting a
- # "No Path To Destination" error.
- self.sender = event.container.create_sender(self.send_cnx,
self.dest)
-
- if not self.m_sent_1:
- self.m_sent_1 = True
- while self.n_sent_1 < self.one_third:
- msg = Message(body="Hello, closest.",
- address=self.dest)
- self.sender.send(msg)
- self.n_sent_1 += 1
- if self.print_debug:
- print("First hundred sent")
-
- # And we can quit checking.
- if self.addr_check_timer:
- self.addr_check_timer.cancel()
- self.addr_check_timer = None
- else:
- # If the latest check did not find the link-attack route
ready,
- # schedule another check a little while from now.
- self.addr_check_timer = event.reactor.schedule(3,
AddressCheckerTimeout(self))
+ if response.status_code == 200 and response.subscriberCount == 1
and response.remoteCount == 2:
+            # now we know that we have one subscriber on the attached router,
and two remote
+ # routers that know about the address. The network is ready.
+ self._logger.log("Network ready")
+ assert self.sender is None
+ self.sender = event.container.create_sender(self.send_cnx,
+ self.dest,
+
name=link_name("sender"))
else:
- if response.status_code == 200 and response.subscriberCount ==
0 and response.remoteCount == 1:
- if not self.m_sent_3:
- self.m_sent_3 = True
- while self.n_sent_2 < self.one_third:
- msg = Message(body="Hello, closest.",
- address=self.dest)
- self.sender.send(msg)
- self.n_sent_2 += 1
-
- if self.print_debug:
- print("Third hundred sent")
-
- if self.addr_check_timer:
- self.addr_check_timer.cancel()
- self.addr_check_timer = None
- else:
- # If the latest check did not find the link-attack route
ready,
- # schedule another check a little while from now.
- self.addr_check_timer = event.reactor.schedule(3,
-
AddressCheckerTimeout(
- self))
- else :
+ # Not ready yet - poll again. This will continue until either
+ # the routers have updated or the test times out
+ self._logger.log("Network not ready yet: %s" % response)
+ self.addr_check_receiver.flow(1)
+ sleep(0.25)
+ self.addr_check()
+ else:
# This is a payload message.
self.n_received += 1
# Count the messages that have come in for
# each receiver.
- if event.receiver == self.recv_1_a:
- self.count_1_a += 1
- elif event.receiver == self.recv_1_b:
- self.count_1_b += 1
- elif event.receiver == self.recv_2_a:
- self.count_2_a += 1
- elif event.receiver == self.recv_2_b:
- self.count_2_b += 1
- elif event.receiver == self.recv_3_a:
- self.count_3_a += 1
- if self.print_debug:
- print("self.count_3_a", self.count_3_a)
- elif event.receiver == self.recv_3_b:
- self.count_3_b += 1
- if self.print_debug:
- print("self.count_3_b", self.count_3_b)
-
- if self.n_received == self.one_third:
- if self.print_debug:
- print("First one third received")
- # The first one-third of messages should have gone exclusively
- # to the near receivers. At this point we should have
- # no messages in the mid or far receivers.
- self.recv_1_a.close()
- self.recv_1_b.close()
- if (self.count_2_a + self.count_2_b + self.count_3_a +
self.count_3_b) > 0 :
- self.bail("error: routers 2 or 3 got messages before
router 1 receivers were closed.")
- # Make sure both receivers got some messages.
- if (self.count_1_a < self.one_third / 2 or self.count_1_b <
self.one_third / 2) or (self.count_1_b != self.count_1_a):
- self.bail("error: recv_1_a and recv_1_b did not get equal
number of messages")
-
- elif self.n_received == 2 * self.one_third:
- if self.print_debug:
- print("Second one third received")
- # The next one-third of messages should have gone exclusively
- # to the router_2 receivers. At this point we should have
- # no messages in the far receivers.
- self.recv_2_a.close()
- self.recv_2_b.close()
- if (self.count_3_a + self.count_3_b) > 0 :
- self.bail("error: router 3 got messages before 2 was
closed.")
- # Make sure both receivers got some messages.
- if (self.count_2_a < self.one_third / 2 or self.count_2_b <
self.one_third / 2) or (self.count_2_b != self.count_2_a):
- self.bail("error: recv_2_a and recv_2_b did not get equal
number of messages")
-
- # By the time we reach the expected number of messages
- # we have closed the router_1 and router_2 receivers. If the
- # router_3 receivers are empty at this point, something is wrong.
- if self.n_received >= self.n_expected :
- if self.print_debug:
- print("Third one third received")
- if (self.count_3_a < self.one_third / 2 or self.count_3_b <
self.one_third / 2) or (self.count_3_b != self.count_3_a):
- self.bail("error: recv_3_a and recv_3_b did not get equal
number of messages")
+ if event.receiver == self.recv_1:
+ self.rx_count_1 += 1
+ self._logger.log("RX 1 got message, total=%s" %
self.rx_count_1)
+ elif event.receiver == self.recv_2:
+ self.rx_count_2 += 1
+ self._logger.log("RX 2 got message, total=%s" %
self.rx_count_2)
+ elif event.receiver == self.recv_3:
+ self.rx_count_3 += 1
+ self._logger.log("RX 3 got message, total=%s" %
self.rx_count_3)
+ else:
+ self.bail("Unexpected receiver?")
+
+ def on_sendable(self, event):
+ self._logger.log("on_sendable %s" % event.sender.name)
+ if event.sender == self.sender:
+ if self.n_sent == 0:
+ # only have one message outstanding
+ self._logger.log("sending (sent=%s)" % self.n_sent)
+ self.sender.send(self._new_message())
+ self.n_sent += 1
+
+ def on_settled(self, event):
+ self._logger.log("On settled, link: %s" % event.link.name)
+ if event.link == self.sender:
+ dlv = event.delivery
+ if dlv.remote_state == Delivery.ACCEPTED:
+ if self.closest_rx == self.recv_1:
+ if self.rx_count_2 or self.rx_count_3:
+ self.bail("Error: non-closest client got message!")
+ else:
+ self.rx_count_1 += 1
+ if self.rx_count_1 == self.send_batch:
+ self._logger.log("RX 1 complete, closing")
+ self.recv_1.close()
+ self.closest_rx = self.recv_2
+ # now wait for close to complete before sending
more
+ else:
+ self.sender.send(self._new_message())
+ self.n_sent += 1
+ elif self.closest_rx == self.recv_2:
+ if self.rx_count_1 != self.send_batch or self.rx_count_3:
+ self.bail("Error: non-closest client got message!")
+ else:
+ self.rx_count_2 += 1
+ if self.rx_count_2 == self.send_batch:
+ self._logger.log("RX 2 complete, closing")
+ self.recv_2.close()
+ self.closest_rx = self.recv_3
+ # now wait for close to complete before sending
more
+ else:
+ self.sender.send(self._new_message())
+ self.n_sent += 1
+ elif self.closest_rx == self.recv_3:
+ if (self.rx_count_1 != self.send_batch or self.rx_count_2
!= self.send_batch):
+ self.bail("Error: non-closest client got message!")
+ else:
+ self.rx_count_3 += 1
+ if self.rx_count_3 == self.send_batch:
+ self._logger.log("RX 3 complete, closing, Test
Done!")
+ self.recv_3.close()
+ self.closest_rx = None
+ self.bail()
+ else:
+ self.sender.send(self._new_message())
+ self.n_sent += 1
else:
- self.bail(None)
+ self.bail("Error: self.closest_rx no match!")
+ else:
+ self._logger.log("Delivery Not Accepted: %s" %
dlv.remote_state)
+ if dlv.remote_state == Delivery.RELEASED:
+ # This occurs when the loss-of-rx-link event has not been
+ # propagated to all routers. When that happens the message
+ # may be forwarded back to the router with the last known
+ # location of the dropped link.
+ # This is expected, just try again
+ sleep(0.25)
+ self.sender.send(self._new_message())
+ self.n_sent += 1
+ else:
+ self.bail("Error: unexpected delivery failure: %s"
+ % dlv.remote_state)
def on_link_closed(self, event):
- if event.receiver == self.recv_1_b or event.receiver == self.recv_1_a:
- if event.receiver == self.recv_1_a:
- self.recv_1_a_closed = True
- if event.receiver == self.recv_1_b:
- self.recv_1_b_closed = True
- if self.recv_1_a_closed and self.recv_1_b_closed and not
self.m_sent_2:
- if self.print_debug:
- print("self.recv_1_a_closed and self.recv_1_b_closed")
-
- self.m_sent_2 = True
- while self.n_sent_3 < self.one_third:
- msg = Message(body="Hello, closest.",
- address=self.dest)
- self.sender.send(msg)
- self.n_sent_3 += 1
- if self.print_debug:
- print("Second hundred sent")
-
- if event.receiver == self.recv_2_a or event.receiver == self.recv_2_b:
- if event.receiver == self.recv_2_a:
- self.recv_2_a_closed = True
- if event.receiver == self.recv_2_b:
- self.recv_2_b_closed = True
- if self.recv_2_a_closed and self.recv_2_b_closed:
- self.first_check = False
- if self.print_debug:
- print("self.recv_2_a_closed and self.recv_2_b_closed")
- self.addr_check()
+ self._logger.log("Link closed %s" % event.link.name)
+ if event.link == self.recv_1 and self.closest_rx == self.recv_2:
+ self._logger.log("Next send for RX 2")
+ sleep(2.0) # give time for the link loss to propagate
+ self.sender.send(self._new_message())
+ self.n_sent += 1
+
+ elif event.link == self.recv_2 and self.closest_rx == self.recv_3:
+ self._logger.log("Next send for RX 3")
+ sleep(2.0) # give time for the link loss to propagate
+ self.sender.send(self._new_message())
+ self.n_sent += 1
def addr_check(self):
# Send the message that will query the management code to discover
@@ -2447,6 +2448,7 @@ class ClosestTest (MessagingHandler):
# BUGALERT: We have to prepend the 'M0' to this address prefix
# because that's what the router does internally. Someday this
# may change.
+ self._logger.log("Query addresses...")
self.addr_check_sender.send(self.addr_checker.make_address_query("M0"
+ self.dest))
def run(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]