This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 80698a3  DISPATCH-2065: Added timer to check for deleted connection
80698a3 is described below

commit 80698a39784493b180ae2564551ef9a27c829877
Author: Ganesh Murthy <[email protected]>
AuthorDate: Wed Nov 3 16:41:53 2021 -0400

    DISPATCH-2065: Added timer to check for deleted connection
---
 tests/system_tests_two_routers.py | 82 ++++++++++++++++++++++++++++++++-------
 1 file changed, 67 insertions(+), 15 deletions(-)

diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 8b19191..5c589dc 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -24,7 +24,7 @@ import logging
 from threading import Timer
 from subprocess import PIPE, STDOUT
 from proton import Message, Delivery, symbol, Condition
-from system_test import Logger, TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR, TestTimeout
+from system_test import Logger, TestCase, Process, Qdrouterd, main_module, TIMEOUT, DIR, TestTimeout, PollTimeout
 from system_test import AsyncTestReceiver
 from system_test import AsyncTestSender
 from system_test import get_inter_router_links
@@ -409,6 +409,18 @@ class DeleteConnectionWithReceiver(MessagingHandler):
         self.mgmt_sender = None
         self.success = False
         self.error = None
+        self.receiver_to_kill = None
+        self.timer = None
+        self.n_sent = 0
+        self.n_received = 0
+        self.mgmt_receiver_link_opened = False
+        self.mgmt_receiver_1_link_opened = False
+        self.mgmt_receiver_2_link_opened = False
+        self.receiver_to_kill_link_opened = False
+        self.query_timer = None
+        self.deleted_admin_status = "deleted"
+        self.num_attempts = 0
+        self.max_attempts = 2
 
     def on_start(self, event):
         self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
@@ -429,16 +441,29 @@ class DeleteConnectionWithReceiver(MessagingHandler):
 
     def timeout(self):
         self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received)
-        self.mgmt_conn.close()
+        self.bail(self.error)
 
     def bail(self, error):
         self.error = error
         self.timer.cancel()
         self.mgmt_conn.close()
         self.conn_to_kill.close()
+        if self.query_timer:
+            self.query_timer.cancel()
 
     def on_link_opened(self, event):
         if event.receiver == self.mgmt_receiver:
+            self.mgmt_receiver_link_opened = True
+        elif event.receiver == self.mgmt_receiver_1:
+            self.mgmt_receiver_1_link_opened = True
+        elif event.receiver == self.mgmt_receiver_2:
+            self.mgmt_receiver_2_link_opened = True
+        elif event.receiver == self.receiver_to_kill:
+            self.receiver_to_kill_link_opened = True
+
+        # All the management receiver links have been opened, now send the first message.
+        if self.mgmt_receiver_link_opened and self.mgmt_receiver_1_link_opened and \
+                self.mgmt_receiver_2_link_opened and self.receiver_to_kill_link_opened:
             request = Message()
             request.address = "amqp:/_local/$management"
             request.properties = {
@@ -446,13 +471,24 @@ class DeleteConnectionWithReceiver(MessagingHandler):
                 'operation': 'QUERY'}
             request.reply_to = self.mgmt_receiver.remote_source.address
             self.mgmt_sender.send(request)
+            self.n_sent += 1
+
+    def poll_timeout(self):
+        request = Message()
+        request.address = "amqp:/_local/$management"
+        request.properties = {'type': 'org.apache.qpid.dispatch.connection',
+                              'operation': 'QUERY'}
+        request.reply_to = self.mgmt_receiver_2.remote_source.address
+        self.mgmt_sender.send(request)
+        self.n_sent += 1
 
     def on_message(self, event):
         if event.receiver == self.mgmt_receiver:
+            self.n_received += 1
             attribute_names = event.message.body['attributeNames']
-            property_index = attribute_names .index('properties')
-            identity_index = attribute_names .index('identity')
-
+            property_index = attribute_names.index('properties')
+            identity_index = attribute_names.index('identity')
+            conn_found = False
             for result in event.message.body['results']:
                 if result[property_index]:
                     properties = result[property_index]
@@ -467,28 +503,44 @@ class DeleteConnectionWithReceiver(MessagingHandler):
                                 'operation': 'UPDATE'
                             }
                             request.body = {
-                                'adminStatus': 'deleted'}
+                                'adminStatus': self.deleted_admin_status
+                            }
                             request.reply_to = self.mgmt_receiver_1.remote_source.address
                             self.mgmt_sender.send(request)
+                            conn_found = True
+                            self.n_sent += 1
+            if not conn_found:
+                self.bail("The connection we wanted to delete was not found")
         elif event.receiver == self.mgmt_receiver_1:
-            if event.message.properties['statusDescription'] == 'OK' and event.message.body['adminStatus'] == 'deleted':
-                request = Message()
-                request.address = "amqp:/_local/$management"
-                request.properties = {'type': 'org.apache.qpid.dispatch.connection',
-                                      'operation': 'QUERY'}
-                request.reply_to = self.mgmt_receiver_2.remote_source.address
-                self.mgmt_sender.send(request)
+            self.n_received += 1
+            if event.message.properties['statusDescription'] == 'OK' and \
+                    event.message.body['adminStatus'] == self.deleted_admin_status:
+                # Wait for 3 sends for the connection to be gone completely.
+                self.num_attempts += 1
+                self.query_timer = event.reactor.schedule(3.0, PollTimeout(self))
+            else:
+                if event.message.properties['statusDescription'] != 'OK':
+                    error = "Expected statusDescription to be OK but instead got %s" % \
+                            event.message.properties['statusDescription']
+                if event.message.body['adminStatus'] != self.deleted_admin_status:
+                    error = "Expected adminStatus to be %s but instead got %s" % \
+                            (self.deleted_admin_status, event.message.properties['adminStatus'])
+                self.bail(error)
 
         elif event.receiver == self.mgmt_receiver_2:
+            self.n_received += 1
             attribute_names = event.message.body['attributeNames']
             property_index = attribute_names .index('properties')
-            identity_index = attribute_names .index('identity')
 
             for result in event.message.body['results']:
                 if result[property_index]:
                     properties = result[property_index]
                     if properties and properties.get('int_property'):
-                        self.bail("Connection not deleted")
+                        if self.num_attempts == self.max_attempts:
+                            self.bail("Connection not deleted")
+                        else:
+                            self.num_attempts += 1
+                            self.query_timer = event.reactor.schedule(3.0, PollTimeout(self))
             self.bail(None)
 
     def run(self):

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to