This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c8829b  DISPATCH-1772: Add logging to fallback dest self test
8c8829b is described below

commit 8c8829b88386e590e2fe14278fa2f2ece8e2a44c
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Wed Jul 14 15:01:43 2021 -0400

    DISPATCH-1772: Add logging to fallback dest self test
    
     * This patch instruments the SwitchoverTest where most of the test
       failures show up
    
     * The on_message handler is broken up to explicitly handle receiving
       from primary and fallback receivers in phases 0 and 1
    
     * Add sender drain handling
    
    This closes #1298
---
 tests/system_tests_fallback_dest.py | 178 ++++++++++++++++++++++++------------
 1 file changed, 118 insertions(+), 60 deletions(-)

diff --git a/tests/system_tests_fallback_dest.py b/tests/system_tests_fallback_dest.py
index db51172..85e6c89 100644
--- a/tests/system_tests_fallback_dest.py
+++ b/tests/system_tests_fallback_dest.py
@@ -20,6 +20,7 @@
 from proton import Message, symbol
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout
 from system_test import unittest
+from system_test import Logger
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
 
@@ -258,113 +259,113 @@ class RouterTest(TestCase):
         self.assertIsNone(test.error)
 
     def test_25_switchover_same_edge(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_EA1,
-                              self.ROUTER_EA1,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_EA1, "EA1"],
                               'dest.25')
         test.run()
         self.assertIsNone(test.error)
 
     def test_26_switchover_same_interior(self):
-        test = SwitchoverTest(self.ROUTER_INTA,
-                              self.ROUTER_INTA,
-                              self.ROUTER_INTA,
+        test = SwitchoverTest([self.ROUTER_INTA, "INTA"],
+                              [self.ROUTER_INTA, "INTA"],
+                              [self.ROUTER_INTA, "INTA"],
                               'dest.26')
         test.run()
         self.assertIsNone(test.error)
 
     def test_27_switchover_local_edge_alt_remote_interior(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_INTA,
-                              self.ROUTER_EA1,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_INTA, "INTA"],
+                              [self.ROUTER_EA1, "EA1"],
                               'dest.27')
         test.run()
         self.assertIsNone(test.error)
 
     def test_28_switchover_local_edge_alt_remote_edge(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_EB1,
-                              self.ROUTER_EA1,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_EB1, "EB1"],
+                              [self.ROUTER_EA1, "EA1"],
                               'dest.28')
         test.run()
         self.assertIsNone(test.error)
 
     def test_29_switchover_local_edge_pri_remote_interior(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_EA1,
-                              self.ROUTER_INTA,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_INTA, "INTA"],
                               'dest.29')
         test.run()
         self.assertIsNone(test.error)
 
     def test_30_switchover_local_interior_pri_remote_edge(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_EA1,
-                              self.ROUTER_EB1,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_EB1, "EB1"],
                               'dest.30')
         test.run()
         self.assertIsNone(test.error)
 
     def test_31_switchover_local_interior_alt_remote_interior(self):
-        test = SwitchoverTest(self.ROUTER_INTB,
-                              self.ROUTER_INTA,
-                              self.ROUTER_INTB,
+        test = SwitchoverTest([self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_INTA, "INTA"],
+                              [self.ROUTER_INTB, "INTB"],
                               'dest.31')
         test.run()
         self.assertIsNone(test.error)
 
     def test_32_switchover_local_interior_alt_remote_edge(self):
-        test = SwitchoverTest(self.ROUTER_INTB,
-                              self.ROUTER_EA2,
-                              self.ROUTER_INTB,
+        test = SwitchoverTest([self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_EA2, "EA2"],
+                              [self.ROUTER_INTB, "INTB"],
                               'dest.32')
         test.run()
         self.assertIsNone(test.error)
 
     def test_33_switchover_local_interior_pri_remote_interior(self):
-        test = SwitchoverTest(self.ROUTER_INTB,
-                              self.ROUTER_INTB,
-                              self.ROUTER_INTA,
+        test = SwitchoverTest([self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_INTA, "INTA"],
                               'dest.33')
         test.run()
         self.assertIsNone(test.error)
 
     def test_34_switchover_local_interior_pri_remote_edge(self):
-        test = SwitchoverTest(self.ROUTER_INTB,
-                              self.ROUTER_INTB,
-                              self.ROUTER_EB1,
+        test = SwitchoverTest([self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_EB1, "EB1"],
                               'dest.34')
         test.run()
         self.assertIsNone(test.error)
 
     def test_35_switchover_mix_1(self):
-        test = SwitchoverTest(self.ROUTER_INTA,
-                              self.ROUTER_INTB,
-                              self.ROUTER_EA1,
+        test = SwitchoverTest([self.ROUTER_INTA, "INTA"],
+                              [self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_EA1, "EA1"],
                               'dest.35')
         test.run()
         self.assertIsNone(test.error)
 
     def test_36_switchover_mix_2(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_INTB,
-                              self.ROUTER_INTA,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_INTA, "INTA"],
                               'dest.36')
         test.run()
         self.assertIsNone(test.error)
 
     def test_37_switchover_mix_3(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_INTB,
-                              self.ROUTER_EB1,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_INTB, "INTB"],
+                              [self.ROUTER_EB1, "EB1"],
                               'dest.37')
         test.run()
         self.assertIsNone(test.error)
 
     def test_38_switchover_mix_4(self):
-        test = SwitchoverTest(self.ROUTER_EA1,
-                              self.ROUTER_EA2,
-                              self.ROUTER_EB1,
+        test = SwitchoverTest([self.ROUTER_EA1, "EA1"],
+                              [self.ROUTER_EA2, "EA2"],
+                              [self.ROUTER_EB1, "EB1"],
                               'dest.38')
         test.run()
         self.assertIsNone(test.error)
@@ -567,9 +568,12 @@ class ReceiverFirstTest(MessagingHandler):
 class SwitchoverTest(MessagingHandler):
     def __init__(self, sender_host, primary_host, fallback_host, addr):
         super(SwitchoverTest, self).__init__()
-        self.sender_host    = sender_host
-        self.primary_host   = primary_host
-        self.fallback_host  = fallback_host
+        self.sender_host    = sender_host[0]
+        self.primary_host   = primary_host[0]
+        self.fallback_host  = fallback_host[0]
+        self.sender_name    = sender_host[1]
+        self.primary_name   = primary_host[1]
+        self.fallback_name  = fallback_host[1]
         self.addr           = addr
         self.count          = 300
 
@@ -586,6 +590,14 @@ class SwitchoverTest(MessagingHandler):
         self.tx_seq         = 0
         self.local_rel      = 0
 
+        self.log_prefix     = "FALLBACK_TEST %s" % self.addr
+        self.logger = Logger("SwitchoverTest_%s" % addr, print_to_console=False)
+        # Prepend a convenience SERVER line for scraper tool.
+        # Then the logs from this test can be merged with the router logs in scraper.
+        self.logger.log("SERVER (info) Container Name: %s" % self.addr)
+        self.logger.log("%s SwitchoverTest sender:%s primary:%s fallback:%s" %
+                        (self.log_prefix, self.sender_name, self.primary_name, self.fallback_name))
+
     def timeout(self):
         self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d, phase=%d, local_rel=%d" % \
                      (self.n_tx, self.n_rx, self.n_rel, self.phase, self.local_rel)
@@ -602,65 +614,111 @@ class SwitchoverTest(MessagingHandler):
 
     def on_start(self, event):
         self.timer              = event.reactor.schedule(TIMEOUT, TestTimeout(self))
+        self.logger.log("%s Opening sender connection to %s" % (self.log_prefix, self.sender_name))
         self.sender_conn        = event.container.connect(self.sender_host)
+        self.logger.log("%s Opening primary receiver connection to %s" % (self.log_prefix, self.primary_name))
         self.primary_conn       = event.container.connect(self.primary_host)
+        self.logger.log("%s Opening fallback receiver connection to %s" % (self.log_prefix, self.fallback_name))
         self.fallback_conn      = event.container.connect(self.fallback_host)
+        self.logger.log("%s Opening primary receiver to %s" % (self.log_prefix, self.primary_name))
         self.primary_receiver   = event.container.create_receiver(self.primary_conn, self.addr, name=(self.addr + "_primary_receiver"))
+        self.logger.log("%s Opening fallback receiver to %s" % (self.log_prefix, self.fallback_name))
         self.fallback_receiver  = event.container.create_receiver(self.fallback_conn, self.addr, name=(self.addr + "fallback_receiver"))
         self.fallback_receiver.source.capabilities.put_object(symbol("qd.fallback"))
 
     def on_link_opened(self, event):
         receiver_event = False
         if event.receiver == self.primary_receiver:
+            self.logger.log("%s Primary receiver opened" % self.log_prefix)
             self.primary_open = True
             receiver_event = True
         if event.receiver == self.fallback_receiver:
+            self.logger.log("%s Fallback receiver opened" % self.log_prefix)
             self.fallback_open = True
             receiver_event = True
         if receiver_event and self.primary_open and self.fallback_open:
+            self.logger.log("%s Opening sender to %s" % (self.log_prefix, self.sender_name))
             self.sender = event.container.create_sender(self.sender_conn, self.addr, name=(self.addr + "_sender"))
 
     def on_link_closed(self, event):
         if event.receiver == self.primary_receiver:
+            self.logger.log("%s Primary receiver closed. Start phase 1 send" % self.log_prefix)
             self.n_rx = 0
             self.n_tx = 0
             self.send()
 
     def send(self):
-        while self.sender.credit > 0 and self.n_tx < self.count:
-            self.sender.send(Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx)))
+        e_credit = self.sender.credit
+        e_n_tx = self.n_tx
+        e_tx_seq = self.tx_seq
+        last_message = Message("None")
+        while self.sender.credit > 0 and self.n_tx < self.count and not self.sender.drain_mode:
+            last_message = Message("Msg %s %d %d" % (self.addr, self.tx_seq, self.n_tx))
+            self.sender.send(last_message)
             self.n_tx += 1
             self.tx_seq += 1
+        if self.sender.drain_mode:
+            n_drained = self.sender.drained()
+            self.logger.log("%s sender.drained() drained %d credits" % (self.log_prefix, n_drained))
+        self.logger.log("%s send() exit: last sent '%s' phase=%d, credit=%3d->%3d, n_tx=%4d->%4d, tx_seq=%4d->%4d, n_rel=%4d" %
+                        (self.log_prefix, last_message.body, self.phase, e_credit, self.sender.credit,
+                         e_n_tx, self.n_tx, e_tx_seq, self.tx_seq, self.n_rel))
 
     def on_sendable(self, event):
         if event.sender == self.sender:
             self.send()
+        else:
+            self.fail("%s on_sendable event not from the only sender")
 
     def on_message(self, event):
-        if not (self.phase == 0 and event.receiver == self.fallback_receiver):
-            # Phase 0 message over primary receiver. Phase 1 can come in only on primary.
-            self.n_rx += 1
-            if self.n_rx == self.count:
-                if self.phase == 0:
+        if event.receiver == self.primary_receiver:
+            if self.phase == 0:
+                self.n_rx += 1
+                self.logger.log("%s Received phase 0 message '%s', n_rx=%d" %
+                                (self.log_prefix, event.message.body, self.n_rx))
+                if self.n_rx == self.count:
+                    self.logger.log("%s Triggering fallback by closing primary receiver on %s. Test phase 0->1." %
+                                    (self.log_prefix, self.primary_name))
                     self.phase = 1
                     self.primary_receiver.close()
-                else:
+            else:
+                # Phase 1 messages are unexpected on primary receiver
+                self.logger.log("%s Phase %d message received on primary: '%s'" % (self.log_prefix, self.phase, event.message.body))
+                self.fail("Receive phase1 message on primary receiver")
+        elif event.receiver == self.fallback_receiver:
+            if self.phase == 0:
+                # Phase 0 message over fallback receiver. This may happen because
+                # primary receiver is on a distant router and the fallback receiver is local.
+                # Release the message to keep trying until the primary receiver kicks in.
+                self.release(event.delivery)
+                self.n_rel += 1
+                self.n_tx -= 1
+                self.local_rel += 1
+                self.logger.log("%s Released phase 0 over fallback: msg:'%s', n_rx=%d, n_tx=%d, n_rel=%d, local_rel=%d" %
+                                (self.log_prefix, event.message.body, self.n_rx, self.n_tx, self.n_rel, self.local_rel))
+            else:
+                self.n_rx += 1
+                self.logger.log("%s Received phase 1 over fallback: msg:'%s', n_rx=%d" %
+                                (self.log_prefix, event.message.body, self.n_rx))
+                if self.n_rx == self.count:
+                    self.logger.log("%s Success" % self.log_prefix)
                     self.fail(None)
         else:
-            # Phase 0 message over fallback receiver. This may happen because
-            # primary receiver is on a distant router and the fallback receiver is local.
-            # Release the message to keep trying until the primary receiver kicks in.
-            self.release(event.delivery)
-            self.n_rel += 1
-            self.n_tx -= 1
-            self.local_rel += 1
+            self.fail("%s message received on unidentified receiver" % self.addr)
 
     def on_released(self, event):
+        # event type pn_delivery for sender
         self.n_rel += 1
         self.n_tx  -= 1
+        self.logger.log("%s on_released: sender delivery was released. Adjusted counts: n_rel=%d, n_tx=%d" %
+                        (self.log_prefix, self.n_rel, self.n_tx))
+        if event.sender is None:
+            self.fail("on_released event not related to sender")
 
     def run(self):
         Container(self).run()
+        if self.error is not None:
+            self.logger.dump()
 
 
 class SenderFirstAutoLinkTest(MessagingHandler):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to