This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new ef3e34c DISPATCH-2258: Cleaned up log messages, created senders after
receivers are created
ef3e34c is described below
commit ef3e34cfa9b4a8ffbbd513a22bd5f70cac2fab89
Author: Ganesh Murthy <[email protected]>
AuthorDate: Tue Nov 16 10:59:24 2021 -0500
DISPATCH-2258: Cleaned up log messages, created senders after receivers are
created
---
tests/system_tests_distribution.py | 92 +++++++++++++++++---------------------
1 file changed, 40 insertions(+), 52 deletions(-)
diff --git a/tests/system_tests_distribution.py
b/tests/system_tests_distribution.py
index e6126a7..5d63fbe 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -1635,8 +1635,8 @@ class DistributionTests (TestCase):
self.A_addr,
self.B_addr,
self.C_route_container_addr,
- self.waypoint_prefix_3 + '.waypoint'
- )
+ self.waypoint_prefix_3 + '.waypoint',
+ debug=True)
test.run()
self.assertIsNone(test.error)
@@ -1951,9 +1951,9 @@ class LinkAttachRoutingCheckOnly (MessagingHandler):
self.debug = False
- def debug_print(self, message) :
- if self.debug :
- print(message)
+ def debug_print(self, message):
+ if self.debug:
+ print(message, flush=True)
def timeout(self):
self.bail("Timeout Expired")
@@ -2889,9 +2889,9 @@ class RoutingTest (MessagingHandler):
self.status = 'start up'
self.container_id = container_id
- def debug_print(self, message) :
- if self.debug :
- print(message)
+ def debug_print(self, message):
+ if self.debug:
+ print(message, flush=True)
# If this happens, the test is hanging.
def timeout(self):
@@ -3112,7 +3112,7 @@ class RoutingTest (MessagingHandler):
# =====================================================
def check_receiver_distribution(self, expected_receiver_counts) :
self.debug_print("check_receiver_distribution expecting: %s" %
str(expected_receiver_counts))
- if self.debug :
+ if self.debug:
self.print_receiver_distribution()
for router in range(len(self.router_cnx_counts)) :
cnx_dict = self.router_cnx_counts[router]
@@ -3377,21 +3377,20 @@ class WaypointTest (MessagingHandler):
self.client_connection.close()
self.timer.cancel()
- def debug_print(self, message) :
- if self.debug :
- print(message)
+ def debug_print(self, message):
+ if self.debug:
+ print(message, flush=True)
def send_from_client(self, sender, n_messages, sender_index):
n_sent = 0
- self.debug_print("send_from_client -------------------")
while sender.credit > 0 and n_sent < n_messages:
n_sent += 1
self.n_sent += 1
msg = Message(body=n_sent)
- self.debug_print(" send_from_client -- sender: %d n_sent: %d" %
(sender_index, n_sent))
sender.send(msg)
# We send from a client
self.n_transitions += 1
+ self.debug_print(" send_from_client -- sender: %d n_sent: %d" %
(sender_index, n_sent))
def send_from_waypoint(self):
self.debug_print("send_from_waypoint ---------------------")
@@ -3603,9 +3602,9 @@ class SerialWaypointTest (MessagingHandler):
cnx.close()
self.timer.cancel()
- def debug_print(self, message) :
- if self.debug :
- print(message)
+ def debug_print(self, message):
+ if self.debug:
+ print(message, flush=True)
def send_from_client(self, sender, n_messages, sender_index):
n_sent = 0
@@ -3615,7 +3614,7 @@ class SerialWaypointTest (MessagingHandler):
n_sent += 1
self.n_sent += 1
self.n_transitions += 1
- self.debug_print("send_from_client -- sender: %d n_sent: %d" %
(sender_index, n_sent))
+ self.debug_print("send_from_client -- sender: %d n_sent: %d" %
(sender_index, n_sent))
def send_from_waypoint(self, waypoint):
self.debug_print("send_from_waypoint ------------------------------")
@@ -3688,8 +3687,6 @@ class SerialWaypointTest (MessagingHandler):
self.debug_print(" sending %d" % sender['to_send'])
self.send_from_client(sender['sender'], sender['to_send'],
index)
sender['n_sent'] = sender['to_send'] # n_sent = n_to_send
- else :
- self.debug_print(" this sender is already finished.")
return
for j in range(len(self.waypoints)) :
@@ -3714,7 +3711,7 @@ class SerialWaypointTest (MessagingHandler):
self.n_rcvd += 1
if self.n_rcvd >= self.n_expected_received and self.n_thru >=
self.n_expected_received:
self.debug_print("DONE -- self.n_rcvd: %d self.n_thru:
%d" % (self.n_rcvd, self.n_thru))
- if self.debug :
+ if self.debug:
self.report()
self.check_results_and_bail()
return
@@ -3815,8 +3812,8 @@ class ParallelWaypointTest (MessagingHandler):
client_host_1,
client_host_2,
route_container_host,
- destination
- ):
+ destination,
+ debug=False):
super(ParallelWaypointTest, self). __init__()
self.client_host_1 = client_host_1
self.client_host_2 = client_host_2
@@ -3826,6 +3823,7 @@ class ParallelWaypointTest (MessagingHandler):
self.error = None
self.messages_per_sender = 100
self.container_id = container_id
+ self.timer = None
self.route_container_connection = None
@@ -3884,7 +3882,7 @@ class ParallelWaypointTest (MessagingHandler):
n_links_per_message = 4
self.n_expected_transitions = len(self.senders) *
self.messages_per_sender * n_links_per_message
- self.debug = False
+ self.debug = debug
self.test_name = test_name
@@ -3898,9 +3896,9 @@ class ParallelWaypointTest (MessagingHandler):
cnx.close()
self.timer.cancel()
- def debug_print(self, message) :
- if self.debug :
- print(message)
+ def debug_print(self, message):
+ if self.debug:
+ print(message, flush=True)
def send_from_client(self, sender, n_messages, sender_index):
n_sent = 0
@@ -3910,11 +3908,9 @@ class ParallelWaypointTest (MessagingHandler):
n_sent += 1
self.n_sent += 1
self.n_transitions += 1
- self.debug_print("send_from_client -- sender: %d n_sent: %d" %
(sender_index, n_sent))
+ self.debug_print("send_from_client -- sender: %d n_sent: %d" %
(sender_index, n_sent))
def send_from_waypoint(self, waypoint):
- self.debug_print("send_from_waypoint ------------------------------")
-
while waypoint['sender'].credit > 0 and len(waypoint['queue']) > 0:
m = waypoint['queue'].pop()
message_content_number = m.body
@@ -3933,26 +3929,16 @@ class ParallelWaypointTest (MessagingHandler):
# containerId, and will at that time instantiate any associated
autolinks.
# We will get an 'on_link_opening' for each of them.
self.route_container_connection =
event.container.connect(self.route_container_host)
-
for i in range(len(self.sender_connections)) :
cnx = self.sender_connections[i]
- sender = self.senders[i]
receiver = self.receivers[i]
-
- sender['sender'] = event.container.create_sender(cnx,
- self.destination,
- name=link_name())
- sender['to_send'] = self.messages_per_sender
- sender['n_sent'] = 0
receiver['receiver'] = event.container.create_receiver(cnx,
self.destination,
name=link_name())
receiver['n_received'] = 0
def on_link_opening(self, event):
-
self.debug_print("on_link_opening -------------------------- ")
-
if event.sender:
self.debug_print(" sender: %s" %
event.sender.remote_source.address)
event.sender.source.address = event.sender.remote_source.address
@@ -3972,19 +3958,25 @@ class ParallelWaypointTest (MessagingHandler):
if self.n_waypoint_receivers < 2 :
self.waypoints[self.n_waypoint_receivers]['receiver'] =
event.receiver
self.n_waypoint_receivers += 1
+ # Create the senders after the waypoint receiver links have
been opened.
+ if self.n_waypoint_receivers == 2:
+ for i in range(len(self.sender_connections)):
+ cnx = self.sender_connections[i]
+ sender = self.senders[i]
+ sender['sender'] = event.container.create_sender(cnx,
+
self.destination,
+
name=link_name())
+ sender['to_send'] = self.messages_per_sender
+ sender['n_sent'] = 0
def on_sendable(self, event):
- self.debug_print("on_sendable ------------------------------")
for index in range(len(self.senders)) :
sender = self.senders[index]
if event.sender == sender['sender'] :
- self.debug_print(" client sender %d" % index)
if sender['n_sent'] < sender['to_send'] :
self.debug_print(" sending %d" % sender['to_send'])
self.send_from_client(sender['sender'], sender['to_send'],
index)
sender['n_sent'] = sender['to_send'] # n_sent = n_to_send
- else :
- self.debug_print(" this sender is already finished.")
return
for j in range(len(self.waypoints)) :
@@ -3995,21 +3987,17 @@ class ParallelWaypointTest (MessagingHandler):
return
def on_message(self, event):
-
- self.debug_print("on_message --------------------------- ")
-
# Is this one of our client receivers ?
for i in range(len(self.receivers)) :
receiver = self.receivers[i]
if event.receiver == receiver['receiver'] :
receiver['n_received'] += 1
self.n_transitions += 1
- self.debug_print(" client receiver %d has %d messages." %
(i, receiver['n_received']))
- message_content_number = event.message.body
+ self.debug_print(" on_message client receiver %d has %d
messages." % (i, receiver['n_received']))
self.n_rcvd += 1
if self.n_rcvd >= self.n_expected_received and self.n_thru >=
self.n_expected_received:
- self.debug_print("DONE -- self.n_rcvd: %d self.n_thru:
%d" % (self.n_rcvd, self.n_thru))
- if self.debug :
+ self.debug_print("on_message DONE -- self.n_rcvd: %d
self.n_thru: %d" % (self.n_rcvd, self.n_thru))
+ if self.debug:
self.report()
self.check_results_and_bail()
return
@@ -4022,7 +4010,7 @@ class ParallelWaypointTest (MessagingHandler):
waypoint['queue'].append(m)
waypoint['n_received'] += 1
self.n_transitions += 1
- self.debug_print(" message received at waypoint %d, queue
depth is now %d" % (j, len(waypoint['queue'])))
+ self.debug_print(" on_message message received at waypoint
%d, queue depth is now %d" % (j, len(waypoint['queue'])))
self.send_from_waypoint(waypoint)
def check_results_and_bail(self) :
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]