QPIDIT-18: Added JSON transfer of structured data between test and shims. Also: QPIDIT-6: AMQP type decimal64 fails..., QPIDIT-5: AMQP type decimal32 fails..., QPIDIT-3: AMQP type decimal128 hangs...
Project: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/commit/899a9a86 Tree: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/tree/899a9a86 Diff: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/diff/899a9a86 Branch: refs/heads/master Commit: 899a9a863cf0b95bb78f59ac9b190e67b1a811e5 Parents: 4dfa2bd Author: Kim van der Riet <[email protected]> Authored: Tue Sep 29 17:07:39 2015 -0400 Committer: Kim van der Riet <[email protected]> Committed: Tue Sep 29 17:07:39 2015 -0400 ---------------------------------------------------------------------- QUICKSTART | 14 +- etc/proton-python-amqp-types.patch | 8 +- shims/qpid-proton-python/src/JmsReceiverShim.py | 244 ++++++++++++++++ shims/qpid-proton-python/src/JmsSenderShim.py | 237 ++++++++++++++++ .../qpid-proton-python/src/TypesReceiverShim.py | 109 ++++++++ shims/qpid-proton-python/src/TypesSenderShim.py | 132 +++++++++ shims/qpid-proton-python/src/amqp-receive | 102 ------- shims/qpid-proton-python/src/amqp-send | 135 --------- .../qpid-proton-python/src/jms-receiver-shim.py | 234 ---------------- shims/qpid-proton-python/src/jms-sender-shim.py | 241 ---------------- .../qpid-interop-test/jms/jms_message_tests.py | 113 ++++++-- .../types/simple_type_tests.py | 279 +++++++++++-------- 12 files changed, 977 insertions(+), 871 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/QUICKSTART ---------------------------------------------------------------------- diff --git a/QUICKSTART b/QUICKSTART index bb30d97..d94fc2a 100644 --- a/QUICKSTART +++ b/QUICKSTART @@ -45,14 +45,18 @@ Note that installation is still to be completed, this section will change to reflect installation details when complete. 
Assuming proton's make install has been run, from top level qpid-interop-test directory: -export PYTHONPATH=/usr/local/lib64/proton/bindings/python:src/py/qpid-interop-test +export PYTHONPATH=/usr/lib64/python2.7/site-packages:src/py/qpid-interop-test export LD_LIBRARY_PATH=/usr/local/lib64 export QPID_INTEROP_TEST_HOME=<abs path to top level qpid-interop-test directory> -Start a broker. If using qpidd: -qpidd --load-module amqp.so -m yes --auth no --queue-pattern qpid-interop --default-flow-stop-threshold 0 --default-flow-resume-threshold 0 --default-queue-limit 0 --log-enable info+ - -NOTE: for qpidd, YOU MUST USE THE --queue-pattern qpid-interop parameter so it will create non-existent queues as needed. +Start a broker (Active-MQ or Qpid). +NOTE: For qpidd, YOU MUST USE THE --queue-pattern qpid-interop parameter so it will create non-existent queues as needed. +NOTE: For qpidd, there are some bugs in the broker which will cause the simple type tests for the following types to fail: + * char + * decimal32 + * decimal64 + * decimal128 + See https://issues.apache.org/jira/browse/QPID-6328 for more info on this. 
From top level directory: ./src/py/qpid-interop-test/jms/jms_message_tests.py http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/etc/proton-python-amqp-types.patch ---------------------------------------------------------------------- diff --git a/etc/proton-python-amqp-types.patch b/etc/proton-python-amqp-types.patch index 9a4da22..a337f2d 100644 --- a/etc/proton-python-amqp-types.patch +++ b/etc/proton-python-amqp-types.patch @@ -1,5 +1,5 @@ diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py -index 353c396..e5e0eee 100644 +index 0567095..9bf2ce0 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -1290,6 +1290,56 @@ class char(unicode): @@ -46,10 +46,10 @@ index 353c396..e5e0eee 100644 + def __repr__(self): + return "decimal32(%s)" % int.__repr__(self) + -+class decimal64(int): ++class decimal64(long): + + def __repr__(self): -+ return "decimal64(%s)" % int.__repr__(self) ++ return "decimal64(%s)" % long.__repr__(self) + +class decimal128(bytes): + @@ -182,7 +182,7 @@ index 353c396..e5e0eee 100644 Described: put_py_described, Array: put_py_array } -@@ -4062,5 +4123,15 @@ __all__ = [ +@@ -4123,5 +4184,15 @@ __all__ = [ "dispatch", "symbol", "timestamp", http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/JmsReceiverShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/JmsReceiverShim.py b/shims/qpid-proton-python/src/JmsReceiverShim.py new file mode 100755 index 0000000..0ad4425 --- /dev/null +++ b/shims/qpid-proton-python/src/JmsReceiverShim.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from interop_test_errors import InteropTestError +from json import dumps, loads +from proton import byte, symbol +from proton.handlers import MessagingHandler +from proton.reactor import Container +from struct import pack, unpack +from subprocess import check_output +from traceback import format_exc + +# These values must tie in with the Qpid-JMS client values found in +# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport +QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type') + +class JmsReceiverShim(MessagingHandler): + def __init__(self, url, jms_msg_type, expected_msg_map): + super(JmsReceiverShim, self).__init__() + self.url = url + self.jms_msg_type = jms_msg_type + self.expteced_msg_map = expected_msg_map + self.subtype_itr = iter(sorted(self.expteced_msg_map.keys())) + self.expected = self._get_tot_num_messages() + self.received = 0 + self.received_value_map = {} + self.current_subtype = None + self.current_subtype_msg_list = None + + def get_received_value_map(self): + return self.received_value_map + + def on_start(self, event): + event.container.create_receiver(self.url) + + def on_message(self, event): + if event.message.id and event.message.id < self.received: + return # ignore duplicate message + if self.expected == 0 or self.received < self.expected: + if self.current_subtype is None: + self.current_subtype = self.subtype_itr.next() + 
self.current_subtype_msg_list = [] + self.current_subtype_msg_list.append(self._handle_message(event.message)) + if len(self.current_subtype_msg_list) >= self.expteced_msg_map[self.current_subtype]: + self.received_value_map[self.current_subtype] = self.current_subtype_msg_list + self.current_subtype = None + self.current_subtype_msg_list = [] + self.received += 1 + if self.received == self.expected: + event.receiver.close() + event.connection.close() + + def _handle_message(self, message): + if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': + return self._receive_jms_bytesmessage(message) + if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': + return self._recieve_jms_mapmessage(message) + if self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE': + return self._recieve_jms_objectmessage(message) + if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': + return self._receive_jms_streammessage(message) + if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': + return self._receive_jms_textmessage(message) + print 'jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type + return None + + def _get_tot_num_messages(self): + total = 0 + for key in self.expteced_msg_map: + total += int(self.expteced_msg_map[key]) + return total + + def _receive_jms_bytesmessage(self, message): + assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(3) + if self.current_subtype == 'boolean': + if message.body == b'\x00': + return 'False' + if message.body == b'\x01': + return 'True' + raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' % + str(message.body)) + if self.current_subtype == 'byte': + return hex(unpack('b', message.body)[0]) + if self.current_subtype == 'bytes': + return str(message.body) + if self.current_subtype == 'char': + if len(message.body) == 2: # format 'a' or '\xNN' + return str(message.body[1]) # strip leading '\x00' char + raise InteropTestError('Unexpected strring length 
for type char: %d' % len(message.body)) + if self.current_subtype == 'double': + return '0x%016x' % unpack('!Q', message.body)[0] + if self.current_subtype == 'float': + return '0x%08x' % unpack('!L', message.body)[0] + if self.current_subtype == 'int': + return hex(unpack('!i', message.body)[0]) + if self.current_subtype == 'long': + return hex(unpack('!q', message.body)[0]) + if self.current_subtype == 'short': + return hex(unpack('!h', message.body)[0]) + if self.current_subtype == 'string': + # NOTE: first 2 bytes are string length, must be present + if len(message.body) >= 2: + str_len = unpack('!H', message.body[:2])[0] + str_body = str(message.body[2:]) + if len(str_body) != str_len: + raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' % + (str_len, str_body, len(str_body))) + return str_body + else: + raise InteropTestError('Malformed string binary: len(\'%s\')=%d' % + (repr(message.body), len(message.body))) + raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % + (self.jms_msg_type, self.current_subtype)) + + def _recieve_jms_mapmessage(self, message): + assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(2) + key, value = message.body.items()[0] + assert key[:-3] == self.current_subtype + if self.current_subtype == 'boolean': + return str(value) + if self.current_subtype == 'byte': + return hex(value) + if self.current_subtype == 'bytes': + return str(value) + if self.current_subtype == 'char': + return str(value) + if self.current_subtype == 'double': + return '0x%016x' % unpack('!Q', pack('!d', value))[0] + if self.current_subtype == 'float': + return '0x%08x' % unpack('!L', pack('!f', value))[0] + if self.current_subtype == 'int': + return hex(value) + if self.current_subtype == 'long': + return hex(int(value)) + if self.current_subtype == 'short': + return hex(value) + if self.current_subtype == 'string': + return str(value) + 
raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % + (self.jms_msg_type, self.current_subtype)) + + def _recieve_jms_objectmessage(self, message): + assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(1) + return self._get_java_obj(message.body) + + def _get_java_obj(self, java_obj_bytes): + ''' + Take bytes from serialized Java object and construct a Java object, then return its toString() value. The + work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java + utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar. + java_obj_bytes: hex string representation of bytes from Java object (eg 'aced00057372...') + returns: string containing Java class value as returned by the toString() method + ''' + java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip() + out_str = check_output(['java', + '-cp', + 'target/JavaObjUtils.jar', + 'org.apache.qpid.interop_test.obj_util.BytesToJavaObj', + java_obj_bytes_str]) + out_str_list = out_str.split('\n')[:-1] # remove trailing \n + if len(out_str_list) > 1: + raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str) + colon_index = out_str_list[0].index(':') + if colon_index < 0: + raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str) + java_class_name = out_str_list[0][:colon_index] + java_class_value_str = out_str_list[0][colon_index+1:] + if java_class_name != self.current_subtype: + raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' % + (self.current_subtype, java_class_name)) + return java_class_value_str + + def _receive_jms_streammessage(self, message): + assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(4) + # Every message is a list with one item [value] + assert 
len(message.body) == 1 + value = message.body[0] + if self.current_subtype == 'boolean': + return str(value) + if self.current_subtype == 'byte': + return hex(value) + if self.current_subtype == 'bytes': + return str(value) + if self.current_subtype == 'char': + return str(value) + if self.current_subtype == 'double': + return '0x%016x' % unpack('!Q', pack('!d', value))[0] + if self.current_subtype == 'float': + return '0x%08x' % unpack('!L', pack('!f', value))[0] + if self.current_subtype == 'int': + return hex(value) + if self.current_subtype == 'long': + return hex(int(value)) + if self.current_subtype == 'short': + return hex(value) + if self.current_subtype == 'string': + return str(value) + raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % + (self.jms_msg_type, self.current_subtype)) + + def _receive_jms_textmessage(self, message): + assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(5) + return message.body + + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: JMS message type +# 4: JSON string of map containing number of test values to receive for each type/subtype +#print '#### sys.argv=%s' % sys.argv +try: + RECEIVER = JmsReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4])) + Container(RECEIVER).run() + print sys.argv[3] + print dumps(RECEIVER.get_received_value_map()) +except KeyboardInterrupt: + pass +except Exception as exc: + print 'jms-receiver-shim EXCEPTION:', exc + print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/JmsSenderShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/JmsSenderShim.py b/shims/qpid-proton-python/src/JmsSenderShim.py new file mode 100755 index 0000000..ca3d0a9 --- /dev/null +++ b/shims/qpid-proton-python/src/JmsSenderShim.py @@ 
-0,0 +1,237 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from json import loads +from proton import byte, char, float32, int32, Message, short, symbol +from proton.handlers import MessagingHandler +from proton.reactor import Container +from interop_test_errors import InteropTestError +from subprocess import check_output +from struct import pack, unpack +from traceback import format_exc + +# These values must tie in with the Qpid-JMS client values found in +# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport +QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type') +QPID_JMS_TYPE_ANNOTATIONS = { + 'JMS_BYTESMESSAGE_TYPE': byte(3), + 'JMS_MAPMESSAGE_TYPE': byte(2), + 'JMS_OBJECTMESSAGE_TYPE': byte(1), + 'JMS_STREAMMESSAGE_TYPE': byte(4), + 'JMS_TEXTMESSAGE_TYPE': byte(5) + } +def create_annotation(jms_msg_type): + return {QPID_JMS_TYPE_ANNOTATION_NAME: QPID_JMS_TYPE_ANNOTATIONS[jms_msg_type]} + +class JmsSenderShim(MessagingHandler): + def __init__(self, url, jms_msg_type, test_value_map): + super(JmsSenderShim, self).__init__() + self.url = url + self.jms_msg_type = jms_msg_type + self.test_value_map = test_value_map + self.sent = 0 + self.confirmed = 0 + self.total = 
self._get_total_num_msgs() + + def on_start(self, event): + event.container.create_sender(self.url) + + def on_sendable(self, event): + if self.sent == 0: + # These types expect a test_values Python string representation of a map: '{type:[val, val, val], ...}' + for sub_type in sorted(self.test_value_map.keys()): + if self._send_test_values(event, sub_type, self.test_value_map[sub_type]): + return + + def _get_total_num_msgs(self): + total = 0 + for key in self.test_value_map.keys(): + total += len(self.test_value_map[key]) + return total + + def _send_test_values(self, event, test_value_type, test_values): + value_num = 0 + for test_value in test_values: + if event.sender.credit: + message = self._create_message(test_value_type, test_value, value_num) + if message is not None: + event.sender.send(message) + self.sent += 1 + value_num += 1 + else: + event.connection.close() + return True + return False + + # TODO: Change this to return a list of messages. That way each test can return more than one message + def _create_message(self, test_value_type, test_value, value_num): + if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': + return self._create_jms_bytesmessage(test_value_type, test_value) + elif self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': + return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num)) + elif self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE': + return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value)) + elif self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': + return self._create_jms_streammessage(test_value_type, test_value) + elif self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': + return self._create_jms_textmessage(test_value) + else: + print 'jms-send: Unsupported JMS message type "%s"' % self.jms_msg_type + return None + + def _create_jms_bytesmessage(self, test_value_type, test_value): + # NOTE: test_value contains all unicode strings u'...' 
as returned by json + body_bytes = None + if test_value_type == 'boolean': + body_bytes = b'\x01' if test_value == 'True' else b'\x00' + elif test_value_type == 'byte': + body_bytes = pack('b', int(test_value, 16)) + elif test_value_type == 'bytes': + body_bytes = str(test_value) # remove unicode + elif test_value_type == 'char': + # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00' + body_bytes = '\x00' + str(test_value) # remove unicode + elif test_value_type == 'double' or test_value_type == 'float': + body_bytes = test_value[2:].decode('hex') + elif test_value_type == 'int': + body_bytes = pack('!i', int(test_value, 16)) + elif test_value_type == 'long': + body_bytes = pack('!q', long(test_value, 16)) + elif test_value_type == 'short': + body_bytes = pack('!h', short(test_value, 16)) + elif test_value_type == 'string': + # NOTE: First two bytes must be string length + test_value_str = str(test_value) # remove unicode + body_bytes = pack('!H', len(test_value_str)) + test_value_str + else: + raise InteropTestError('JmsSenderShim._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' % + test_value_type) + return Message(id=(self.sent+1), + body=body_bytes, + inferred=True, + content_type='application/octet-stream', + annotations=create_annotation('JMS_BYTESMESSAGE_TYPE')) + + def _create_jms_mapmessage(self, test_value_type, test_value, name): + if test_value_type == 'boolean': + value = test_value == 'True' + elif test_value_type == 'byte': + value = byte(int(test_value, 16)) + elif test_value_type == 'bytes': + value = str(test_value) # remove unicode + elif test_value_type == 'char': + value = char(test_value) + elif test_value_type == 'double': + value = unpack('!d', test_value[2:].decode('hex'))[0] + elif test_value_type == 'float': + value = float32(unpack('!f', test_value[2:].decode('hex'))[0]) + elif test_value_type == 'int': + value = int32(int(test_value, 16)) + elif test_value_type == 'long': + value = int(test_value, 16) + 
elif test_value_type == 'short': + value = short(int(test_value, 16)) + elif test_value_type == 'string': + value = test_value + else: + raise InteropTestError('JmsSenderShim._create_jms_mapmessage: Unknown or unsupported subtype "%s"' % + test_value_type) + return Message(id=(self.sent+1), + body={name: value}, + inferred=False, + annotations=create_annotation('JMS_MAPMESSAGE_TYPE')) + + def _create_jms_objectmessage(self, test_value): + java_binary = self._get_java_obj_binary(test_value) + return Message(id=(self.sent+1), + body=java_binary, + inferred=True, + content_type='application/x-java-serialized-object', + annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE')) + + @staticmethod + def _get_java_obj_binary(java_class_str): + out_str = check_output(['java', + '-cp', + 'target/JavaObjUtils.jar', + 'org.apache.qpid.interop_test.obj_util.JavaObjToBytes', + java_class_str]) + out_str_list = out_str.split('\n')[:-1] # remove trailing \n + if out_str_list[0] != java_class_str: + raise InteropTestError('JmsSenderShim._get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' % out_str) + return out_str_list[1].decode('hex') + + def _create_jms_streammessage(self, test_value_type, test_value): + if test_value_type == 'boolean': + body_list = [test_value == 'True'] + elif test_value_type == 'byte': + body_list = [byte(int(test_value, 16))] + elif test_value_type == 'bytes': + body_list = [str(test_value)] + elif test_value_type == 'char': + body_list = [char(test_value)] + elif test_value_type == 'double': + body_list = [unpack('!d', test_value[2:].decode('hex'))[0]] + elif test_value_type == 'float': + body_list = [float32(unpack('!f', test_value[2:].decode('hex'))[0])] + elif test_value_type == 'int': + body_list = [int32(int(test_value, 16))] + elif test_value_type == 'long': + body_list = [int(test_value, 16)] + elif test_value_type == 'short': + body_list = [short(int(test_value, 16))] + elif test_value_type == 'string': + body_list = [test_value] + else: + 
raise InteropTestError('JmsSenderShim._create_jms_streammessage: Unknown or unsupported subtype "%s"' % + test_value_type) + return Message(id=(self.sent+1), + body=body_list, + inferred=True, + annotations=create_annotation('JMS_STREAMMESSAGE_TYPE')) + + def _create_jms_textmessage(self, test_value_text): + return Message(id=(self.sent+1), + body=unicode(test_value_text), + annotations=create_annotation('JMS_TEXTMESSAGE_TYPE')) + + def on_accepted(self, event): + self.confirmed += 1 + if self.confirmed == self.total: + event.connection.close() + + def on_disconnected(self, event): + self.sent = self.confirmed + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: JMS message type +# 4: Test value(s) as JSON string +#print '#### sys.argv=%s' % sys.argv +#print '>>> test_values=%s' % loads(sys.argv[4]) +try: + Container(JmsSenderShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))).run() +except KeyboardInterrupt: + pass +except Exception as exc: + print 'jms-sender-shim EXCEPTION:', exc + print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/TypesReceiverShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/TypesReceiverShim.py b/shims/qpid-proton-python/src/TypesReceiverShim.py new file mode 100755 index 0000000..e08ecdb --- /dev/null +++ b/shims/qpid-proton-python/src/TypesReceiverShim.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Issues: +# * Capturing errors from client or broker + +import sys +from json import dumps +from proton.handlers import MessagingHandler +from proton.reactor import Container +from traceback import format_exc +from struct import pack, unpack + +class Receiver(MessagingHandler): + def __init__(self, url, amqp_type, num_expected_messages_str): + super(Receiver, self).__init__() + self.url = url + self.received_value_list = [] + self.amqp_type = amqp_type + self.expected = int(num_expected_messages_str) + self.received = 0 + + def get_received_value_list(self): + return self.received_value_list + + def on_start(self, event): + event.container.create_receiver(self.url) + + def on_message(self, event): + if event.message.id and event.message.id < self.received: + return # ignore duplicate message + if self.expected == 0 or self.received < self.expected: + if self.amqp_type == 'null' or \ + self.amqp_type == 'boolean' or \ + self.amqp_type == 'timestamp' or \ + self.amqp_type == 'uuid': + self.received_value_list.append(str(event.message.body)) + elif self.amqp_type == 'ubyte' or \ + self.amqp_type == 'ushort' or \ + self.amqp_type == 'uint' or \ + self.amqp_type == 'byte' or \ + self.amqp_type == 'short' or \ + self.amqp_type == 'int': + self.received_value_list.append(hex(event.message.body)) + elif self.amqp_type == 'ulong' or \ + self.amqp_type == 'long': + hex_str = hex(int(event.message.body)) + if len(hex_str) == 19 and hex_str[-1] == 'L': + self.received_value_list.append(hex_str[:-1]) # strip trailing 'L' if present on some ulongs + else: + 
self.received_value_list.append(hex_str) + elif self.amqp_type == 'float': + self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', event.message.body))[0]) + elif self.amqp_type == 'double': + self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', event.message.body))[0]) + elif self.amqp_type == 'decimal32': + self.received_value_list.append('0x%08x' % event.message.body) + elif self.amqp_type == 'decimal64': + self.received_value_list.append('0x%016x' % event.message.body) + elif self.amqp_type == 'decimal128': + self.received_value_list.append('0x' + ''.join(['%02x' % ord(c) for c in event.message.body]).strip()) + elif self.amqp_type == 'char' or \ + self.amqp_type == 'binary' or \ + self.amqp_type == 'string' or \ + self.amqp_type == 'symbol': + self.received_value_list.append(event.message.body) + elif self.amqp_type == 'list' or \ + self.amqp_type == 'map': + self.received_value_list.append(event.message.body) + else: + print 'receive: Unsupported AMQP type "%s"' % self.amqp_type + return + self.received += 1 + if self.received == self.expected: + event.receiver.close() + event.connection.close() + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: AMQP type +# 4: Expected number of test values to receive +try: + RECEIVER = Receiver('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4]) + Container(RECEIVER).run() + print sys.argv[3] + print dumps(RECEIVER.get_received_value_list()) +except KeyboardInterrupt: + pass +except Exception as exc: + print 'proton-python-receive EXCEPTION:', exc + print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/TypesSenderShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/TypesSenderShim.py b/shims/qpid-proton-python/src/TypesSenderShim.py new file mode 100755 index 0000000..dd338b0 --- /dev/null +++ 
b/shims/qpid-proton-python/src/TypesSenderShim.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Issues: +# * Capturing errors from client or broker + +import sys +from json import loads +from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, \ + ubyte, uint, ulong, ushort +from proton.handlers import MessagingHandler +from proton.reactor import Container +from struct import unpack +from traceback import format_exc +from uuid import UUID + +class Sender(MessagingHandler): + def __init__(self, url, amqp_type, test_value_list): + super(Sender, self).__init__() + self.url = url + self.amqp_type = amqp_type + self.test_value_list = test_value_list + self.sent = 0 + self.confirmed = 0 + self.total = len(test_value_list) + + def on_start(self, event): + event.container.create_sender(self.url) + + def on_sendable(self, event): + if self.sent == 0: + for test_value in self.test_value_list: + if event.sender.credit: + message = self.create_message(test_value) + if message is not None: + event.sender.send(message) + self.sent += 1 + else: + event.connection.close() + return + + def create_message(self, test_value): + if 
self.amqp_type == 'null': + return Message(id=(self.sent+1), body=None) + elif self.amqp_type == 'boolean': + return Message(id=(self.sent+1), body=True if test_value == 'True' else False) + elif self.amqp_type == 'ubyte': + return Message(id=(self.sent+1), body=ubyte(int(test_value, 16))) + elif self.amqp_type == 'ushort': + return Message(id=(self.sent+1), body=ushort(int(test_value, 16))) + elif self.amqp_type == 'uint': + return Message(id=(self.sent+1), body=uint(int(test_value, 16))) + elif self.amqp_type == 'ulong': + return Message(id=(self.sent+1), body=ulong(int(test_value, 16))) + elif self.amqp_type == 'byte': + return Message(id=(self.sent+1), body=byte(int(test_value, 16))) + elif self.amqp_type == 'short': + return Message(id=(self.sent+1), body=short(int(test_value, 16))) + elif self.amqp_type == 'int': + return Message(id=(self.sent+1), body=int32(int(test_value, 16))) + elif self.amqp_type == 'long': + return Message(id=(self.sent+1), body=long(int(test_value, 16))) + elif self.amqp_type == 'float': + return Message(id=(self.sent+1), body=float32(unpack('!f', test_value[2:].decode('hex'))[0])) + elif self.amqp_type == 'double': + return Message(id=(self.sent+1), body=unpack('!d', test_value[2:].decode('hex'))[0]) + elif self.amqp_type == 'decimal32': + return Message(id=(self.sent+1), body=decimal32(int(test_value[2:], 16))) + elif self.amqp_type == 'decimal64': + l64 = long(test_value[2:], 16) + return Message(id=(self.sent+1), body=decimal64(l64)) + elif self.amqp_type == 'decimal128': + return Message(id=(self.sent+1), body=decimal128(test_value[2:].decode('hex'))) + elif self.amqp_type == 'char': + return Message(id=(self.sent+1), body=char(test_value)) + elif self.amqp_type == 'timestamp': + return Message(id=(self.sent+1), body=timestamp(int(test_value))) + elif self.amqp_type == 'uuid': + return Message(id=(self.sent+1), body=UUID(test_value)) + elif self.amqp_type == 'binary': + return Message(id=(self.sent+1), body=bytes(test_value)) + 
elif self.amqp_type == 'string': + return Message(id=(self.sent+1), body=unicode(test_value)) + elif self.amqp_type == 'symbol': + return Message(id=(self.sent+1), body=symbol(test_value)) + elif self.amqp_type == 'list': + return Message(id=(self.sent+1), body=test_value) + elif self.amqp_type == 'map': + return Message(id=(self.sent+1), body=test_value) + else: + print 'send: Unsupported AMQP type "%s"' % self.amqp_type + return None + + def on_accepted(self, event): + self.confirmed += 1 + if self.confirmed == self.total: + event.connection.close() + + def on_disconnected(self, event): + self.sent = self.confirmed + + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: AMQP type +# 4...n: Test value(s) as strings +try: + Container(Sender('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))).run() +except KeyboardInterrupt: + pass +except Exception as exc: + print 'proton-python-send EXCEPTION:', exc + print format_exc() + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/amqp-receive ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/amqp-receive b/shims/qpid-proton-python/src/amqp-receive deleted file mode 100755 index c0cebff..0000000 --- a/shims/qpid-proton-python/src/amqp-receive +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Issues: -# * Capturing errors from client or broker - -import sys -from proton.handlers import MessagingHandler -from proton.reactor import Container -from traceback import format_exc -from struct import pack, unpack - -class Receiver(MessagingHandler): - def __init__(self, url, amqp_type, expected_num_messages): - super(Receiver, self).__init__() - self.url = url - self.amqp_type = amqp_type - self.received_value_list = [] - self.expected = int(expected_num_messages) - self.received = 0 - - def get_received_value_list(self): - return self.received_value_list - - def on_start(self, event): - event.container.create_receiver(self.url) - - def on_message(self, event): - if event.message.id and event.message.id < self.received: - return # ignore duplicate message - if self.expected == 0 or self.received < self.expected: - if self.amqp_type == 'null' or \ - self.amqp_type == 'boolean' or \ - self.amqp_type == 'ubyte' or \ - self.amqp_type == 'ushort' or \ - self.amqp_type == 'uint' or \ - self.amqp_type == 'ulong' or \ - self.amqp_type == 'byte' or \ - self.amqp_type == 'short' or \ - self.amqp_type == 'int' or \ - self.amqp_type == 'long' or \ - self.amqp_type == 'decimal32' or \ - self.amqp_type == 'decimal64' or \ - self.amqp_type == 'decimal128' or \ - self.amqp_type == 'timestamp' or \ - self.amqp_type == 'uuid': - self.received_value_list.append(str(event.message.body)) - elif self.amqp_type == 'float': - self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', event.message.body))[0]) - elif self.amqp_type == 'double': - 
self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', event.message.body))[0]) - elif self.amqp_type == 'decimal128': - self.received_value_list.append(event.message.body.encode('hex')) - elif self.amqp_type == 'char' or \ - self.amqp_type == 'binary' or \ - self.amqp_type == 'string' or \ - self.amqp_type == 'symbol': - self.received_value_list.append(event.message.body) - elif self.amqp_type == 'list' or \ - self.amqp_type == 'map': - self.received_value_list.append(event.message.body) - else: - print 'receive: Unsupported AMQP type "%s"' % self.amqp_type - return - self.received += 1 - if self.received == self.expected: - event.receiver.close() - event.connection.close() - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: AMQP type -# 4: Expected number of test values to receive -try: - rcv = Receiver('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4]) - Container(rcv).run() - print sys.argv[3] - for val in rcv.get_received_value_list(): - print val -except KeyboardInterrupt: - pass -except Exception as e: - print 'proton-python-receive EXCEPTION:', e - print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/amqp-send ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/amqp-send b/shims/qpid-proton-python/src/amqp-send deleted file mode 100755 index 288243f..0000000 --- a/shims/qpid-proton-python/src/amqp-send +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Issues: -# * Capturing errors from client or broker - -import sys -from ast import literal_eval -from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, ubyte, uint, ulong, ushort -from proton.handlers import MessagingHandler -from proton.reactor import Container -from shim_utils import StrToObj -from struct import unpack -from traceback import format_exc -from uuid import UUID - -class Sender(MessagingHandler): - def __init__(self, url, amqp_type, test_value_list): - super(Sender, self).__init__() - self.url = url - self.amqp_type = amqp_type - self.test_value_list = test_value_list - self.sent = 0 - self.confirmed = 0 - self.total = len(test_value_list) - - def on_start(self, event): - event.container.create_sender(self.url) - - def on_sendable(self, event): - if self.sent == 0: - for test_value in self.test_value_list: - if event.sender.credit: - message = self.create_message(test_value) - if message is not None: - event.sender.send(message) - self.sent += 1 - else: - event.connection.close() - return - - def create_message(self, test_value): - # Non-string types using literal_eval - if self.amqp_type == 'null': - return Message(id=(self.sent+1), body=None) - elif self.amqp_type == 'boolean': - return Message(id=(self.sent+1), body=True if test_value == 'True' else False) - elif self.amqp_type == 'ubyte': - return Message(id=(self.sent+1), body=ubyte(literal_eval(test_value))) - elif self.amqp_type == 'ushort': - return Message(id=(self.sent+1), body=ushort(literal_eval(test_value))) - 
elif self.amqp_type == 'uint': - return Message(id=(self.sent+1), body=uint(literal_eval(test_value))) - elif self.amqp_type == 'ulong': - return Message(id=(self.sent+1), body=ulong(literal_eval(test_value))) - elif self.amqp_type == 'byte': - return Message(id=(self.sent+1), body=byte(literal_eval(test_value))) - elif self.amqp_type == 'short': - return Message(id=(self.sent+1), body=short(literal_eval(test_value))) - elif self.amqp_type == 'int': - return Message(id=(self.sent+1), body=int32(literal_eval(test_value))) - elif self.amqp_type == 'long': - return Message(id=(self.sent+1), body=long(literal_eval(test_value))) - elif self.amqp_type == 'float': - return Message(id=(self.sent+1), body=float32(unpack('!f', test_value[2:].decode('hex'))[0])) - elif self.amqp_type == 'double': - return Message(id=(self.sent+1), body=unpack('!d', test_value[2:].decode('hex'))[0]) - elif self.amqp_type == 'decimal32': - return Message(id=(self.sent+1), body=decimal32(literal_eval(test_value))) - elif self.amqp_type == 'decimal64': - return Message(id=(self.sent+1), body=decimal64(literal_eval(test_value))) - elif self.amqp_type == 'decimal128': - return Message(id=(self.sent+1), body=decimal128(literal_eval(test_value))) - elif self.amqp_type == 'char': - return Message(id=(self.sent+1), body=char(test_value)) - elif self.amqp_type == 'timestamp': - return Message(id=(self.sent+1), body=timestamp(literal_eval(test_value))) - elif self.amqp_type == 'uuid': - return Message(id=(self.sent+1), body=UUID(test_value)) - elif self.amqp_type == 'binary': - return Message(id=(self.sent+1), body=bytes(test_value)) - elif self.amqp_type == 'string': - return Message(id=(self.sent+1), body=unicode(test_value)) - elif self.amqp_type == 'symbol': - return Message(id=(self.sent+1), body=symbol(test_value)) - elif self.amqp_type == 'list': - return Message(id=(self.sent+1), body=StrToObj(list(test_value).__iter__()).run()) - elif self.amqp_type == 'map': - return Message(id=(self.sent+1), 
body=StrToObj(list(test_value).__iter__()).run()) - else: - print 'send: Unsupported AMQP type "%s"' % self.amqp_type - return None - - def on_accepted(self, event): - self.confirmed += 1 - if self.confirmed == self.total: - event.connection.close() - - def on_disconnected(self, event): - self.sent = self.confirmed - - @staticmethod - def _map_string_to_map(str_list): - return {} - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: AMQP type -# 4...n: Test value(s) as strings -try: - Container(Sender('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4:])).run() -except KeyboardInterrupt: - pass -except Exception as e: - print 'proton-python-send EXCEPTION:', e - print format_exc() - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/jms-receiver-shim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/jms-receiver-shim.py b/shims/qpid-proton-python/src/jms-receiver-shim.py deleted file mode 100755 index 9091251..0000000 --- a/shims/qpid-proton-python/src/jms-receiver-shim.py +++ /dev/null @@ -1,234 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -from interop_test_errors import InteropTestError -from json import dumps, loads -from proton.handlers import MessagingHandler -from proton.reactor import Container -from struct import pack, unpack -from subprocess import check_output -from traceback import format_exc - -class JmsReceiverShim(MessagingHandler): - def __init__(self, url, jms_msg_type, expected_msg_map): - super(JmsReceiverShim, self).__init__() - self.url = url - self.jms_msg_type = jms_msg_type - self.expteced_msg_map = expected_msg_map - self.subtype_itr = iter(sorted(self.expteced_msg_map.keys())) - self.expected = self._get_tot_num_messages() - self.received = 0 - self.received_value_map = {} - self.current_subtype = None - self.current_subtype_msg_list = None - - def get_received_value_map(self): - return self.received_value_map - - def on_start(self, event): - event.container.create_receiver(self.url) - - def on_message(self, event): - if event.message.id and event.message.id < self.received: - return # ignore duplicate message - if self.expected == 0 or self.received < self.expected: - if self.current_subtype is None: - self.current_subtype = self.subtype_itr.next() - self.current_subtype_msg_list = [] - self.current_subtype_msg_list.append(self._handle_message(event.message)) - if len(self.current_subtype_msg_list) >= self.expteced_msg_map[self.current_subtype]: - self.received_value_map[self.current_subtype] = self.current_subtype_msg_list - self.current_subtype = None - self.current_subtype_msg_list = [] - self.received += 1 - if self.received == self.expected: - event.receiver.close() - event.connection.close() - - def _handle_message(self, message): - if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': - return self._receive_jms_bytesmessage(message) - if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': - return self._recieve_jms_mapmessage(message) - if self.jms_msg_type == 
'JMS_OBJECTMESSAGE_TYPE': - return self._recieve_jms_objectmessage(message) - if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': - return self._receive_jms_streammessage(message) - if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': - return self._receive_jms_textmessage(message) - print 'jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type - return None - - def _get_tot_num_messages(self): - total = 0 - for key in self.expteced_msg_map: - total += int(self.expteced_msg_map[key]) - return total - - def _receive_jms_bytesmessage(self, message): - assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE' - if self.current_subtype == 'boolean': - if message.body == b'\x00': - return 'False' - if message.body == b'\x01': - return 'True' - raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' % - str(message.body)) - if self.current_subtype == 'byte': - return hex(unpack('b', message.body)[0]) - if self.current_subtype == 'bytes': - return str(message.body) - if self.current_subtype == 'char': - if len(message.body) == 2: # format 'a' or '\xNN' - return str(message.body[1]) # strip leading '\x00' char - raise InteropTestError('Unexpected strring length for type char: %d' % len(message.body)) - if self.current_subtype == 'double': - return '0x%016x' % unpack('!Q', message.body)[0] - if self.current_subtype == 'float': - return '0x%08x' % unpack('!L', message.body)[0] - if self.current_subtype == 'int': - return hex(unpack('!i', message.body)[0]) - if self.current_subtype == 'long': - return hex(unpack('!q', message.body)[0]) - if self.current_subtype == 'short': - return hex(unpack('!h', message.body)[0]) - if self.current_subtype == 'string': - # NOTE: first 2 bytes are string length, must be present - if len(message.body) >= 2: - str_len = unpack('!H', message.body[:2])[0] - str_body = str(message.body[2:]) - if len(str_body) != str_len: - raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' % - 
(str_len, str_body, len(str_body))) - return str_body - else: - raise InteropTestError('Malformed string binary: len(\'%s\')=%d' % - (repr(message.body), len(message.body))) - raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % - (self.jms_msg_type, self.current_subtype)) - - def _recieve_jms_mapmessage(self, message): - assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE' - key, value = message.body.items()[0] - assert key[:-3] == self.current_subtype - if self.current_subtype == 'boolean': - return str(value) - if self.current_subtype == 'byte': - return hex(value) - if self.current_subtype == 'bytes': - return str(value) - if self.current_subtype == 'char': - return str(value) - if self.current_subtype == 'double': - return '0x%016x' % unpack('!Q', pack('!d', value))[0] - if self.current_subtype == 'float': - return '0x%08x' % unpack('!L', pack('!f', value))[0] - if self.current_subtype == 'int': - return hex(value) - if self.current_subtype == 'long': - return hex(int(value)) - if self.current_subtype == 'short': - return hex(value) - if self.current_subtype == 'string': - return str(value) - raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % - (self.jms_msg_type, self.current_subtype)) - - def _recieve_jms_objectmessage(self, message): - assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE' - return self._get_java_obj(message.body) - - def _get_java_obj(self, java_obj_bytes): - ''' - Take bytes from serialized Java object and construct a Java object, then return its toString() value. The - work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java - utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar. 
- java_obj_bytes: hex string representation of bytes from Java object (eg 'aced00057372...') - returns: string containing Java class value as returned by the toString() method - ''' - java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip() - out_str = check_output(['java', - '-cp', - 'target/JavaObjUtils.jar', - 'org.apache.qpid.interop_test.obj_util.BytesToJavaObj', - java_obj_bytes_str]) - out_str_list = out_str.split('\n')[:-1] # remove trailing \n - if len(out_str_list) > 1: - raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str) - colon_index = out_str_list[0].index(':') - if colon_index < 0: - raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str) - java_class_name = out_str_list[0][:colon_index] - java_class_value_str = out_str_list[0][colon_index+1:] - if java_class_name != self.current_subtype: - raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' % - (self.current_subtype, java_class_name)) - return java_class_value_str - - def _receive_jms_streammessage(self, message): - assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE' - # Every message is a list with one item [value] - assert len(message.body) == 1 - value = message.body[0] - if self.current_subtype == 'boolean': - return str(value) - if self.current_subtype == 'byte': - return hex(value) - if self.current_subtype == 'bytes': - return str(value) - if self.current_subtype == 'char': - return str(value) - if self.current_subtype == 'double': - return '0x%016x' % unpack('!Q', pack('!d', value))[0] - if self.current_subtype == 'float': - return '0x%08x' % unpack('!L', pack('!f', value))[0] - if self.current_subtype == 'int': - return hex(value) - if self.current_subtype == 'long': - return hex(int(value)) - if self.current_subtype == 'short': - return hex(value) - if self.current_subtype == 'string': - return str(value) - raise InteropTestError('JMS message type %s: Unknown or unsupported subtype 
\'%s\'' % - (self.jms_msg_type, self.current_subtype)) - - def _receive_jms_textmessage(self, message): - assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE' - return message.body - - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: JMS message type -# 4: JSON string of map containing number of test values to receive for each type/subtype -#print '#### sys.argv=%s' % sys.argv -try: - RECEIVER = JmsReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4])) - Container(RECEIVER).run() - print sys.argv[3] - print dumps(RECEIVER.get_received_value_map()) -except KeyboardInterrupt: - pass -except Exception as exc: - print 'jms-receiver-shim EXCEPTION:', exc - print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/shims/qpid-proton-python/src/jms-sender-shim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/jms-sender-shim.py b/shims/qpid-proton-python/src/jms-sender-shim.py deleted file mode 100755 index d78e52b..0000000 --- a/shims/qpid-proton-python/src/jms-sender-shim.py +++ /dev/null @@ -1,241 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -import sys -from json import loads -from proton import byte, char, float32, int32, Message, short, symbol -from proton.handlers import MessagingHandler -from proton.reactor import Container -from interop_test_errors import InteropTestError -from subprocess import check_output -from struct import pack, unpack -from traceback import format_exc - -# These values must tie in with the Qpid-JMS client values found in -# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport -QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type') -QPID_JMS_TYPE_ANNOTATIONS = { - 'JMS_BYTESMESSAGE_TYPE': byte(3), - 'JMS_MAPMESSAGE_TYPE': byte(2), - 'JMS_OBJECTMESSAGE_TYPE': byte(1), - 'JMS_STREAMMESSAGE_TYPE': byte(4), - 'JMS_TEXTMESSAGE_TYPE': byte(5) - } -def create_annotation(jms_msg_type): - return {QPID_JMS_TYPE_ANNOTATION_NAME: QPID_JMS_TYPE_ANNOTATIONS[jms_msg_type]} - -class JmsSenderShim(MessagingHandler): - def __init__(self, url, jms_msg_type, test_value_map): - super(JmsSenderShim, self).__init__() - self.url = url - self.jms_msg_type = jms_msg_type - self.test_value_map = test_value_map - self.sent = 0 - self.confirmed = 0 - self.total = self._get_total_num_msgs() - - def on_start(self, event): - event.container.create_sender(self.url) - - def on_sendable(self, event): - if self.sent == 0: - # These types expect a test_values Python string representation of a map: '{type:[val, val, val], ...}' - for sub_type in sorted(self.test_value_map.keys()): - if self._send_test_values(event, sub_type, self.test_value_map[sub_type]): - return - - def _get_total_num_msgs(self): - total = 0 - for key in self.test_value_map.keys(): - total += len(self.test_value_map[key]) - return total - - def _send_test_values(self, event, test_value_type, test_values): - value_num = 0 - for test_value in test_values: - if event.sender.credit: - message = self._create_message(test_value_type, test_value, value_num) - if message is not None: - event.sender.send(message) - self.sent += 1 - 
value_num += 1 - else: - event.connection.close() - return True - return False - - # TODO: Change this to return a list of messages. That way each test can return more than one message - def _create_message(self, test_value_type, test_value, value_num): - if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': - return self._create_jms_bytesmessage(test_value_type, test_value) - elif self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': - return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num)) - elif self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE': - return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value)) - elif self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': - return self._create_jms_streammessage(test_value_type, test_value) - elif self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': - return self._create_jms_textmessage(test_value) - else: - print 'jms-send: Unsupported JMS message type "%s"' % self.jms_msg_type - return None - - def _create_jms_bytesmessage(self, test_value_type, test_value): - # NOTE: test_value contains all unicode strings u'...' 
as returned by json - body_bytes = None - if test_value_type == 'boolean': - body_bytes = b'\x01' if test_value == 'True' else b'\x00' - elif test_value_type == 'byte': - body_bytes = pack('b', int(test_value, 16)) - elif test_value_type == 'bytes': - body_bytes = str(test_value) # remove unicode - elif test_value_type == 'char': - # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00' - body_bytes = '\x00' + str(test_value) # remove unicode - elif test_value_type == 'double' or test_value_type == 'float': - body_bytes = test_value[2:].decode('hex') - elif test_value_type == 'int': - body_bytes = pack('!i', int(test_value, 16)) - elif test_value_type == 'long': - body_bytes = pack('!q', long(test_value, 16)) - elif test_value_type == 'short': - body_bytes = pack('!h', short(test_value, 16)) - elif test_value_type == 'string': - # NOTE: First two bytes must be string length - test_value_str = str(test_value) # remove unicode - body_bytes = pack('!H', len(test_value_str)) + test_value_str - else: - raise InteropTestError('JmsSenderShim._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' % - test_value_type) - return Message(id=(self.sent+1), - body=body_bytes, - inferred=True, - content_type='application/octet-stream', - annotations=create_annotation('JMS_BYTESMESSAGE_TYPE')) - - def _create_jms_mapmessage(self, test_value_type, test_value, name): - if test_value_type == 'boolean': - value = test_value == 'True' - elif test_value_type == 'byte': - value = byte(int(test_value, 16)) - elif test_value_type == 'bytes': - value = str(test_value) # remove unicode - elif test_value_type == 'char': - value = char(test_value) - elif test_value_type == 'double': - value = unpack('!d', test_value[2:].decode('hex'))[0] - elif test_value_type == 'float': - value = float32(unpack('!f', test_value[2:].decode('hex'))[0]) - elif test_value_type == 'int': - value = int32(int(test_value, 16)) - elif test_value_type == 'long': - value = int(test_value, 16) - 
elif test_value_type == 'short': - value = short(int(test_value, 16)) - elif test_value_type == 'string': - value = test_value - else: - raise InteropTestError('JmsSenderShim._create_jms_mapmessage: Unknown or unsupported subtype "%s"' % - test_value_type) - return Message(id=(self.sent+1), - body={name: value}, - inferred=False, - annotations=create_annotation('JMS_MAPMESSAGE_TYPE')) - - def _create_jms_objectmessage(self, test_value): - java_binary = self._get_java_obj_binary(test_value) - return Message(id=(self.sent+1), - body=java_binary, - inferred=True, - content_type='application/x-java-serialized-object', - annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE')) - - @staticmethod - def _get_java_obj_binary(java_class_str): - out_str = check_output(['java', - '-cp', - 'target/JavaObjUtils.jar', - 'org.apache.qpid.interop_test.obj_util.JavaObjToBytes', - java_class_str]) - out_str_list = out_str.split('\n')[:-1] # remove trailing \n - if out_str_list[0] != java_class_str: - raise InteropTestError('JmsSenderShim._get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' % out_str) - return out_str_list[1].decode('hex') - - def _create_jms_streammessage(self, test_value_type, test_value): - if test_value_type == 'boolean': - body_list = [test_value == 'True'] - elif test_value_type == 'byte': - body_list = [byte(int(test_value, 16))] - elif test_value_type == 'bytes': - body_list = [str(test_value)] - elif test_value_type == 'char': - body_list = [char(test_value)] - elif test_value_type == 'double': - body_list = [unpack('!d', test_value[2:].decode('hex'))[0]] - elif test_value_type == 'float': - body_list = [float32(unpack('!f', test_value[2:].decode('hex'))[0])] - elif test_value_type == 'int': - body_list = [int32(int(test_value, 16))] - elif test_value_type == 'long': - body_list = [int(test_value, 16)] - elif test_value_type == 'short': - body_list = [short(int(test_value, 16))] - elif test_value_type == 'string': - body_list = [test_value] - else: - 
raise InteropTestError('JmsSenderShim._create_jms_streammessage: Unknown or unsupported subtype "%s"' % - test_value_type) - return Message(id=(self.sent+1), - body=body_list, - inferred=True, - annotations=create_annotation('JMS_STREAMMESSAGE_TYPE')) - - def _create_jms_textmessage(self, test_value_text): - return Message(id=(self.sent+1), - body=unicode(test_value_text), - annotations=create_annotation('JMS_TEXTMESSAGE_TYPE')) - - def on_accepted(self, event): - self.confirmed += 1 - if self.confirmed == self.total: - event.connection.close() - - def on_disconnected(self, event): - self.sent = self.confirmed - -# @staticmethod -# def _map_string_to_map(str_list): -# return {} - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: JMS message type -# 4: Test value(s) as JSON string -#print '#### sys.argv=%s' % sys.argv -#print '>>> test_values=%s' % loads(sys.argv[4]) -try: - Container(JmsSenderShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))).run() -except KeyboardInterrupt: - pass -except Exception as exc: - print 'jms-sender-shim EXCEPTION:', exc - print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/899a9a86/src/py/qpid-interop-test/jms/jms_message_tests.py ---------------------------------------------------------------------- diff --git a/src/py/qpid-interop-test/jms/jms_message_tests.py b/src/py/qpid-interop-test/jms/jms_message_tests.py index a8b9fba..1e53145 100755 --- a/src/py/qpid-interop-test/jms/jms_message_tests.py +++ b/src/py/qpid-interop-test/jms/jms_message_tests.py @@ -43,11 +43,21 @@ class JmsMessageTypes(object): # types defined here are understood to be *Java* types and the stringified values are to be interpreted # as the appropriate Java type by the send shim. 
TYPE_SUBMAP = { - 'boolean': ['True', 'False'], - 'byte': ['-0x80', '-0x1', '0x0', '0x7f'], - 'bytes': [b'', b'12345', b'Hello, world', b'\\x01\\x02\\x03\\x04\\x05abcde\\x80\\x81\\xfe\\xff'], - #b'The quick brown fox jumped over the lazy dog 0123456789.' * 100], - 'char': ['a', 'Z', '\x01', '\x7f'], + 'boolean': ['True', + 'False'], + 'byte': ['-0x80', + '-0x1', + '0x0', + '0x7f'], + 'bytes': [b'', + b'12345', + b'Hello, world', + b'\\x01\\x02\\x03\\x04\\x05abcde\\x80\\x81\\xfe\\xff', + b'The quick brown fox jumped over the lazy dog 0123456789.' * 100], + 'char': ['a', + 'Z', + '\x01', + '\x7f'], 'double': ['0x0000000000000000', # 0.0 '0x8000000000000000', # -0.0 '0x400921fb54442eea', # pi (3.14159265359) positive decimal @@ -76,28 +86,74 @@ class JmsMessageTypes(object): '0x7f800000', # +Infinity '0xff800000', # -Infinity '0x7fc00000'], # +NaN - 'int': ['-0x80000000', '-0x81', '-0x80', '-0x1', '0x0', '0x7f', '0x80', '0x7fffffff'], - 'long': ['-0x8000000000000000', '-0x81', '-0x80', '-0x1', '0x0', '0x7f', '0x80', '0x7fffffffffffffff'], - 'short': ['-0x8000', '-0x1', '0x0', '0x7fff'], + 'int': ['-0x80000000', + '-0x81', + '-0x80', + '-0x1', + '0x0', + '0x7f', + '0x80', + '0x7fffffff'], + 'long': ['-0x8000000000000000', + '-0x81', + '-0x80', + '-0x1', + '0x0', + '0x7f', + '0x80', + '0x7fffffffffffffff'], + 'short': ['-0x8000', + '-0x1', + '0x0', + '0x7fff'], 'string': ['', 'Hello, world', '"Hello, world"', "Charlie's \"peach\"", - 'Charlie\'s "peach"'], - #'The quick brown fox jumped over the lazy dog 0123456789.' * 100] + 'Charlie\'s "peach"', + 'The quick brown fox jumped over the lazy dog 0123456789.' 
* 100] } TYPE_MAP = { 'JMS_BYTESMESSAGE_TYPE': TYPE_SUBMAP, 'JMS_MAPMESSAGE_TYPE': TYPE_SUBMAP, 'JMS_OBJECTMESSAGE_TYPE': { - 'java.lang.Boolean': ['true', 'false'], - 'java.lang.Byte': ['-128', '0', '127'], - 'java.lang.Character': [u'a', u'Z'], - 'java.lang.Double': ['0.0', '3.141592654', '-2.71828182846'], - 'java.lang.Float': ['0.0', '3.14159', '-2.71828'], - 'java.lang.Integer': ['-2147483648', '-129', '-128', '-1', '0', '127', '128', '2147483647'], - 'java.lang.Long' : ['-9223372036854775808', '-129', '-128', '-1', '0', '127', '128', '9223372036854775807'], - 'java.lang.Short': ['-32768', '-129', '-128', '-1', '0', '127', '128', '32767'], + 'java.lang.Boolean': ['true', + 'false'], + 'java.lang.Byte': ['-128', + '0', + '127'], + 'java.lang.Character': [u'a', + u'Z'], + 'java.lang.Double': ['0.0', + '3.141592654', + '-2.71828182846'], + 'java.lang.Float': ['0.0', + '3.14159', + '-2.71828'], + 'java.lang.Integer': ['-2147483648', + '-129', + '-128', + '-1', + '0', + '127', + '128', + '2147483647'], + 'java.lang.Long' : ['-9223372036854775808', + '-129', + '-128', + '-1', + '0', + '127', + '128', + '9223372036854775807'], + 'java.lang.Short': ['-32768', + '-129', + '-128', + '-1', + '0', + '127', + '128', + '32767'], 'java.lang.String': [u'', u'Hello, world', u'"Hello, world"', @@ -109,8 +165,8 @@ class JmsMessageTypes(object): 'Hello, world', '"Hello, world"', "Charlie's \"peach\"", - 'Charlie\'s "peach"'],} - #'The quick brown fox jumped over the lazy dog 0123456789.' * 100]} + 'Charlie\'s "peach"', + 'The quick brown fox jumped over the lazy dog 0123456789.' 
* 100]} } @staticmethod @@ -139,15 +195,13 @@ class JmsMessageTypeTestCase(unittest.TestCase): if len(test_values) > 0: queue_name = 'qpid-interop.jms_message_type_tests.%s.%s.%s' % (jms_message_type, send_shim.NAME, receive_shim.NAME) - json_test_values_str = dumps(test_values) - send_error_text = send_shim.send(broker_addr, queue_name, jms_message_type, json_test_values_str) + send_error_text = send_shim.send(broker_addr, queue_name, jms_message_type, dumps(test_values)) if len(send_error_text) > 0: self.fail('Send shim \'%s\':\n%s' % (send_shim.NAME, send_error_text)) num_test_values = {} for index in test_values.keys(): num_test_values[index] = len(test_values[index]) - json_test_num_values_str = dumps(num_test_values) - receive_text = receive_shim.receive(broker_addr, queue_name, jms_message_type, json_test_num_values_str) + receive_text = receive_shim.receive(broker_addr, queue_name, jms_message_type, dumps(num_test_values)) if isinstance(receive_text, str): self.fail(receive_text) else: @@ -173,8 +227,8 @@ def create_testcase_class(broker_addr, jms_message_type, test_values, shim_produ self.run_test(self.broker_addr, self.jms_message_type, self.test_values, send_shim, receive_shim) inner_test_method.__name__ = 'test_%s_%s->%s' % (jms_message_type[4:-5], send_shim.NAME, receive_shim.NAME) - #inner_test_method.__doc__ = 'JMS message type \'%s\' interop test: %s -> %s' % \ - # (jms_message_type, send_shim.NAME, receive_shim.NAME) + inner_test_method.__doc__ = 'JMS message type \'%s\' interop test: %s -> %s' % \ + (jms_message_type, send_shim.NAME, receive_shim.NAME) setattr(cls, inner_test_method.__name__, inner_test_method) class_name = jms_message_type[4:-5].title() + 'TestCase' @@ -227,8 +281,7 @@ class Shim(object): try: arg_list = [] arg_list.extend(self.RECEIVE) - arg_list.extend([broker_addr, queue_name, jms_message_type]) - arg_list.append(json_test_num_values_str) + arg_list.extend([broker_addr, queue_name, jms_message_type, json_test_num_values_str]) 
#print '\n>>>', arg_list # DEBUG - useful to see command-line sent to shim output = check_output(arg_list) #print '<<<', output # DEBUG- useful to see text received from shim @@ -249,8 +302,8 @@ class ProtonPythonShim(Shim): """ NAME = 'ProtonPython' SHIM_LOC = path.join(QPID_INTEROP_TEST_HOME, 'shims', 'qpid-proton-python', 'src') - SEND = [path.join(SHIM_LOC, 'jms-sender-shim.py')] - RECEIVE = [path.join(SHIM_LOC, 'jms-receiver-shim.py')] + SEND = [path.join(SHIM_LOC, 'JmsSenderShim.py')] + RECEIVE = [path.join(SHIM_LOC, 'JmsReceiverShim.py')] class QpidJmsShim(Shim): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
