This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new b7918db DISPATCH-2329: Explicitly check for released counts instead of
delivery counts of deliveries that are being released
b7918db is described below
commit b7918db600decd9b610f8401ff306261674cfdb8
Author: Ganesh Murthy <[email protected]>
AuthorDate: Mon Feb 14 11:56:42 2022 -0500
DISPATCH-2329: Explicitly check for released counts instead of delivery
counts of deliveries that are being released
---
tests/system_tests_delivery_counts.py | 50 ++++++++++++++++++++++++++---------
1 file changed, 38 insertions(+), 12 deletions(-)
diff --git a/tests/system_tests_delivery_counts.py
b/tests/system_tests_delivery_counts.py
index 81917af..41be214 100644
--- a/tests/system_tests_delivery_counts.py
+++ b/tests/system_tests_delivery_counts.py
@@ -980,12 +980,18 @@ class OneRouterLinkCountersTest(TestCase):
if rx_limit is None else rx_limit
self.sent = 0
+ self.received = 0
self.timer = 0
self.poll_timer = None
self.conn = None
self.sender_stats = None
self.receiver_stats = None
self.large_message = large_message
+ self.reactor = None
+ self.sender = None
+ self.receiver = None
+ self.max_attempts = 10
+ self.num_attempts = 0
def timeout(self):
self._cleanup()
@@ -1001,24 +1007,40 @@ class OneRouterLinkCountersTest(TestCase):
self.timer.cancel()
self.timer = None
+ def _get_sender_receiver_stats(self):
+ self.receiver_stats = get_link_info("Rx_Test01", self.router_addr)
+ self.sender_stats = get_link_info("Tx_Test01", self.router_addr)
+
def poll_timeout(self):
"""
- Periodically check the deliveryCount on the receiver. Once it
- reaches rx_limit the test is complete: gather link statistics
- before closing the clients
+ Periodically check the deliveryCount or the releasedCount on the
receiver and sender.
"""
- li = get_link_info("Rx_Test01", self.router_addr)
- if li and li['deliveryCount'] == self.rx_limit:
- self.receiver_stats = li
- self.sender_stats = get_link_info("Tx_Test01",
self.router_addr)
+ restart_poll_timer = True
+ self._get_sender_receiver_stats()
+ if self.receiver_stats and self.outcome == Delivery.RELEASED and \
+ self.receiver_stats['releasedCount'] == self.rx_limit:
+ if self.sender_stats and self.sender_stats['releasedCount'] ==
self.rx_limit:
+ # We do not want to check just the deliveryCount here. The
deliveryCount gets
+ # updated much earlier than the releasedCount. We will
check the releasedCount instead.
+ # Check the releasedCount on the sender and the receiver.
This is because it takes time
+ # to propagate the outcome back to the sender, so the sender's
outcomes would lag behind the receiver's
+ restart_poll_timer = False
+ self._cleanup()
+ elif self.receiver_stats and self.receiver_stats['deliveryCount']
== self.rx_limit:
+ restart_poll_timer = False
self._cleanup()
- else:
- self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self))
+
+ if restart_poll_timer:
+ self.num_attempts += 1
+ if self.num_attempts == self.max_attempts:
+ # There is something wrong, fail the test.
+ self.timeout()
+ else:
+ self.poll_timer = self.reactor.schedule(0.5,
PollTimeout(self))
def on_start(self, event):
self.reactor = event.reactor
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
- self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self))
self.conn = event.container.connect(self.router_addr)
self.receiver = event.container.create_receiver(self.conn,
source="Test01",
@@ -1029,7 +1051,7 @@ class OneRouterLinkCountersTest(TestCase):
name="Tx_Test01")
def on_sendable(self, event):
- if self.sent < self.count:
+ while self.sent < self.count:
if self.large_message:
dlv = self.sender.send(Message(body=LARGE_PAYLOAD))
else:
@@ -1039,10 +1061,14 @@ class OneRouterLinkCountersTest(TestCase):
self.sent += 1
def on_message(self, event):
+ self.received += 1
if self.outcome:
event.delivery.update(self.outcome)
event.delivery.settle()
- # otherwise just drop it
+
+ # Start up a poll timer once all the deliveries have been
sent/received.
+ if self.received == self.rx_limit and self.sent == self.count:
+ self.poll_timer = event.reactor.schedule(0.5,
PollTimeout(self))
def run(self):
Container(self).run()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]