This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 80698a3 DISPATCH-2065: Added timer to check for deleted connection
80698a3 is described below
commit 80698a39784493b180ae2564551ef9a27c829877
Author: Ganesh Murthy <[email protected]>
AuthorDate: Wed Nov 3 16:41:53 2021 -0400
DISPATCH-2065: Added timer to check for deleted connection
---
tests/system_tests_two_routers.py | 82 ++++++++++++++++++++++++++++++++-------
1 file changed, 67 insertions(+), 15 deletions(-)
diff --git a/tests/system_tests_two_routers.py
b/tests/system_tests_two_routers.py
index 8b19191..5c589dc 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -24,7 +24,7 @@ import logging
from threading import Timer
from subprocess import PIPE, STDOUT
from proton import Message, Delivery, symbol, Condition
-from system_test import Logger, TestCase, Process, Qdrouterd, main_module,
TIMEOUT, DIR, TestTimeout
+from system_test import Logger, TestCase, Process, Qdrouterd, main_module,
TIMEOUT, DIR, TestTimeout, PollTimeout
from system_test import AsyncTestReceiver
from system_test import AsyncTestSender
from system_test import get_inter_router_links
@@ -409,6 +409,18 @@ class DeleteConnectionWithReceiver(MessagingHandler):
self.mgmt_sender = None
self.success = False
self.error = None
+ self.receiver_to_kill = None
+ self.timer = None
+ self.n_sent = 0
+ self.n_received = 0
+ self.mgmt_receiver_link_opened = False
+ self.mgmt_receiver_1_link_opened = False
+ self.mgmt_receiver_2_link_opened = False
+ self.receiver_to_kill_link_opened = False
+ self.query_timer = None
+ self.deleted_admin_status = "deleted"
+ self.num_attempts = 0
+ self.max_attempts = 2
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
@@ -429,16 +441,29 @@ class DeleteConnectionWithReceiver(MessagingHandler):
def timeout(self):
self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent,
self.n_received)
- self.mgmt_conn.close()
+ self.bail(self.error)
def bail(self, error):
self.error = error
self.timer.cancel()
self.mgmt_conn.close()
self.conn_to_kill.close()
+ if self.query_timer:
+ self.query_timer.cancel()
def on_link_opened(self, event):
if event.receiver == self.mgmt_receiver:
+ self.mgmt_receiver_link_opened = True
+ elif event.receiver == self.mgmt_receiver_1:
+ self.mgmt_receiver_1_link_opened = True
+ elif event.receiver == self.mgmt_receiver_2:
+ self.mgmt_receiver_2_link_opened = True
+ elif event.receiver == self.receiver_to_kill:
+ self.receiver_to_kill_link_opened = True
+
+ # All the management receiver links have been opened, now send the
first message.
+ if self.mgmt_receiver_link_opened and self.mgmt_receiver_1_link_opened
and \
+ self.mgmt_receiver_2_link_opened and
self.receiver_to_kill_link_opened:
request = Message()
request.address = "amqp:/_local/$management"
request.properties = {
@@ -446,13 +471,24 @@ class DeleteConnectionWithReceiver(MessagingHandler):
'operation': 'QUERY'}
request.reply_to = self.mgmt_receiver.remote_source.address
self.mgmt_sender.send(request)
+ self.n_sent += 1
+
+ def poll_timeout(self):
+ request = Message()
+ request.address = "amqp:/_local/$management"
+ request.properties = {'type': 'org.apache.qpid.dispatch.connection',
+ 'operation': 'QUERY'}
+ request.reply_to = self.mgmt_receiver_2.remote_source.address
+ self.mgmt_sender.send(request)
+ self.n_sent += 1
def on_message(self, event):
if event.receiver == self.mgmt_receiver:
+ self.n_received += 1
attribute_names = event.message.body['attributeNames']
- property_index = attribute_names .index('properties')
- identity_index = attribute_names .index('identity')
-
+ property_index = attribute_names.index('properties')
+ identity_index = attribute_names.index('identity')
+ conn_found = False
for result in event.message.body['results']:
if result[property_index]:
properties = result[property_index]
@@ -467,28 +503,44 @@ class DeleteConnectionWithReceiver(MessagingHandler):
'operation': 'UPDATE'
}
request.body = {
- 'adminStatus': 'deleted'}
+ 'adminStatus': self.deleted_admin_status
+ }
request.reply_to =
self.mgmt_receiver_1.remote_source.address
self.mgmt_sender.send(request)
+ conn_found = True
+ self.n_sent += 1
+ if not conn_found:
+ self.bail("The connection we wanted to delete was not found")
elif event.receiver == self.mgmt_receiver_1:
- if event.message.properties['statusDescription'] == 'OK' and
event.message.body['adminStatus'] == 'deleted':
- request = Message()
- request.address = "amqp:/_local/$management"
- request.properties = {'type':
'org.apache.qpid.dispatch.connection',
- 'operation': 'QUERY'}
- request.reply_to = self.mgmt_receiver_2.remote_source.address
- self.mgmt_sender.send(request)
+ self.n_received += 1
+ if event.message.properties['statusDescription'] == 'OK' and \
+ event.message.body['adminStatus'] ==
self.deleted_admin_status:
+ # Wait for 3 sends for the connection to be gone completely.
+ self.num_attempts += 1
+ self.query_timer = event.reactor.schedule(3.0,
PollTimeout(self))
+ else:
+ if event.message.properties['statusDescription'] != 'OK':
+ error = "Expected statusDescription to be OK but instead
got %s" % \
+ event.message.properties['statusDescription']
+ if event.message.body['adminStatus'] !=
self.deleted_admin_status:
+ error = "Expected adminStatus to be %s but instead got %s"
% \
+ (self.deleted_admin_status,
event.message.properties['adminStatus'])
+ self.bail(error)
elif event.receiver == self.mgmt_receiver_2:
+ self.n_received += 1
attribute_names = event.message.body['attributeNames']
property_index = attribute_names .index('properties')
- identity_index = attribute_names .index('identity')
for result in event.message.body['results']:
if result[property_index]:
properties = result[property_index]
if properties and properties.get('int_property'):
- self.bail("Connection not deleted")
+ if self.num_attempts == self.max_attempts:
+ self.bail("Connection not deleted")
+ else:
+ self.num_attempts += 1
+ self.query_timer = event.reactor.schedule(3.0,
PollTimeout(self))
self.bail(None)
def run(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]