This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new ef3e34c  DISPATCH-2258: Cleaned up log messages, created senders after receivers are created
ef3e34c is described below

commit ef3e34cfa9b4a8ffbbd513a22bd5f70cac2fab89
Author: Ganesh Murthy <[email protected]>
AuthorDate: Tue Nov 16 10:59:24 2021 -0500

    DISPATCH-2258: Cleaned up log messages, created senders after receivers are created
---
 tests/system_tests_distribution.py | 92 +++++++++++++++++---------------------
 1 file changed, 40 insertions(+), 52 deletions(-)

diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py
index e6126a7..5d63fbe 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -1635,8 +1635,8 @@ class DistributionTests (TestCase):
                                     self.A_addr,
                                     self.B_addr,
                                     self.C_route_container_addr,
-                                    self.waypoint_prefix_3 + '.waypoint'
-                                    )
+                                    self.waypoint_prefix_3 + '.waypoint',
+                                    debug=True)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1951,9 +1951,9 @@ class LinkAttachRoutingCheckOnly (MessagingHandler):
 
         self.debug = False
 
-    def debug_print(self, message) :
-        if self.debug :
-            print(message)
+    def debug_print(self, message):
+        if self.debug:
+            print(message, flush=True)
 
     def timeout(self):
         self.bail("Timeout Expired")
@@ -2889,9 +2889,9 @@ class RoutingTest (MessagingHandler):
         self.status = 'start up'
         self.container_id = container_id
 
-    def debug_print(self, message) :
-        if self.debug :
-            print(message)
+    def debug_print(self, message):
+        if self.debug:
+            print(message, flush=True)
 
     # If this happens, the test is hanging.
     def timeout(self):
@@ -3112,7 +3112,7 @@ class RoutingTest (MessagingHandler):
     # =====================================================
     def check_receiver_distribution(self, expected_receiver_counts) :
         self.debug_print("check_receiver_distribution expecting: %s" % 
str(expected_receiver_counts))
-        if self.debug :
+        if self.debug:
             self.print_receiver_distribution()
         for router in range(len(self.router_cnx_counts)) :
             cnx_dict = self.router_cnx_counts[router]
@@ -3377,21 +3377,20 @@ class WaypointTest (MessagingHandler):
         self.client_connection.close()
         self.timer.cancel()
 
-    def debug_print(self, message) :
-        if self.debug :
-            print(message)
+    def debug_print(self, message):
+        if self.debug:
+            print(message, flush=True)
 
     def send_from_client(self, sender, n_messages, sender_index):
         n_sent = 0
-        self.debug_print("send_from_client -------------------")
         while sender.credit > 0 and n_sent < n_messages:
             n_sent      += 1
             self.n_sent += 1
             msg = Message(body=n_sent)
-            self.debug_print("    send_from_client -- sender: %d n_sent: %d" % 
(sender_index, n_sent))
             sender.send(msg)
             # We send from a client
             self.n_transitions += 1
+        self.debug_print("    send_from_client -- sender: %d n_sent: %d" % (sender_index, n_sent))
 
     def send_from_waypoint(self):
         self.debug_print("send_from_waypoint ---------------------")
@@ -3603,9 +3602,9 @@ class SerialWaypointTest (MessagingHandler):
             cnx.close()
         self.timer.cancel()
 
-    def debug_print(self, message) :
-        if self.debug :
-            print(message)
+    def debug_print(self, message):
+        if self.debug:
+            print(message, flush=True)
 
     def send_from_client(self, sender, n_messages, sender_index):
         n_sent = 0
@@ -3615,7 +3614,7 @@ class SerialWaypointTest (MessagingHandler):
             n_sent             += 1
             self.n_sent        += 1
             self.n_transitions += 1
-            self.debug_print("send_from_client -- sender: %d n_sent: %d" % 
(sender_index, n_sent))
+        self.debug_print("send_from_client -- sender: %d n_sent: %d" % (sender_index, n_sent))
 
     def send_from_waypoint(self, waypoint):
         self.debug_print("send_from_waypoint ------------------------------")
@@ -3688,8 +3687,6 @@ class SerialWaypointTest (MessagingHandler):
                     self.debug_print("    sending %d" % sender['to_send'])
                     self.send_from_client(sender['sender'], sender['to_send'], 
index)
                     sender['n_sent'] = sender['to_send']  # n_sent = n_to_send
-                else :
-                    self.debug_print("    this sender is already finished.")
                 return
 
         for j in range(len(self.waypoints)) :
@@ -3714,7 +3711,7 @@ class SerialWaypointTest (MessagingHandler):
                 self.n_rcvd += 1
                 if self.n_rcvd >= self.n_expected_received and self.n_thru >= 
self.n_expected_received:
                     self.debug_print("DONE -- self.n_rcvd: %d   self.n_thru: 
%d" % (self.n_rcvd, self.n_thru))
-                    if self.debug :
+                    if self.debug:
                         self.report()
                     self.check_results_and_bail()
                     return
@@ -3815,8 +3812,8 @@ class ParallelWaypointTest (MessagingHandler):
                  client_host_1,
                  client_host_2,
                  route_container_host,
-                 destination
-                 ):
+                 destination,
+                 debug=False):
         super(ParallelWaypointTest, self). __init__()
         self.client_host_1        = client_host_1
         self.client_host_2        = client_host_2
@@ -3826,6 +3823,7 @@ class ParallelWaypointTest (MessagingHandler):
         self.error                = None
         self.messages_per_sender  = 100
         self.container_id         = container_id
+        self.timer = None
 
         self.route_container_connection = None
 
@@ -3884,7 +3882,7 @@ class ParallelWaypointTest (MessagingHandler):
         n_links_per_message = 4
         self.n_expected_transitions = len(self.senders) * 
self.messages_per_sender * n_links_per_message
 
-        self.debug = False
+        self.debug = debug
 
         self.test_name = test_name
 
@@ -3898,9 +3896,9 @@ class ParallelWaypointTest (MessagingHandler):
             cnx.close()
         self.timer.cancel()
 
-    def debug_print(self, message) :
-        if self.debug :
-            print(message)
+    def debug_print(self, message):
+        if self.debug:
+            print(message, flush=True)
 
     def send_from_client(self, sender, n_messages, sender_index):
         n_sent = 0
@@ -3910,11 +3908,9 @@ class ParallelWaypointTest (MessagingHandler):
             n_sent             += 1
             self.n_sent        += 1
             self.n_transitions += 1
-            self.debug_print("send_from_client -- sender: %d n_sent: %d" % 
(sender_index, n_sent))
+        self.debug_print("send_from_client -- sender: %d n_sent: %d" % (sender_index, n_sent))
 
     def send_from_waypoint(self, waypoint):
-        self.debug_print("send_from_waypoint ------------------------------")
-
         while waypoint['sender'].credit > 0 and len(waypoint['queue']) > 0:
             m = waypoint['queue'].pop()
             message_content_number = m.body
@@ -3933,26 +3929,16 @@ class ParallelWaypointTest (MessagingHandler):
         # containerId, and will at that time instantiate any associated 
autolinks.
         # We will get an 'on_link_opening' for each of them.
         self.route_container_connection = 
event.container.connect(self.route_container_host)
-
         for i in range(len(self.sender_connections)) :
             cnx = self.sender_connections[i]
-            sender   = self.senders[i]
             receiver = self.receivers[i]
-
-            sender['sender'] = event.container.create_sender(cnx,
-                                                             self.destination,
-                                                             name=link_name())
-            sender['to_send'] = self.messages_per_sender
-            sender['n_sent']  = 0
             receiver['receiver'] = event.container.create_receiver(cnx,
                                                                    
self.destination,
                                                                    
name=link_name())
             receiver['n_received'] = 0
 
     def on_link_opening(self, event):
-
         self.debug_print("on_link_opening -------------------------- ")
-
         if event.sender:
             self.debug_print("    sender: %s" % 
event.sender.remote_source.address)
             event.sender.source.address = event.sender.remote_source.address
@@ -3972,19 +3958,25 @@ class ParallelWaypointTest (MessagingHandler):
                 if self.n_waypoint_receivers < 2 :
                     self.waypoints[self.n_waypoint_receivers]['receiver'] = 
event.receiver
                     self.n_waypoint_receivers += 1
+                # Create the senders after the waypoint receiver links have been opened.
+                if self.n_waypoint_receivers == 2:
+                    for i in range(len(self.sender_connections)):
+                        cnx = self.sender_connections[i]
+                        sender = self.senders[i]
+                        sender['sender'] = event.container.create_sender(cnx,
+                                                                         self.destination,
+                                                                         name=link_name())
+                        sender['to_send'] = self.messages_per_sender
+                        sender['n_sent'] = 0
 
     def on_sendable(self, event):
-        self.debug_print("on_sendable ------------------------------")
         for index in range(len(self.senders)) :
             sender = self.senders[index]
             if event.sender == sender['sender'] :
-                self.debug_print("    client sender %d" % index)
                 if sender['n_sent'] < sender['to_send'] :
                     self.debug_print("    sending %d" % sender['to_send'])
                     self.send_from_client(sender['sender'], sender['to_send'], 
index)
                     sender['n_sent'] = sender['to_send']  # n_sent = n_to_send
-                else :
-                    self.debug_print("    this sender is already finished.")
                 return
 
         for j in range(len(self.waypoints)) :
@@ -3995,21 +3987,17 @@ class ParallelWaypointTest (MessagingHandler):
                 return
 
     def on_message(self, event):
-
-        self.debug_print("on_message --------------------------- ")
-
         # Is this one of our client receivers ?
         for i in range(len(self.receivers)) :
             receiver = self.receivers[i]
             if event.receiver == receiver['receiver'] :
                 receiver['n_received'] += 1
                 self.n_transitions     += 1
-                self.debug_print("    client receiver %d has %d messages." % 
(i, receiver['n_received']))
-                message_content_number = event.message.body
+                self.debug_print("    on_message client receiver %d has %d messages." % (i, receiver['n_received']))
                 self.n_rcvd += 1
                 if self.n_rcvd >= self.n_expected_received and self.n_thru >= 
self.n_expected_received:
-                    self.debug_print("DONE -- self.n_rcvd: %d   self.n_thru: 
%d" % (self.n_rcvd, self.n_thru))
-                    if self.debug :
+                    self.debug_print("on_message DONE -- self.n_rcvd: %d   self.n_thru: %d" % (self.n_rcvd, self.n_thru))
+                    if self.debug:
                         self.report()
                     self.check_results_and_bail()
                     return
@@ -4022,7 +4010,7 @@ class ParallelWaypointTest (MessagingHandler):
                 waypoint['queue'].append(m)
                 waypoint['n_received'] += 1
                 self.n_transitions        += 1
-                self.debug_print("    message received at waypoint %d, queue 
depth is now %d" % (j, len(waypoint['queue'])))
+                self.debug_print("    on_message message received at waypoint %d, queue depth is now %d" % (j, len(waypoint['queue'])))
                 self.send_from_waypoint(waypoint)
 
     def check_results_and_bail(self) :

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to