http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/JmsSenderShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/JmsSenderShim.py b/shims/qpid-proton-python/src/JmsSenderShim.py deleted file mode 100755 index 5a1108a..0000000 --- a/shims/qpid-proton-python/src/JmsSenderShim.py +++ /dev/null @@ -1,390 +0,0 @@ -#!/usr/bin/env python - -""" -JMS sender shim for qpid-interop-test -""" - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -from json import loads -from proton import byte, char, float32, int32, Message, short, symbol -from proton.handlers import MessagingHandler -from proton.reactor import Container -from interop_test_errors import InteropTestError -from subprocess import check_output -from struct import pack, unpack -from traceback import format_exc - -# These values must tie in with the Qpid-JMS client values found in -# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport -QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type') -QPID_JMS_TYPE_ANNOTATIONS = { - 'JMS_MESSAGE_TYPE': byte(0), - 'JMS_BYTESMESSAGE_TYPE': byte(3), - 'JMS_MAPMESSAGE_TYPE': byte(2), - 'JMS_OBJECTMESSAGE_TYPE': byte(1), - 'JMS_STREAMMESSAGE_TYPE': byte(4), - 'JMS_TEXTMESSAGE_TYPE': byte(5) - } -def create_annotation(jms_msg_type): - """Function which creates a message annotation for JMS message type as used by the Qpid JMS client""" - return {QPID_JMS_TYPE_ANNOTATION_NAME: QPID_JMS_TYPE_ANNOTATIONS[jms_msg_type]} - -class JmsSenderShim(MessagingHandler): - """ - This shim sends JMS messages of a particular JMS message type according to the test parameters list. This list - contains three maps: - 0: The test value map, which contains test value types as keys, and lists of values of that type; - 1. The test headers map, which contains the JMS headers as keys and a submap conatining types and values; - 2. The test proprties map, which contains the name of the properties as keys, and a submap containing types - and values - This shim takes the combinations of the above map and creates test cases, each of which sends a single message - with (or without) JMS headers and properties. - """ - def __init__(self, broker_ip_addr, queue_name, jms_msg_type, test_parameters_list): - super(JmsSenderShim, self).__init__() - self.broker_ip_addr = broker_ip_addr - self.queue_name = queue_name - self.jms_msg_type = jms_msg_type - self.test_value_map = test_parameters_list[0] - self.test_headers_map = test_parameters_list[1] - self.test_properties_map = test_parameters_list[2] - self.sent = 0 - self.confirmed = 0 - self.total = self._get_total_num_msgs() - - def on_start(self, event): - """Event callback for when the client starts""" - event.container.create_sender('%s/%s' % (self.broker_ip_addr, self.queue_name)) - - def on_sendable(self, event): - """Event callback for when send credit is received, allowing the sending of messages""" - if self.sent == 0: - # These types expect a test_values Python string representation of a map: '{type:[val, val, val], ...}' - for sub_type in sorted(self.test_value_map.keys()): - if self._send_test_values(event, sub_type, self.test_value_map[sub_type]): - return - - def on_connection_error(self, event): - print 'JmsSenderShim.on_connection_error' - - def on_session_error(self, event): - print 'JmsSenderShim.on_session_error' - - def on_link_error(self, event): - print 'JmsSenderShim.on_link_error' - - def on_accepted(self, event): - """Event callback for when a sent message is accepted by the broker""" - self.confirmed += 1 - if self.confirmed == self.total: - event.connection.close() - - def on_disconnected(self, event): - """Event callback for when the broker disconnects with the client""" - self.sent = self.confirmed - - def _get_total_num_msgs(self): - """ - Calculates the total number of messages to be sent based on the message parameters received on the command-line - """ - total = 0 - for key in self.test_value_map.keys(): - total += len(self.test_value_map[key]) - return total - - def _send_test_values(self, event, test_value_type, test_values): - """Method which loops through recieved parameters and sends the corresponding messages""" - value_num = 0 - for test_value in test_values: - if event.sender.credit: - message = self._create_message(test_value_type, test_value, value_num) - # TODO: set message to address - if message is not None: - self._add_jms_message_headers(message) - self._add_jms_message_properties(message) - event.sender.send(message) - self.sent += 1 - value_num += 1 - else: - event.connection.close() - return True - return False - - # TODO: Change this to return a list of messages. That way each test can return more than one message - def _create_message(self, test_value_type, test_value, value_num): - """Create a single message of the appropriate JMS message type""" - if self.jms_msg_type == 'JMS_MESSAGE_TYPE': - return self._create_jms_message(test_value_type, test_value) - elif self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': - return self._create_jms_bytesmessage(test_value_type, test_value) - elif self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': - return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num)) - elif self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE': - return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value)) - elif self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': - return self._create_jms_streammessage(test_value_type, test_value) - elif self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': - return self._create_jms_textmessage(test_value) - else: - print 'jms-send: Unsupported JMS message type "%s"' % self.jms_msg_type - return None - - def _create_jms_message(self, test_value_type, test_value): - """Create a JMS message type (without message body)""" - if test_value_type != 'none': - raise InteropTestError('JmsSenderShim._create_jms_message: Unknown or unsupported subtype "%s"' % - test_value_type) - if test_value is not None: - raise InteropTestError('JmsSenderShim._create_jms_message: Invalid value "%s" for subtype "%s"' % - (test_value, test_value_type)) - return Message(id=(self.sent+1), - content_type='application/octet-stream', - annotations=create_annotation('JMS_MESSAGE_TYPE')) - - def _create_jms_bytesmessage(self, test_value_type, test_value): - """Create a JMS bytes message""" - # NOTE: test_value contains all unicode strings u'...' as returned by json - body_bytes = None - if test_value_type == 'boolean': - body_bytes = b'\x01' if test_value == 'True' else b'\x00' - elif test_value_type == 'byte': - body_bytes = pack('b', int(test_value, 16)) - elif test_value_type == 'bytes': - body_bytes = str(test_value) # remove unicode - elif test_value_type == 'char': - # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00' - body_bytes = '\x00' + str(test_value) # remove unicode - elif test_value_type == 'double' or test_value_type == 'float': - body_bytes = test_value[2:].decode('hex') - elif test_value_type == 'int': - body_bytes = pack('!i', int(test_value, 16)) - elif test_value_type == 'long': - body_bytes = pack('!q', long(test_value, 16)) - elif test_value_type == 'short': - body_bytes = pack('!h', short(test_value, 16)) - elif test_value_type == 'string': - # NOTE: First two bytes must be string length - test_value_str = str(test_value) # remove unicode - body_bytes = pack('!H', len(test_value_str)) + test_value_str - else: - raise InteropTestError('JmsSenderShim._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' % - test_value_type) - return Message(id=(self.sent+1), - body=body_bytes, - inferred=True, - content_type='application/octet-stream', - annotations=create_annotation('JMS_BYTESMESSAGE_TYPE')) - - def _create_jms_mapmessage(self, test_value_type, test_value, name): - """Create a JMS map message""" - if test_value_type == 'boolean': - value = test_value == 'True' - elif test_value_type == 'byte': - value = byte(int(test_value, 16)) - elif test_value_type == 'bytes': - value = str(test_value) # remove unicode - elif test_value_type == 'char': - value = char(test_value) - elif test_value_type == 'double': - value = unpack('!d', test_value[2:].decode('hex'))[0] - elif test_value_type == 'float': - value = float32(unpack('!f', test_value[2:].decode('hex'))[0]) - elif test_value_type == 'int': - value = int32(int(test_value, 16)) - elif test_value_type == 'long': - value = long(test_value, 16) - elif test_value_type == 'short': - value = short(int(test_value, 16)) - elif test_value_type == 'string': - value = test_value - else: - raise InteropTestError('JmsSenderShim._create_jms_mapmessage: Unknown or unsupported subtype "%s"' % - test_value_type) - return Message(id=(self.sent+1), - body={name: value}, - inferred=False, - annotations=create_annotation('JMS_MAPMESSAGE_TYPE')) - - def _create_jms_objectmessage(self, test_value): - """Create a JMS object message""" - java_binary = self._s_get_java_obj_binary(test_value) - return Message(id=(self.sent+1), - body=java_binary, - inferred=True, - content_type='application/x-java-serialized-object', - annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE')) - - @staticmethod - def _s_get_java_obj_binary(java_class_str): - """Call external utility to create Java object and stringify it, returning the string representation""" - out_str = check_output(['java', - '-cp', - 'target/JavaObjUtils.jar', - 'org.apache.qpid.interop_test.obj_util.JavaObjToBytes', - java_class_str]) - out_str_list = out_str.split('\n')[:-1] # remove trailing \n - if out_str_list[0] != java_class_str: - raise InteropTestError('JmsSenderShim._s_get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' % - out_str) - return out_str_list[1].decode('hex') - - def _create_jms_streammessage(self, test_value_type, test_value): - """Create a JMS stream message""" - if test_value_type == 'boolean': - body_list = [test_value == 'True'] - elif test_value_type == 'byte': - body_list = [byte(int(test_value, 16))] - elif test_value_type == 'bytes': - body_list = [str(test_value)] - elif test_value_type == 'char': - body_list = [char(test_value)] - elif test_value_type == 'double': - body_list = [unpack('!d', test_value[2:].decode('hex'))[0]] - elif test_value_type == 'float': - body_list = [float32(unpack('!f', test_value[2:].decode('hex'))[0])] - elif test_value_type == 'int': - body_list = [int32(int(test_value, 16))] - elif test_value_type == 'long': - body_list = [long(test_value, 16)] - elif test_value_type == 'short': - body_list = [short(int(test_value, 16))] - elif test_value_type == 'string': - body_list = [test_value] - else: - raise InteropTestError('JmsSenderShim._create_jms_streammessage: Unknown or unsupported subtype "%s"' % - test_value_type) - return Message(id=(self.sent+1), - body=body_list, - inferred=True, - annotations=create_annotation('JMS_STREAMMESSAGE_TYPE')) - - def _create_jms_textmessage(self, test_value_text): - """Create a JMS text message""" - return Message(id=(self.sent+1), - body=unicode(test_value_text), - annotations=create_annotation('JMS_TEXTMESSAGE_TYPE')) - - def _add_jms_message_headers(self, message): - """Add JMS headers to the supplied message from self.test_headers_map""" - for jms_header in self.test_headers_map.iterkeys(): - value_map = self.test_headers_map[jms_header] - value_type = value_map.keys()[0] # There is only ever one value in map - value = value_map[value_type] - if jms_header == 'JMS_TYPE_HEADER': - if value_type == 'string': - self._s_set_jms_type_header(message, value) - else: - raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + - 'JMS_TYPE_HEADER requires value type "string", type "%s" found' % - value_type) - elif jms_header == 'JMS_CORRELATIONID_HEADER': - if value_type == 'string': - self._s_set_jms_correlation_id(message, value) - elif value_type == 'bytes': - self._s_set_jms_correlation_id(message, str(value)) - else: - raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + - 'JMS_CORRELATIONID_HEADER requires value type "string" or "bytes", ' + - 'type "%s" found' % value_type) - elif jms_header == 'JMS_REPLYTO_HEADER': - if value_type == 'queue' or value_type == 'topic': - self._s_set_jms_reply_to(message, value_type, value) - elif value_type == 'temp_queue' or value_type == 'temp_topic': - raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + - 'JMS_REPLYTO_HEADER type "temp_queue" or "temp_topic" not handled') - else: - raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + - 'JMS_REPLYTO_HEADER requires value type "queue" or "topic", ' + - 'type "%s" found' % value_type) - else: - raise InteropTestError('JmsSenderShim._add_jms_message_headers(): Invalid JMS message header "%s"' % - jms_header) - - - @staticmethod - def _s_set_jms_type_header(message, message_type): - """Adds a JMS message type header""" - message._set_subject(message_type) - - @staticmethod - def _s_set_jms_correlation_id(message, correlation_id): - """Adds a JMS correlation id header""" - message._set_correlation_id(correlation_id) - message.annotations[symbol(u'x-opt-app-correlation-id')] = True - - @staticmethod - def _s_set_jms_reply_to(message, jms_destination_type_str, destination): - """Adds a JMS reply-to header""" - if jms_destination_type_str == 'queue': - message._set_reply_to(destination) - message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(0) - elif jms_destination_type_str == 'topic': - message._set_reply_to(destination) - message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(1) - else: - raise InteropTestError('JmsSenderShim._s_set_jms_reply_to(): ' + - 'Invalid value for jms_destination_type_str "%s"' % jms_destination_type_str) - - def _add_jms_message_properties(self, message): - """Adds message properties to the supplied message from self.test_properties_map""" - for property_name in self.test_properties_map.iterkeys(): - value_map = self.test_properties_map[property_name] - value_type = value_map.keys()[0] # There is only ever one value in map - value = value_map[value_type] - if message.properties is None: - message.properties = {} - if value_type == 'boolean': - message.properties[property_name] = value == 'True' - elif value_type == 'byte': - message.properties[property_name] = byte(int(value, 16)) - elif value_type == 'double': - message.properties[property_name] = unpack('!d', value[2:].decode('hex'))[0] - elif value_type == 'float': - message.properties[property_name] = float32(unpack('!f', value[2:].decode('hex'))[0]) - elif value_type == 'int': - message.properties[property_name] = int(value, 16) - elif value_type == 'long': - message.properties[property_name] = long(value, 16) - elif value_type == 'short': - message.properties[property_name] = short(int(value, 16)) - elif value_type == 'string': - message.properties[property_name] = value - else: - raise InteropTestError('JmsSenderShim._add_jms_message_properties: ' + - 'Unknown or unhandled message property type ?%s"' % value_type) - - - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: JMS message type -# 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap] -#print '#### sys.argv=%s' % sys.argv -#print '>>> test_values=%s' % loads(sys.argv[4]) -try: - Container(JmsSenderShim(sys.argv[1], sys.argv[2], sys.argv[3], loads(sys.argv[4]))).run() -except KeyboardInterrupt: - pass -except Exception as exc: - print 'jms-sender-shim EXCEPTION:', exc - print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/TypesReceiverShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/TypesReceiverShim.py b/shims/qpid-proton-python/src/TypesReceiverShim.py deleted file mode 100755 index 2876f51..0000000 --- a/shims/qpid-proton-python/src/TypesReceiverShim.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python - -""" -AMQP type test receiver shim for qpid-interop-test -""" - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Issues: -# * Capturing errors from client or broker - -import sys -from json import dumps -from proton.handlers import MessagingHandler -from proton.reactor import Container -from traceback import format_exc -from string import digits, letters, punctuation -from struct import pack, unpack - -class AmqpTypesReceiverShim(MessagingHandler): - """ - Reciver shim for AMQP types test - This shim receives the number of messages supplied on the command-line and checks that they contain message - bodies of the exptected AMQP type. The values are then aggregated and returned. - """ - def __init__(self, url, amqp_type, num_expected_messages_str): - super(AmqpTypesReceiverShim, self).__init__() - self.url = url - self.received_value_list = [] - self.amqp_type = amqp_type - self.expected = int(num_expected_messages_str) - self.received = 0 - - def get_received_value_list(self): - """Return the received list of AMQP values""" - return self.received_value_list - - def on_start(self, event): - """Event callback for when the client starts""" - event.container.create_receiver(self.url) - - def on_message(self, event): - """Event callback when a message is received by the client""" - if event.message.id and event.message.id < self.received: - return # ignore duplicate message - if self.expected == 0 or self.received < self.expected: - if self.amqp_type == 'null' or \ - self.amqp_type == 'boolean' or \ - self.amqp_type == 'uuid': - self.received_value_list.append(str(event.message.body)) - elif self.amqp_type == 'ubyte' or \ - self.amqp_type == 'ushort' or \ - self.amqp_type == 'byte' or \ - self.amqp_type == 'short' or \ - self.amqp_type == 'int': - self.received_value_list.append(hex(event.message.body)) - elif self.amqp_type == 'uint' or \ - self.amqp_type == 'ulong' or \ - self.amqp_type == 'long' or \ - self.amqp_type == 'timestamp': - hex_str = hex(int(event.message.body)) - if len(hex_str) == 19 and hex_str[-1] == 'L': - self.received_value_list.append(hex_str[:-1]) # strip trailing 'L' if present on some ulongs - else: - self.received_value_list.append(hex_str) - elif self.amqp_type == 'float': - self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', event.message.body))[0]) - elif self.amqp_type == 'double': - self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', event.message.body))[0]) - elif self.amqp_type == 'decimal32': - self.received_value_list.append('0x%08x' % event.message.body) - elif self.amqp_type == 'decimal64': - self.received_value_list.append('0x%016x' % event.message.body) - elif self.amqp_type == 'decimal128': - self.received_value_list.append('0x' + ''.join(['%02x' % ord(c) for c in event.message.body]).strip()) - elif self.amqp_type == 'char': - if ord(event.message.body) < 0x80 and event.message.body in digits + letters + punctuation: - self.received_value_list.append(event.message.body) - else: - self.received_value_list.append(hex(ord(event.message.body))) - elif self.amqp_type == 'binary' or \ - self.amqp_type == 'string' or \ - self.amqp_type == 'symbol': - self.received_value_list.append(event.message.body) - elif self.amqp_type == 'list' or \ - self.amqp_type == 'map': - self.received_value_list.append(event.message.body) - else: - print 'receive: Unsupported AMQP type "%s"' % self.amqp_type - return - self.received += 1 - if self.received == self.expected: - event.receiver.close() - event.connection.close() - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: AMQP type -# 4: Expected number of test values to receive -try: - RECEIVER = AmqpTypesReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4]) - Container(RECEIVER).run() - print sys.argv[3] - print dumps(RECEIVER.get_received_value_list()) -except KeyboardInterrupt: - pass -except Exception as exc: - print 'proton-python-receive EXCEPTION:', exc - print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/TypesSenderShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/TypesSenderShim.py b/shims/qpid-proton-python/src/TypesSenderShim.py deleted file mode 100755 index 19d183f..0000000 --- a/shims/qpid-proton-python/src/TypesSenderShim.py +++ /dev/null @@ -1,155 +0,0 @@ -#!/usr/bin/env python - -""" -AMQP type test sender shim for qpid-interop-test -""" - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Issues: -# * Capturing errors from client or broker - -import sys -import os.path -from json import loads -from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, \ - ubyte, uint, ulong, ushort -from proton.handlers import MessagingHandler -from proton.reactor import Container -from struct import unpack -from traceback import format_exc -from uuid import UUID - -class AmqpTypesSenderShim(MessagingHandler): - """ - Sender shim for AMQP types test - This shim receives the AMQP type and a list of test values. Each value is sent in a message body of the appropriate - AMQP type. There is no returned value. - """ - def __init__(self, url, amqp_type, test_value_list): - super(AmqpTypesSenderShim, self).__init__() - self.url = url - self.amqp_type = amqp_type - self.test_value_list = test_value_list - self.sent = 0 - self.confirmed = 0 - self.total = len(test_value_list) - - def on_start(self, event): - """Event callback for when the client starts""" - event.container.create_sender(self.url) - - def on_sendable(self, event): - """Event callback for when send credit is received, allowing the sending of messages""" - if self.sent == 0: - for test_value in self.test_value_list: - if event.sender.credit: - message = self.create_message(test_value) - if message is not None: - event.sender.send(message) - self.sent += 1 - else: - event.connection.close() - return - - def create_message(self, test_value): - """ - Creates a single message with the test value translated from its string representation to the appropriate - AMQP value (set in self.amqp_type). - """ - if self.amqp_type == 'null': - return Message(id=(self.sent+1), body=None) - elif self.amqp_type == 'boolean': - return Message(id=(self.sent+1), body=True if test_value == 'True' else False) - elif self.amqp_type == 'ubyte': - return Message(id=(self.sent+1), body=ubyte(int(test_value, 16))) - elif self.amqp_type == 'ushort': - return Message(id=(self.sent+1), body=ushort(int(test_value, 16))) - elif self.amqp_type == 'uint': - return Message(id=(self.sent+1), body=uint(int(test_value, 16))) - elif self.amqp_type == 'ulong': - return Message(id=(self.sent+1), body=ulong(int(test_value, 16))) - elif self.amqp_type == 'byte': - return Message(id=(self.sent+1), body=byte(int(test_value, 16))) - elif self.amqp_type == 'short': - return Message(id=(self.sent+1), body=short(int(test_value, 16))) - elif self.amqp_type == 'int': - return Message(id=(self.sent+1), body=int32(int(test_value, 16))) - elif self.amqp_type == 'long': - return Message(id=(self.sent+1), body=long(int(test_value, 16))) - elif self.amqp_type == 'float': - return Message(id=(self.sent+1), body=float32(unpack('!f', test_value[2:].decode('hex'))[0])) - elif self.amqp_type == 'double': - return Message(id=(self.sent+1), body=unpack('!d', test_value[2:].decode('hex'))[0]) - elif self.amqp_type == 'decimal32': - return Message(id=(self.sent+1), body=decimal32(int(test_value[2:], 16))) - elif self.amqp_type == 'decimal64': - l64 = long(test_value[2:], 16) - return Message(id=(self.sent+1), body=decimal64(l64)) - elif self.amqp_type == 'decimal128': - return Message(id=(self.sent+1), body=decimal128(test_value[2:].decode('hex'))) - elif self.amqp_type == 'char': - if len(test_value) == 1: # Format 'a' - return Message(id=(self.sent+1), body=char(test_value)) - else: - val = int(test_value, 16) - return Message(id=(self.sent+1), body=char(unichr(val))) - elif self.amqp_type == 'timestamp': - return Message(id=(self.sent+1), body=timestamp(int(test_value, 16))) - elif self.amqp_type == 'uuid': - return Message(id=(self.sent+1), body=UUID(test_value)) - elif self.amqp_type == 'binary': - return Message(id=(self.sent+1), body=bytes(test_value)) - elif self.amqp_type == 'string': - return Message(id=(self.sent+1), body=unicode(test_value)) - elif self.amqp_type == 'symbol': - return Message(id=(self.sent+1), body=symbol(test_value)) - elif self.amqp_type == 'list': - return Message(id=(self.sent+1), body=test_value) - elif self.amqp_type == 'map': - return Message(id=(self.sent+1), body=test_value) - else: - print 'send: Unsupported AMQP type "%s"' % self.amqp_type - return None - - def on_accepted(self, event): - """Event callback for when a sent message is accepted by the broker""" - self.confirmed += 1 - if self.confirmed == self.total: - event.connection.close() - - def on_disconnected(self, event): - """Event callback for when the broker disconnects with the client""" - self.sent = self.confirmed - - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: AMQP type -# 4...n: Test value(s) as strings -try: - Container(AmqpTypesSenderShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))).run() -except KeyboardInterrupt: - pass -except Exception as exc: - print os.path.basename(sys.argv[0]), 'EXCEPTION:', exc - print format_exc() - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/amqp_types_test/Receiver.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/amqp_types_test/Receiver.py b/shims/qpid-proton-python/src/amqp_types_test/Receiver.py new file mode 100755 index 0000000..2876f51 --- /dev/null +++ b/shims/qpid-proton-python/src/amqp_types_test/Receiver.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python + +""" +AMQP type test receiver shim for qpid-interop-test +""" + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Issues: +# * Capturing errors from client or broker + +import sys +from json import dumps +from proton.handlers import MessagingHandler +from proton.reactor import Container +from traceback import format_exc +from string import digits, letters, punctuation +from struct import pack, unpack + +class AmqpTypesReceiverShim(MessagingHandler): + """ + Reciver shim for AMQP types test + This shim receives the number of messages supplied on the command-line and checks that they contain message + bodies of the exptected AMQP type. The values are then aggregated and returned. + """ + def __init__(self, url, amqp_type, num_expected_messages_str): + super(AmqpTypesReceiverShim, self).__init__() + self.url = url + self.received_value_list = [] + self.amqp_type = amqp_type + self.expected = int(num_expected_messages_str) + self.received = 0 + + def get_received_value_list(self): + """Return the received list of AMQP values""" + return self.received_value_list + + def on_start(self, event): + """Event callback for when the client starts""" + event.container.create_receiver(self.url) + + def on_message(self, event): + """Event callback when a message is received by the client""" + if event.message.id and event.message.id < self.received: + return # ignore duplicate message + if self.expected == 0 or self.received < self.expected: + if self.amqp_type == 'null' or \ + self.amqp_type == 'boolean' or \ + self.amqp_type == 'uuid': + self.received_value_list.append(str(event.message.body)) + elif self.amqp_type == 'ubyte' or \ + self.amqp_type == 'ushort' or \ + self.amqp_type == 'byte' or \ + self.amqp_type == 'short' or \ + self.amqp_type == 'int': + self.received_value_list.append(hex(event.message.body)) + elif self.amqp_type == 'uint' or \ + self.amqp_type == 'ulong' or \ + self.amqp_type == 'long' or \ + self.amqp_type == 'timestamp': + hex_str = hex(int(event.message.body)) + if len(hex_str) == 19 and hex_str[-1] == 'L': + self.received_value_list.append(hex_str[:-1]) # strip trailing 'L' if present on some ulongs + else: + self.received_value_list.append(hex_str) + elif self.amqp_type == 'float': + self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', event.message.body))[0]) + elif self.amqp_type == 'double': + self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', event.message.body))[0]) + elif self.amqp_type == 'decimal32': + self.received_value_list.append('0x%08x' % event.message.body) + elif self.amqp_type == 'decimal64': + self.received_value_list.append('0x%016x' % event.message.body) + elif self.amqp_type == 'decimal128': + self.received_value_list.append('0x' + ''.join(['%02x' % ord(c) for c in event.message.body]).strip()) + elif self.amqp_type == 'char': + if ord(event.message.body) < 0x80 and event.message.body in digits + letters + punctuation: + self.received_value_list.append(event.message.body) + else: + self.received_value_list.append(hex(ord(event.message.body))) + elif self.amqp_type == 'binary' or \ + self.amqp_type == 'string' or \ + self.amqp_type == 'symbol': + self.received_value_list.append(event.message.body) + elif self.amqp_type == 'list' or \ + self.amqp_type == 'map': + self.received_value_list.append(event.message.body) + else: + print 'receive: Unsupported AMQP type "%s"' % self.amqp_type + return + self.received += 1 + if self.received == self.expected: + event.receiver.close() + event.connection.close() + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: AMQP type +# 4: Expected number of test values to receive +try: + RECEIVER = AmqpTypesReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4]) + Container(RECEIVER).run() + print sys.argv[3] + print dumps(RECEIVER.get_received_value_list()) +except KeyboardInterrupt: + pass +except Exception as exc: + print 'proton-python-receive EXCEPTION:', exc + print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/amqp_types_test/Sender.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/amqp_types_test/Sender.py b/shims/qpid-proton-python/src/amqp_types_test/Sender.py new file mode 100755 index 0000000..19d183f --- /dev/null +++ b/shims/qpid-proton-python/src/amqp_types_test/Sender.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python + +""" +AMQP type test sender shim for qpid-interop-test +""" + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Issues: +# * Capturing errors from client or broker + +import sys +import os.path +from json import loads +from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, \ + ubyte, uint, ulong, ushort +from proton.handlers import MessagingHandler +from proton.reactor import Container +from struct import unpack +from traceback import format_exc +from uuid import UUID + +class AmqpTypesSenderShim(MessagingHandler): + """ + Sender shim for AMQP types test + This shim receives the AMQP type and a list of test values. Each value is sent in a message body of the appropriate + AMQP type. There is no returned value. + """ + def __init__(self, url, amqp_type, test_value_list): + super(AmqpTypesSenderShim, self).__init__() + self.url = url + self.amqp_type = amqp_type + self.test_value_list = test_value_list + self.sent = 0 + self.confirmed = 0 + self.total = len(test_value_list) + + def on_start(self, event): + """Event callback for when the client starts""" + event.container.create_sender(self.url) + + def on_sendable(self, event): + """Event callback for when send credit is received, allowing the sending of messages""" + if self.sent == 0: + for test_value in self.test_value_list: + if event.sender.credit: + message = self.create_message(test_value) + if message is not None: + event.sender.send(message) + self.sent += 1 + else: + event.connection.close() + return + + def create_message(self, test_value): + """ + Creates a single message with the test value translated from its string representation to the appropriate + AMQP value (set in self.amqp_type). + """ + if self.amqp_type == 'null': + return Message(id=(self.sent+1), body=None) + elif self.amqp_type == 'boolean': + return Message(id=(self.sent+1), body=True if test_value == 'True' else False) + elif self.amqp_type == 'ubyte': + return Message(id=(self.sent+1), body=ubyte(int(test_value, 16))) + elif self.amqp_type == 'ushort': + return Message(id=(self.sent+1), body=ushort(int(test_value, 16))) + elif self.amqp_type == 'uint': + return Message(id=(self.sent+1), body=uint(int(test_value, 16))) + elif self.amqp_type == 'ulong': + return Message(id=(self.sent+1), body=ulong(int(test_value, 16))) + elif self.amqp_type == 'byte': + return Message(id=(self.sent+1), body=byte(int(test_value, 16))) + elif self.amqp_type == 'short': + return Message(id=(self.sent+1), body=short(int(test_value, 16))) + elif self.amqp_type == 'int': + return Message(id=(self.sent+1), body=int32(int(test_value, 16))) + elif self.amqp_type == 'long': + return Message(id=(self.sent+1), body=long(int(test_value, 16))) + elif self.amqp_type == 'float': + return Message(id=(self.sent+1), body=float32(unpack('!f', test_value[2:].decode('hex'))[0])) + elif self.amqp_type == 'double': + return Message(id=(self.sent+1), body=unpack('!d', test_value[2:].decode('hex'))[0]) + elif self.amqp_type == 'decimal32': + return Message(id=(self.sent+1), body=decimal32(int(test_value[2:], 16))) + elif self.amqp_type == 'decimal64': + l64 = long(test_value[2:], 16) + return Message(id=(self.sent+1), body=decimal64(l64)) + elif self.amqp_type == 'decimal128': + return Message(id=(self.sent+1), body=decimal128(test_value[2:].decode('hex'))) + elif self.amqp_type == 'char': + if len(test_value) == 1: # Format 'a' + return Message(id=(self.sent+1), body=char(test_value)) + else: + val = int(test_value, 16) + return Message(id=(self.sent+1), body=char(unichr(val))) + elif self.amqp_type == 'timestamp': + return Message(id=(self.sent+1), body=timestamp(int(test_value, 16))) + elif self.amqp_type == 'uuid': + return Message(id=(self.sent+1), body=UUID(test_value)) + elif self.amqp_type == 'binary': + return Message(id=(self.sent+1), body=bytes(test_value)) + elif self.amqp_type == 'string': + return Message(id=(self.sent+1), body=unicode(test_value)) + elif self.amqp_type == 'symbol': + return Message(id=(self.sent+1), body=symbol(test_value)) + elif self.amqp_type == 'list': + return Message(id=(self.sent+1), body=test_value) + elif self.amqp_type == 'map': + return Message(id=(self.sent+1), body=test_value) + else: + print 'send: Unsupported AMQP type "%s"' % self.amqp_type + return None + + def on_accepted(self, event): + """Event callback for when a sent message is accepted by the broker""" + self.confirmed += 1 + if self.confirmed == self.total: + event.connection.close() + + def on_disconnected(self, event): + """Event callback for when the broker disconnects with the client""" + self.sent = self.confirmed + + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: AMQP type +# 4...n: Test value(s) as strings +try: + Container(AmqpTypesSenderShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))).run() +except KeyboardInterrupt: + pass +except Exception as exc: + print os.path.basename(sys.argv[0]), 'EXCEPTION:', exc + print format_exc() + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/jms_messages_test/Receiver.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/jms_messages_test/Receiver.py b/shims/qpid-proton-python/src/jms_messages_test/Receiver.py new file mode 100755 index 0000000..9140db1 --- /dev/null +++ b/shims/qpid-proton-python/src/jms_messages_test/Receiver.py @@ -0,0 +1,358 @@ +#!/usr/bin/env python + +""" +JMS receiver shim for qpid-interop-test +""" + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from interop_test_errors import InteropTestError +from json import dumps, loads +from proton import byte, symbol +from proton.handlers import MessagingHandler +from proton.reactor import Container +from struct import pack, unpack +from subprocess import check_output +from traceback import format_exc + +# These values must tie in with the Qpid-JMS client values found in +# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport +QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type') + +class JmsReceiverShim(MessagingHandler): + """ + Receiver shim: This shim receives JMS messages sent by the Sender shim and prints the contents of the received + messages onto the terminal in JSON format for retrieval by the test harness. The JMS messages type and, where + applicable, body values, as well as the combinations of JMS headers and properties which may be attached to + the message are received on the command-line in JSON format when this program is launched. + """ + def __init__(self, url, jms_msg_type, test_parameters_list): + super(JmsReceiverShim, self).__init__() + self.url = url + self.jms_msg_type = jms_msg_type + self.expteced_msg_map = test_parameters_list[0] + self.flag_map = test_parameters_list[1] + self.subtype_itr = iter(sorted(self.expteced_msg_map.keys())) + self.expected = self._get_tot_num_messages() + self.received = 0 + self.received_value_map = {} + self.current_subtype = None + self.current_subtype_msg_list = None + self.jms_header_map = {} + self.jms_property_map = {} + + def get_received_value_map(self): + """"Return the collected message values received""" + return self.received_value_map + + def get_jms_header_map(self): + """Return the collected message headers received""" + return self.jms_header_map + + def get_jms_property_map(self): + """Return the collected message properties received""" + return self.jms_property_map + + def on_start(self, event): + """Event callback for when the client starts""" + event.container.create_receiver(self.url) + + def on_message(self, event): + """Event callback when a message is received by the client""" + if event.message.id and event.message.id < self.received: + return # ignore duplicate message + if self.expected == 0 or self.received < self.expected: + if self.current_subtype is None: + self.current_subtype = self.subtype_itr.next() + self.current_subtype_msg_list = [] + self.current_subtype_msg_list.append(self._handle_message(event.message)) + self._process_jms_headers(event.message) + self._process_jms_properties(event.message) + if len(self.current_subtype_msg_list) >= self.expteced_msg_map[self.current_subtype]: + self.received_value_map[self.current_subtype] = self.current_subtype_msg_list + self.current_subtype = None + self.current_subtype_msg_list = [] + self.received += 1 + if self.received == self.expected: + event.receiver.close() + event.connection.close() + + def on_connection_error(self, event): + print 'JmsReceiverShim.on_connection_error' + + def on_session_error(self, event): + print 'JmsReceiverShim.on_session_error' + + def on_link_error(self, event): + print 'JmsReceiverShim.on_link_error' + + def _handle_message(self, message): + """Handles the analysis of a received message""" + if self.jms_msg_type == 'JMS_MESSAGE_TYPE': + return self._receive_jms_message(message) + if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': + return self._receive_jms_bytesmessage(message) + if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': + return self._recieve_jms_mapmessage(message) + if self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE': + return self._recieve_jms_objectmessage(message) + if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': + return self._receive_jms_streammessage(message) + if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': + return self._receive_jms_textmessage(message) + print 'jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type + return None + + def _get_tot_num_messages(self): + """"Counts up the total number of messages which should be received from the expected message map""" + total = 0 + for key in self.expteced_msg_map: + total += int(self.expteced_msg_map[key]) + return total + + def _receive_jms_message(self, message): + """"Receives a JMS message (without a body)""" + assert self.jms_msg_type == 'JMS_MESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(0) + if message.body is not None: + raise InteropTestError('_receive_jms_message: Invalid body for type JMS_MESSAGE_TYPE: %s' % + str(message.body)) + return None + + def _receive_jms_bytesmessage(self, message): + """"Receives a JMS bytes message""" + assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(3) + if self.current_subtype == 'boolean': + if message.body == b'\x00': + return 'False' + if message.body == b'\x01': + return 'True' + raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' % + str(message.body)) + if self.current_subtype == 'byte': + return hex(unpack('b', message.body)[0]) + if self.current_subtype == 'bytes': + return str(message.body) + if self.current_subtype == 'char': + if len(message.body) == 2: # format 'a' or '\xNN' + return str(message.body[1]) # strip leading '\x00' char + raise InteropTestError('Unexpected strring length for type char: %d' % len(message.body)) + if self.current_subtype == 'double': + return '0x%016x' % unpack('!Q', message.body)[0] + if self.current_subtype == 'float': + return '0x%08x' % unpack('!L', message.body)[0] + if self.current_subtype == 'int': + return hex(unpack('!i', message.body)[0]) + if self.current_subtype == 'long': + return hex(unpack('!q', message.body)[0]) + if self.current_subtype == 'short': + return hex(unpack('!h', message.body)[0]) + if self.current_subtype == 'string': + # NOTE: first 2 bytes are string length, must be present + if len(message.body) >= 2: + str_len = unpack('!H', message.body[:2])[0] + str_body = str(message.body[2:]) + if len(str_body) != str_len: + raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' % + (str_len, str_body, len(str_body))) + return str_body + else: + raise InteropTestError('Malformed string binary: len(\'%s\')=%d' % + (repr(message.body), len(message.body))) + raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % + (self.jms_msg_type, self.current_subtype)) + + def _recieve_jms_mapmessage(self, message): + """"Receives a JMS map message""" + assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(2) + key, value = message.body.items()[0] + assert key[:-3] == self.current_subtype + if self.current_subtype == 'boolean': + return str(value) + if self.current_subtype == 'byte': + return hex(value) + if self.current_subtype == 'bytes': + return str(value) + if self.current_subtype == 'char': + return str(value) + if self.current_subtype == 'double': + return '0x%016x' % unpack('!Q', pack('!d', value))[0] + if self.current_subtype == 'float': + return '0x%08x' % unpack('!L', pack('!f', value))[0] + if self.current_subtype == 'int': + return hex(value) + if self.current_subtype == 'long': + return hex(int(value)) + if self.current_subtype == 'short': + return hex(value) + if self.current_subtype == 'string': + return str(value) + raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % + (self.jms_msg_type, self.current_subtype)) + + def _recieve_jms_objectmessage(self, message): + """"Receives a JMS Object message""" + assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(1) + return self._get_java_obj(message.body) + + def _get_java_obj(self, java_obj_bytes): + """ + Take bytes from serialized Java object and construct a Java object, then return its toString() value. The + work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java + utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar. + java_obj_bytes: hex string representation of bytes from Java object (eg 'aced00057372...') + returns: string containing Java class value as returned by the toString() method + """ + java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip() + out_str = check_output(['java', + '-cp', + 'target/JavaObjUtils.jar', + 'org.apache.qpid.interop_test.obj_util.BytesToJavaObj', + java_obj_bytes_str]) + out_str_list = out_str.split('\n')[:-1] # remove trailing \n + if len(out_str_list) > 1: + raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str) + colon_index = out_str_list[0].index(':') + if colon_index < 0: + raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str) + java_class_name = out_str_list[0][:colon_index] + java_class_value_str = out_str_list[0][colon_index+1:] + if java_class_name != self.current_subtype: + raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' % + (self.current_subtype, java_class_name)) + return java_class_value_str + + def _receive_jms_streammessage(self, message): + """Receives a JMS stream message""" + assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(4) + # Every message is a list with one item [value] + assert len(message.body) == 1 + value = message.body[0] + if self.current_subtype == 'boolean': + return str(value) + if self.current_subtype == 'byte': + return hex(value) + if self.current_subtype == 'bytes': + return str(value) + if self.current_subtype == 'char': + return str(value) + if self.current_subtype == 'double': + return '0x%016x' % unpack('!Q', pack('!d', value))[0] + if self.current_subtype == 'float': + return '0x%08x' % unpack('!L', pack('!f', value))[0] + if self.current_subtype == 'int': + return hex(value) + if self.current_subtype == 'long': + return hex(int(value)) + if self.current_subtype == 'short': + return hex(value) + if self.current_subtype == 'string': + return str(value) + raise InteropTestError('JmsRecieverShim._receive_jms_streammessage(): ' + + 'JMS message type %s: Unknown or unsupported subtype \'%s\'' % + (self.jms_msg_type, self.current_subtype)) + + def _receive_jms_textmessage(self, message): + """"Receives a JMS text message""" + assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE' + assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(5) + return message.body + + def _process_jms_headers(self, message): + """"Checks the supplied message for three JMS headers: message type, correlation-id and reply-to""" + # JMS message type header + message_type_header = message._get_subject() + if message_type_header is not None: + self.jms_header_map['JMS_TYPE_HEADER'] = {'string': message_type_header} + + # JMS correlation ID + correlation_id = message._get_correlation_id() + if correlation_id is not None: + if 'JMS_CORRELATIONID_AS_BYTES' in self.flag_map and self.flag_map['JMS_CORRELATIONID_AS_BYTES']: + self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'bytes': correlation_id} + else: + self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'string': correlation_id} + + # JMS reply-to + reply_to = message._get_reply_to() + if reply_to is not None: + if 'JMS_REPLYTO_AS_TOPIC' in self.flag_map and self.flag_map['JMS_REPLYTO_AS_TOPIC']: + # Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present + if len(reply_to) > 8 and reply_to[0:8] == 'topic://': + reply_to = reply_to[8:] + self.jms_header_map['JMS_REPLYTO_HEADER'] = {'topic': reply_to} + else: + if len(reply_to) > 8 and reply_to[0:8] == 'queue://': + reply_to = reply_to[8:] + self.jms_header_map['JMS_REPLYTO_HEADER'] = {'queue': reply_to} + + def _process_jms_properties(self, message): + """"Checks the supplied message for JMS message properties and decodes them""" + if message.properties is not None: + for jms_property_name in message.properties: + underscore_index = jms_property_name.find('_') + if underscore_index >= 0: # Ignore any other properties without '_' + jms_property_type = jms_property_name[0:underscore_index] + value = message.properties[jms_property_name] + if jms_property_type == 'boolean': + self.jms_property_map[jms_property_name] = {'boolean': str(value)} + elif jms_property_type == 'byte': + self.jms_property_map[jms_property_name] = {'byte': hex(value)} + elif jms_property_type == 'double': + self.jms_property_map[jms_property_name] = {'double': '0x%016x' % + unpack('!Q', pack('!d', value))[0]} + elif jms_property_type == 'float': + self.jms_property_map[jms_property_name] = {'float': '0x%08x' % + unpack('!L', pack('!f', value))[0]} + elif jms_property_type == 'int': + self.jms_property_map[jms_property_name] = {'int': hex(value)} + elif jms_property_type == 'long': + self.jms_property_map[jms_property_name] = {'long': hex(int(value))} + elif jms_property_type == 'short': + self.jms_property_map[jms_property_name] = {'short': hex(value)} + elif jms_property_type == 'string': + self.jms_property_map[jms_property_name] = {'string': str(value)} + else: + pass # Ignore any other properties, brokers can add them and we don't know what they may be + + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: JMS message type +# 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap] +#print '#### sys.argv=%s' % sys.argv +try: + RECEIVER = JmsReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4])) + Container(RECEIVER).run() + print sys.argv[3] + print dumps(RECEIVER.get_received_value_map()) + print dumps(RECEIVER.get_jms_header_map()) + print dumps(RECEIVER.get_jms_property_map()) +except KeyboardInterrupt: + pass +except Exception as exc: + print 'jms-receiver-shim EXCEPTION:', exc + print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/jms_messages_test/Sender.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/jms_messages_test/Sender.py b/shims/qpid-proton-python/src/jms_messages_test/Sender.py new file mode 100755 index 0000000..5a1108a --- /dev/null +++ b/shims/qpid-proton-python/src/jms_messages_test/Sender.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python + +""" +JMS sender shim for qpid-interop-test +""" + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from json import loads +from proton import byte, char, float32, int32, Message, short, symbol +from proton.handlers import MessagingHandler +from proton.reactor import Container +from interop_test_errors import InteropTestError +from subprocess import check_output +from struct import pack, unpack +from traceback import format_exc + +# These values must tie in with the Qpid-JMS client values found in +# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport +QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type') +QPID_JMS_TYPE_ANNOTATIONS = { + 'JMS_MESSAGE_TYPE': byte(0), + 'JMS_BYTESMESSAGE_TYPE': byte(3), + 'JMS_MAPMESSAGE_TYPE': byte(2), + 'JMS_OBJECTMESSAGE_TYPE': byte(1), + 'JMS_STREAMMESSAGE_TYPE': byte(4), + 'JMS_TEXTMESSAGE_TYPE': byte(5) + } +def create_annotation(jms_msg_type): + """Function which creates a message annotation for JMS message type as used by the Qpid JMS client""" + return {QPID_JMS_TYPE_ANNOTATION_NAME: QPID_JMS_TYPE_ANNOTATIONS[jms_msg_type]} + +class JmsSenderShim(MessagingHandler): + """ + This shim sends JMS messages of a particular JMS message type according to the test parameters list. This list + contains three maps: + 0: The test value map, which contains test value types as keys, and lists of values of that type; + 1. The test headers map, which contains the JMS headers as keys and a submap conatining types and values; + 2. The test proprties map, which contains the name of the properties as keys, and a submap containing types + and values + This shim takes the combinations of the above map and creates test cases, each of which sends a single message + with (or without) JMS headers and properties. + """ + def __init__(self, broker_ip_addr, queue_name, jms_msg_type, test_parameters_list): + super(JmsSenderShim, self).__init__() + self.broker_ip_addr = broker_ip_addr + self.queue_name = queue_name + self.jms_msg_type = jms_msg_type + self.test_value_map = test_parameters_list[0] + self.test_headers_map = test_parameters_list[1] + self.test_properties_map = test_parameters_list[2] + self.sent = 0 + self.confirmed = 0 + self.total = self._get_total_num_msgs() + + def on_start(self, event): + """Event callback for when the client starts""" + event.container.create_sender('%s/%s' % (self.broker_ip_addr, self.queue_name)) + + def on_sendable(self, event): + """Event callback for when send credit is received, allowing the sending of messages""" + if self.sent == 0: + # These types expect a test_values Python string representation of a map: '{type:[val, val, val], ...}' + for sub_type in sorted(self.test_value_map.keys()): + if self._send_test_values(event, sub_type, self.test_value_map[sub_type]): + return + + def on_connection_error(self, event): + print 'JmsSenderShim.on_connection_error' + + def on_session_error(self, event): + print 'JmsSenderShim.on_session_error' + + def on_link_error(self, event): + print 'JmsSenderShim.on_link_error' + + def on_accepted(self, event): + """Event callback for when a sent message is accepted by the broker""" + self.confirmed += 1 + if self.confirmed == self.total: + event.connection.close() + + def on_disconnected(self, event): + """Event callback for when the broker disconnects with the client""" + self.sent = self.confirmed + + def _get_total_num_msgs(self): + """ + Calculates the total number of messages to be sent based on the message parameters received on the command-line + """ + total = 0 + for key in self.test_value_map.keys(): + total += len(self.test_value_map[key]) + return total + + def _send_test_values(self, event, test_value_type, test_values): + """Method which loops through recieved parameters and sends the corresponding messages""" + value_num = 0 + for test_value in test_values: + if event.sender.credit: + message = self._create_message(test_value_type, test_value, value_num) + # TODO: set message to address + if message is not None: + self._add_jms_message_headers(message) + self._add_jms_message_properties(message) + event.sender.send(message) + self.sent += 1 + value_num += 1 + else: + event.connection.close() + return True + return False + + # TODO: Change this to return a list of messages. That way each test can return more than one message + def _create_message(self, test_value_type, test_value, value_num): + """Create a single message of the appropriate JMS message type""" + if self.jms_msg_type == 'JMS_MESSAGE_TYPE': + return self._create_jms_message(test_value_type, test_value) + elif self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': + return self._create_jms_bytesmessage(test_value_type, test_value) + elif self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': + return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num)) + elif self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE': + return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value)) + elif self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': + return self._create_jms_streammessage(test_value_type, test_value) + elif self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': + return self._create_jms_textmessage(test_value) + else: + print 'jms-send: Unsupported JMS message type "%s"' % self.jms_msg_type + return None + + def _create_jms_message(self, test_value_type, test_value): + """Create a JMS message type (without message body)""" + if test_value_type != 'none': + raise InteropTestError('JmsSenderShim._create_jms_message: Unknown or unsupported subtype "%s"' % + test_value_type) + if test_value is not None: + raise InteropTestError('JmsSenderShim._create_jms_message: Invalid value "%s" for subtype "%s"' % + (test_value, test_value_type)) + return Message(id=(self.sent+1), + content_type='application/octet-stream', + annotations=create_annotation('JMS_MESSAGE_TYPE')) + + def _create_jms_bytesmessage(self, test_value_type, test_value): + """Create a JMS bytes message""" + # NOTE: test_value contains all unicode strings u'...' as returned by json + body_bytes = None + if test_value_type == 'boolean': + body_bytes = b'\x01' if test_value == 'True' else b'\x00' + elif test_value_type == 'byte': + body_bytes = pack('b', int(test_value, 16)) + elif test_value_type == 'bytes': + body_bytes = str(test_value) # remove unicode + elif test_value_type == 'char': + # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00' + body_bytes = '\x00' + str(test_value) # remove unicode + elif test_value_type == 'double' or test_value_type == 'float': + body_bytes = test_value[2:].decode('hex') + elif test_value_type == 'int': + body_bytes = pack('!i', int(test_value, 16)) + elif test_value_type == 'long': + body_bytes = pack('!q', long(test_value, 16)) + elif test_value_type == 'short': + body_bytes = pack('!h', short(test_value, 16)) + elif test_value_type == 'string': + # NOTE: First two bytes must be string length + test_value_str = str(test_value) # remove unicode + body_bytes = pack('!H', len(test_value_str)) + test_value_str + else: + raise InteropTestError('JmsSenderShim._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' % + test_value_type) + return Message(id=(self.sent+1), + body=body_bytes, + inferred=True, + content_type='application/octet-stream', + annotations=create_annotation('JMS_BYTESMESSAGE_TYPE')) + + def _create_jms_mapmessage(self, test_value_type, test_value, name): + """Create a JMS map message""" + if test_value_type == 'boolean': + value = test_value == 'True' + elif test_value_type == 'byte': + value = byte(int(test_value, 16)) + elif test_value_type == 'bytes': + value = str(test_value) # remove unicode + elif test_value_type == 'char': + value = char(test_value) + elif test_value_type == 'double': + value = unpack('!d', test_value[2:].decode('hex'))[0] + elif test_value_type == 'float': + value = float32(unpack('!f', test_value[2:].decode('hex'))[0]) + elif test_value_type == 'int': + value = int32(int(test_value, 16)) + elif test_value_type == 'long': + value = long(test_value, 16) + elif test_value_type == 'short': + value = short(int(test_value, 16)) + elif test_value_type == 'string': + value = test_value + else: + raise InteropTestError('JmsSenderShim._create_jms_mapmessage: Unknown or unsupported subtype "%s"' % + test_value_type) + return Message(id=(self.sent+1), + body={name: value}, + inferred=False, + annotations=create_annotation('JMS_MAPMESSAGE_TYPE')) + + def _create_jms_objectmessage(self, test_value): + """Create a JMS object message""" + java_binary = self._s_get_java_obj_binary(test_value) + return Message(id=(self.sent+1), + body=java_binary, + inferred=True, + content_type='application/x-java-serialized-object', + annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE')) + + @staticmethod + def _s_get_java_obj_binary(java_class_str): + """Call external utility to create Java object and stringify it, returning the string representation""" + out_str = check_output(['java', + '-cp', + 'target/JavaObjUtils.jar', + 'org.apache.qpid.interop_test.obj_util.JavaObjToBytes', + java_class_str]) + out_str_list = out_str.split('\n')[:-1] # remove trailing \n + if out_str_list[0] != java_class_str: + raise InteropTestError('JmsSenderShim._s_get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' % + out_str) + return out_str_list[1].decode('hex') + + def _create_jms_streammessage(self, test_value_type, test_value): + """Create a JMS stream message""" + if test_value_type == 'boolean': + body_list = [test_value == 'True'] + elif test_value_type == 'byte': + body_list = [byte(int(test_value, 16))] + elif test_value_type == 'bytes': + body_list = [str(test_value)] + elif test_value_type == 'char': + body_list = [char(test_value)] + elif test_value_type == 'double': + body_list = [unpack('!d', test_value[2:].decode('hex'))[0]] + elif test_value_type == 'float': + body_list = [float32(unpack('!f', test_value[2:].decode('hex'))[0])] + elif test_value_type == 'int': + body_list = [int32(int(test_value, 16))] + elif test_value_type == 'long': + body_list = [long(test_value, 16)] + elif test_value_type == 'short': + body_list = [short(int(test_value, 16))] + elif test_value_type == 'string': + body_list = [test_value] + else: + raise InteropTestError('JmsSenderShim._create_jms_streammessage: Unknown or unsupported subtype "%s"' % + test_value_type) + return Message(id=(self.sent+1), + body=body_list, + inferred=True, + annotations=create_annotation('JMS_STREAMMESSAGE_TYPE')) + + def _create_jms_textmessage(self, test_value_text): + """Create a JMS text message""" + return Message(id=(self.sent+1), + body=unicode(test_value_text), + annotations=create_annotation('JMS_TEXTMESSAGE_TYPE')) + + def _add_jms_message_headers(self, message): + """Add JMS headers to the supplied message from self.test_headers_map""" + for jms_header in self.test_headers_map.iterkeys(): + value_map = self.test_headers_map[jms_header] + value_type = value_map.keys()[0] # There is only ever one value in map + value = value_map[value_type] + if jms_header == 'JMS_TYPE_HEADER': + if value_type == 'string': + self._s_set_jms_type_header(message, value) + else: + raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + + 'JMS_TYPE_HEADER requires value type "string", type "%s" found' % + value_type) + elif jms_header == 'JMS_CORRELATIONID_HEADER': + if value_type == 'string': + self._s_set_jms_correlation_id(message, value) + elif value_type == 'bytes': + self._s_set_jms_correlation_id(message, str(value)) + else: + raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + + 'JMS_CORRELATIONID_HEADER requires value type "string" or "bytes", ' + + 'type "%s" found' % value_type) + elif jms_header == 'JMS_REPLYTO_HEADER': + if value_type == 'queue' or value_type == 'topic': + self._s_set_jms_reply_to(message, value_type, value) + elif value_type == 'temp_queue' or value_type == 'temp_topic': + raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + + 'JMS_REPLYTO_HEADER type "temp_queue" or "temp_topic" not handled') + else: + raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' + + 'JMS_REPLYTO_HEADER requires value type "queue" or "topic", ' + + 'type "%s" found' % value_type) + else: + raise InteropTestError('JmsSenderShim._add_jms_message_headers(): Invalid JMS message header "%s"' % + jms_header) + + + @staticmethod + def _s_set_jms_type_header(message, message_type): + """Adds a JMS message type header""" + message._set_subject(message_type) + + @staticmethod + def _s_set_jms_correlation_id(message, correlation_id): + """Adds a JMS correlation id header""" + message._set_correlation_id(correlation_id) + message.annotations[symbol(u'x-opt-app-correlation-id')] = True + + @staticmethod + def _s_set_jms_reply_to(message, jms_destination_type_str, destination): + """Adds a JMS reply-to header""" + if jms_destination_type_str == 'queue': + message._set_reply_to(destination) + message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(0) + elif jms_destination_type_str == 'topic': + message._set_reply_to(destination) + message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(1) + else: + raise InteropTestError('JmsSenderShim._s_set_jms_reply_to(): ' + + 'Invalid value for jms_destination_type_str "%s"' % jms_destination_type_str) + + def _add_jms_message_properties(self, message): + """Adds message properties to the supplied message from self.test_properties_map""" + for property_name in self.test_properties_map.iterkeys(): + value_map = self.test_properties_map[property_name] + value_type = value_map.keys()[0] # There is only ever one value in map + value = value_map[value_type] + if message.properties is None: + message.properties = {} + if value_type == 'boolean': + message.properties[property_name] = value == 'True' + elif value_type == 'byte': + message.properties[property_name] = byte(int(value, 16)) + elif value_type == 'double': + message.properties[property_name] = unpack('!d', value[2:].decode('hex'))[0] + elif value_type == 'float': + message.properties[property_name] = float32(unpack('!f', value[2:].decode('hex'))[0]) + elif value_type == 'int': + message.properties[property_name] = int(value, 16) + elif value_type == 'long': + message.properties[property_name] = long(value, 16) + elif value_type == 'short': + message.properties[property_name] = short(int(value, 16)) + elif value_type == 'string': + message.properties[property_name] = value + else: + raise InteropTestError('JmsSenderShim._add_jms_message_properties: ' + + 'Unknown or unhandled message property type ?%s"' % value_type) + + + +# --- main --- +# Args: 1: Broker address (ip-addr:port) +# 2: Queue name +# 3: JMS message type +# 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap] +#print '#### sys.argv=%s' % sys.argv +#print '>>> test_values=%s' % loads(sys.argv[4]) +try: + Container(JmsSenderShim(sys.argv[1], sys.argv[2], sys.argv[3], loads(sys.argv[4]))).run() +except KeyboardInterrupt: + pass +except Exception as exc: + print 'jms-sender-shim EXCEPTION:', exc + print format_exc() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/src/python/qpid-interop-test/__init__.py ---------------------------------------------------------------------- diff --git a/src/python/qpid-interop-test/__init__.py b/src/python/qpid-interop-test/__init__.py index 7b8aee3..a94c993 100644 --- a/src/python/qpid-interop-test/__init__.py +++ b/src/python/qpid-interop-test/__init__.py @@ -19,6 +19,7 @@ import broker_properties import interop_test_errors +import shims import test_type_map import types import jms --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
