Repository: qpid-interop-test Updated Branches: refs/heads/master f5d177d05 -> 5dc10051e
QPIDIT-40: Improvement to terminate and kill code based on testing, also some pylint-suggested tidy-ups Project: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/commit/5dc10051 Tree: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/tree/5dc10051 Diff: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/diff/5dc10051 Branch: refs/heads/master Commit: 5dc10051e27922896c73deb9fdf05b18e4315c87 Parents: f5d177d Author: Kim van der Riet <[email protected]> Authored: Fri Sep 30 16:19:30 2016 -0400 Committer: Kim van der Riet <[email protected]> Committed: Fri Sep 30 16:19:30 2016 -0400 ---------------------------------------------------------------------- src/python/qpid-interop-test/shims.py | 60 ++++++++++++++------ .../types/simple_type_tests.py | 39 ++++++------- 2 files changed, 62 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5dc10051/src/python/qpid-interop-test/shims.py ---------------------------------------------------------------------- diff --git a/src/python/qpid-interop-test/shims.py b/src/python/qpid-interop-test/shims.py index a2a48dd..1110d47 100644 --- a/src/python/qpid-interop-test/shims.py +++ b/src/python/qpid-interop-test/shims.py @@ -23,6 +23,7 @@ Module containing worker thread classes and shims from json import loads from os import getenv, path from subprocess import Popen, PIPE, CalledProcessError +from sys import stdout from threading import Thread from time import sleep @@ -43,32 +44,55 @@ class ShimWorkerThread(Thread): return self.return_obj def join_or_kill(self, timeout): + """ + Wait for thread to join after timeout (seconds). If still alive, it is then terminated, then if still alive, + killed + """ self.join(timeout) if self.is_alive(): - print '\n Thread %s (pid=%d) alive after timeout, terminating...' % (self.name, self.proc.pid), - self.proc.terminate() - sleep(1) - if self.is_alive(): - print ' Thread %s (pid=%d) alive after terminate, killing...' % (self.name, self.proc.pid), - self.proc.kill() - sleep(1) - if self.is_alive(): - print ' ERROR: Thread %s (pid=%d) alive after kill' % (self.name, self.proc.pid) + if self._terminate_loop(): + if self._kill_loop(): + print '\n ERROR: Thread %s (pid=%d) alive after kill' % (self.name, self.proc.pid) + stdout.flush() else: print 'Killed' + stdout.flush() else: print 'Terminated' + stdout.flush() + + def _terminate_loop(self, num_attempts=2, wait_time=2): + cnt = 0 + while cnt < num_attempts and self.is_alive(): + cnt += 1 + print '\n Thread %s (pid=%d) alive after timeout, terminating (try #%d)...' % (self.name, self.proc.pid, + cnt), + stdout.flush() + self.proc.terminate() + sleep(wait_time) + return self.is_alive() + + def _kill_loop(self, num_attempts=5, wait_time=5): + cnt = 0 + while cnt < num_attempts and self.is_alive(): + cnt += 1 + print '\n Thread %s (pid=%d) alive after terminate, killing (try #%d)...' % (self.name, self.proc.pid, + cnt), + stdout.flush() + self.proc.kill() + sleep(wait_time) + return self.is_alive() class Sender(ShimWorkerThread): """Sender class for multi-threaded send""" - def __init__(self, use_shell_flag, send_shim_args, broker_addr, queue_name, type, json_test_str): - super(Sender, self).__init__('sender_thread') + def __init__(self, use_shell_flag, send_shim_args, broker_addr, queue_name, msg_type, json_test_str): + super(Sender, self).__init__('sender_thread_%s' % queue_name) if send_shim_args is None: print 'ERROR: Sender: send_shim_args == None' self.use_shell_flag = use_shell_flag self.arg_list.extend(send_shim_args) - self.arg_list.extend([broker_addr, queue_name, type, json_test_str]) + self.arg_list.extend([broker_addr, queue_name, msg_type, json_test_str]) def run(self): """Thread starts here""" @@ -84,12 +108,12 @@ class Sender(ShimWorkerThread): class Receiver(ShimWorkerThread): """Receiver class for multi-threaded receive""" - def __init__(self, receive_shim_args, broker_addr, queue_name, type, json_test_str): + def __init__(self, receive_shim_args, broker_addr, queue_name, msg_type, json_test_str): super(Receiver, self).__init__('receiver_thread') if receive_shim_args is None: print 'ERROR: Receiver: receive_shim_args == None' self.arg_list.extend(receive_shim_args) - self.arg_list.extend([broker_addr, queue_name, type, json_test_str]) + self.arg_list.extend([broker_addr, queue_name, msg_type, json_test_str]) def run(self): """Thread starts here""" @@ -124,13 +148,13 @@ class Shim(object): self.receive_params = None self.use_shell_flag = False - def create_sender(self, broker_addr, queue_name, type, json_test_str): + def create_sender(self, broker_addr, queue_name, msg_type, json_test_str): """Create a new sender instance""" - return Sender(self.use_shell_flag, self.send_params, broker_addr, queue_name, type, json_test_str) + return Sender(self.use_shell_flag, self.send_params, broker_addr, queue_name, msg_type, json_test_str) - def create_receiver(self, broker_addr, queue_name, type, json_test_str): + def create_receiver(self, broker_addr, queue_name, msg_type, json_test_str): """Create a new receiver instance""" - return Receiver(self.receive_params, broker_addr, queue_name, type, json_test_str) + return Receiver(self.receive_params, broker_addr, queue_name, msg_type, json_test_str) class ProtonPythonShim(Shim): """Shim for qpid-proton Python client""" http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5dc10051/src/python/qpid-interop-test/types/simple_type_tests.py ---------------------------------------------------------------------- diff --git a/src/python/qpid-interop-test/types/simple_type_tests.py b/src/python/qpid-interop-test/types/simple_type_tests.py index 17d4470..0482f2e 100755 --- a/src/python/qpid-interop-test/types/simple_type_tests.py +++ b/src/python/qpid-interop-test/types/simple_type_tests.py @@ -200,25 +200,26 @@ class AmqpPrimitiveTypes(TestTypeMap): 'short:8', 'short:9'] * 10 ], - 'map': [# Enpty map - {}, - # Map with string keys - {'string:one': 'ubyte:1', - 'string:two': 'ushort:2'}, - # Map with other AMQP simple types as keys - {'none:': 'string:None', - 'string:None': 'none:', - 'string:One': 'long:-1234567890', - 'short:2': 'int:2', - 'boolean:True': 'string:True', - 'string:False': 'boolean:False', - #['string:AAA', 'ushort:5951']: 'string:list value', - #{'byte:-55': 'ubyte:200', - # 'boolean:True': 'string:Hello, world!'}: 'symbol:map.value', - #'string:list': [], - 'string:map': {'char:A': 'int:1', - 'char:B': 'int:2'}}, - ], + 'map': [ + # Enpty map + {}, + # Map with string keys + {'string:one': 'ubyte:1', + 'string:two': 'ushort:2'}, + # Map with other AMQP simple types as keys + {'none:': 'string:None', + 'string:None': 'none:', + 'string:One': 'long:-1234567890', + 'short:2': 'int:2', + 'boolean:True': 'string:True', + 'string:False': 'boolean:False', + #['string:AAA', 'ushort:5951']: 'string:list value', + #{'byte:-55': 'ubyte:200', + # 'boolean:True': 'string:Hello, world!'}: 'symbol:map.value', + #'string:list': [], + 'string:map': {'char:A': 'int:1', + 'char:B': 'int:2'}}, + ], # TODO: Support all AMQP types in array (including keys) #'array': [[], # [1, 2, 3], --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
