Repository: qpid-interop-test Updated Branches: refs/heads/master c810fc3af -> f5d177d05
QPIDIT-40: Added thread kill after timeout. Also improved reporting of errors from shims using stderr. Project: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/commit/f5d177d0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/tree/f5d177d0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/diff/f5d177d0 Branch: refs/heads/master Commit: f5d177d0537aa22ccce5a384ec6419e7cf7992a3 Parents: c810fc3 Author: Kim van der Riet <[email protected]> Authored: Fri Sep 30 13:52:28 2016 -0400 Committer: Kim van der Riet <[email protected]> Committed: Fri Sep 30 13:52:28 2016 -0400 ---------------------------------------------------------------------- .../src/qpidit/shim/JmsReceiver.cpp | 8 ++- .../qpid-interop-test/jms/jms_message_tests.py | 8 +-- src/python/qpid-interop-test/shims.py | 58 ++++++++++++++------ .../types/simple_type_tests.py | 8 +-- 4 files changed, 54 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f5d177d0/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp index c2ab893..8cbe515 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp @@ -77,7 +77,13 @@ namespace qpidit void JmsReceiver::on_message(proton::delivery &d, proton::message &m) { try { if (_received < _expected) { - switch (m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")).get<int8_t>()) { + int8_t t = JMS_MESSAGE_TYPE; + try {t = m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")).get<int8_t>();} + catch (const std::exception& e) { + std::cout << "JmsReceiver::on_message(): Missing annotation \"x-opt-jms-msg-type\"" << std::endl; + throw; + } + switch (t) { case JMS_MESSAGE_TYPE: receiveJmsMessage(m); break; http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f5d177d0/src/python/qpid-interop-test/jms/jms_message_tests.py ---------------------------------------------------------------------- diff --git a/src/python/qpid-interop-test/jms/jms_message_tests.py b/src/python/qpid-interop-test/jms/jms_message_tests.py index 4907872..6ea1c01 100755 --- a/src/python/qpid-interop-test/jms/jms_message_tests.py +++ b/src/python/qpid-interop-test/jms/jms_message_tests.py @@ -278,12 +278,8 @@ class JmsMessageTypeTestCase(unittest.TestCase): sender.start() # Wait for both shims to finish - sender.join(shims.THREAD_TIMEOUT) - if sender.isAlive(): - print 'Sender thread %s timed out' % sender.getName() - receiver.join(shims.THREAD_TIMEOUT) - if receiver.isAlive(): - print 'Receiver thread %s timed out' % receiver.getName() + sender.join_or_kill(shims.THREAD_TIMEOUT) + receiver.join_or_kill(shims.THREAD_TIMEOUT) # Process return string from sender send_obj = sender.get_return_object() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f5d177d0/src/python/qpid-interop-test/shims.py ---------------------------------------------------------------------- diff --git a/src/python/qpid-interop-test/shims.py b/src/python/qpid-interop-test/shims.py index 08e1e7d..a2a48dd 100644 --- a/src/python/qpid-interop-test/shims.py +++ b/src/python/qpid-interop-test/shims.py @@ -22,11 +22,12 @@ Module containing worker thread classes and shims from json import loads from os import getenv, path -from subprocess import check_output, CalledProcessError +from subprocess import Popen, PIPE, CalledProcessError from threading import Thread +from time import sleep -THREAD_TIMEOUT = 15.0 # seconds to complete before join is forced +THREAD_TIMEOUT = 10.0 # seconds to complete before join is forced class ShimWorkerThread(Thread): @@ -35,11 +36,29 @@ class ShimWorkerThread(Thread): super(ShimWorkerThread, self).__init__(name=thread_name) self.arg_list = [] self.return_obj = None + self.proc = None def get_return_object(self): """Get the return object from the completed thread""" return self.return_obj + def join_or_kill(self, timeout): + self.join(timeout) + if self.is_alive(): + print '\n Thread %s (pid=%d) alive after timeout, terminating...' % (self.name, self.proc.pid), + self.proc.terminate() + sleep(1) + if self.is_alive(): + print ' Thread %s (pid=%d) alive after terminate, killing...' % (self.name, self.proc.pid), + self.proc.kill() + sleep(1) + if self.is_alive(): + print ' ERROR: Thread %s (pid=%d) alive after kill' % (self.name, self.proc.pid) + else: + print 'Killed' + else: + print 'Terminated' + class Sender(ShimWorkerThread): """Sender class for multi-threaded send""" @@ -55,9 +74,10 @@ class Sender(ShimWorkerThread): """Thread starts here""" try: #print '\n>>>', self.arg_list # DEBUG - useful to see command-line sent to shim - return_str = check_output(self.arg_list, shell=self.use_shell_flag) - if len(return_str) > 0: - self.return_obj = return_str + self.proc = Popen(self.arg_list, stdout=PIPE, stderr=PIPE, shell=self.use_shell_flag) + (stdoutdata, stderrdata) = self.proc.communicate() + if len(stdoutdata) > 0 or len(stderrdata) > 0: + self.return_obj = (stdoutdata, stderrdata) except CalledProcessError as exc: self.return_obj = str(exc) + '\n\nOutput:\n' + exc.output @@ -75,18 +95,22 @@ class Receiver(ShimWorkerThread): """Thread starts here""" try: #print '\n>>>', self.arg_list # DEBUG - useful to see command-line sent to shim - output = check_output(self.arg_list) - #print '<<<', output # DEBUG - useful to see text received from shim - str_tvl = output.split('\n')[0:-1] # remove trailing \n - #if len(str_tvl) == 1: - # self.return_obj = output - if len(str_tvl) == 2: # AMQP type test return - self.return_obj = loads(str_tvl[1]) - elif len(str_tvl) == 4: # JMS test return - self.return_obj = (str_tvl[0], loads(str_tvl[1]), loads(str_tvl[2]), loads(str_tvl[3])) - else: # Make a single line of all the bits and return that - #self.return_obj = loads("".join(str_tvl[1:])) - self.return_obj = output + self.proc = Popen(self.arg_list, stdout=PIPE, stderr=PIPE) + (stdoutdata, stderrdata) = self.proc.communicate() + if len(stderrdata) > 0: + self.return_obj = (stdoutdata, stderrdata) + else: + #print '<<<', stdoutdata # DEBUG - useful to see text received from shim + str_tvl = stdoutdata.split('\n')[0:-1] # remove trailing \n + #if len(str_tvl) == 1: + # self.return_obj = output + if len(str_tvl) == 2: # AMQP type test return + self.return_obj = loads(str_tvl[1]) + elif len(str_tvl) == 4: # JMS test return + self.return_obj = (str_tvl[0], loads(str_tvl[1]), loads(str_tvl[2]), loads(str_tvl[3])) + else: # Make a single line of all the bits and return that + #self.return_obj = loads("".join(str_tvl[1:])) + self.return_obj = stdoutdata except CalledProcessError as exc: self.return_obj = str(exc) + '\n\n' + exc.output http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f5d177d0/src/python/qpid-interop-test/types/simple_type_tests.py ---------------------------------------------------------------------- diff --git a/src/python/qpid-interop-test/types/simple_type_tests.py b/src/python/qpid-interop-test/types/simple_type_tests.py index 429f506..17d4470 100755 --- a/src/python/qpid-interop-test/types/simple_type_tests.py +++ b/src/python/qpid-interop-test/types/simple_type_tests.py @@ -279,8 +279,8 @@ class AmqpTypeTestCase(unittest.TestCase): sender.start() # Wait for both shims to finish - sender.join(shims.THREAD_TIMEOUT) - receiver.join(shims.THREAD_TIMEOUT) + sender.join_or_kill(shims.THREAD_TIMEOUT) + receiver.join_or_kill(shims.THREAD_TIMEOUT) # Process return string from sender send_obj = sender.get_return_object() @@ -288,7 +288,7 @@ class AmqpTypeTestCase(unittest.TestCase): if isinstance(send_obj, str) and len(send_obj) > 0: self.fail('Send shim \'%s\':\n%s' % (send_shim.NAME, send_obj)) else: - self.fail(str(send_obj)) + self.fail('Sender error: %s' % str(send_obj)) # Process return string from receiver receive_obj = receiver.get_return_object() @@ -296,7 +296,7 @@ class AmqpTypeTestCase(unittest.TestCase): self.assertEqual(receive_obj, test_value_list, msg='\n sent:%s\nreceived:%s' % \ (test_value_list, receive_obj)) else: - self.fail(receive_obj) + self.fail('Receiver error: %s' % str(receive_obj)) def create_testcase_class(broker_name, types, broker_addr, amqp_type, shim_product): """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
