// Source: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/AmqpReceiver.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.interop_test.shim;

import java.math.BigDecimal;
import java.util.UUID;
import java.util.Vector;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.qpid.jms.JmsConnectionFactory;

/**
 * Command-line receiver shim: connects to a broker, receives up to a given
 * number of messages from a queue, decodes each one according to a named AMQP
 * type and prints the decoded values to stdout, one per line, preceded by the
 * AMQP type name.
 *
 * <p>Arguments: {@code broker_address queue_name amqp_type num_test_values}.
 * The process exits with status 1 on any error.
 */
public class AmqpReceiver {
    private static final String USER = "guest";
    private static final String PASSWORD = "guest";
    /** Per-message receive timeout in milliseconds. */
    private static final int TIMEOUT = 1000;
    private static final String[] SUPPORTED_AMQP_TYPES = {"null",
                                                          "boolean",
                                                          "ubyte",
                                                          "ushort",
                                                          "uint",
                                                          "ulong",
                                                          "byte",
                                                          "short",
                                                          "int",
                                                          "long",
                                                          "float",
                                                          "double",
                                                          "decimal32",
                                                          "decimal64",
                                                          "decimal128",
                                                          "char",
                                                          "timestamp",
                                                          "uuid",
                                                          "binary",
                                                          "string",
                                                          "symbol",
                                                          "list",
                                                          "map",
                                                          "array"};

    /**
     * Entry point.
     *
     * @param args broker address (without the {@code amqp://} scheme), queue
     *             name, AMQP type name, and the number of values to receive
     * @throws Exception if the value count argument is not a valid integer
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.out.println("AmqpReceiver: Insufficient number of arguments");
            System.out.println("AmqpReceiver: Expected arguments: broker_address, queue_name, amqp_type, num_test_values");
            System.exit(1);
        }
        String brokerAddress = "amqp://" + args[0];
        String queueName = args[1];
        String amqpType = args[2];
        int numTestValues = Integer.parseInt(args[3]);
        Connection connection = null;

        try {
            ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);

            connection = factory.createConnection(USER, PASSWORD);
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            Vector<String> outList = new Vector<String>();
            outList.add(amqpType);
            if (isSupportedAmqpType(amqpType)) {
                for (int received = 0; received < numTestValues; received++) {
                    Message message = messageConsumer.receive(TIMEOUT);
                    if (message == null) {
                        // Timed out: stop early and report what was received so far.
                        break;
                    }
                    String value = decodeMessage(message, amqpType);
                    if (value != null) {
                        outList.add(value);
                    }
                }
            } else {
                System.out.println("ERROR: AmqpReceiver: AMQP type \"" + amqpType + "\" is not supported");
                connection.close();
                System.exit(1);
            }

            connection.close();

            // No exception, print results
            for (String line : outList) {
                System.out.println(line);
            }
        } catch (Exception exp) {
            // Close without letting a secondary close failure hide the original error.
            closeQuietly(connection);
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    /**
     * Decodes one received message into the textual form printed by this shim.
     *
     * @param message  the received JMS message
     * @param amqpType the AMQP type name selecting how the body is decoded
     * @return the decoded value, or {@code null} for the types that currently
     *         produce no output ("list", "map", "array")
     * @throws Exception on a body-size mismatch, an unknown type name, or any
     *                   JMS / cast failure while reading the message
     */
    private static String decodeMessage(Message message, String amqpType) throws Exception {
        switch (amqpType) {
        case "null": {
            long bodyLength = ((BytesMessage)message).getBodyLength();
            if (bodyLength != 0L) {
                throw new Exception("AmqpReceiver: JMS BytesMessage size error: Expected 0 bytes, read " + bodyLength);
            }
            return "None";
        }
        case "boolean":
            // Capitalized to "True"/"False".
            return ((BytesMessage)message).readBoolean() ? "True" : "False";
        case "ubyte":
            return String.valueOf(((BytesMessage)message).readByte() & 0xff);
        case "ushort":
            return String.valueOf(toUnsigned(readExactly((BytesMessage)message, 2)));
        case "uint":
            return String.valueOf(toUnsigned(readExactly((BytesMessage)message, 4)));
        case "ulong":
        case "timestamp":
            // TODO: shortcut in use here - this byte array should go through a Java type that can
            // represent this as a number - such as BigInteger.
            return toHex(readExactly((BytesMessage)message, 8));
        case "byte":
            return String.valueOf(((BytesMessage)message).readByte());
        case "short":
            return String.valueOf(((BytesMessage)message).readShort());
        case "int":
            return String.valueOf(((BytesMessage)message).readInt());
        case "long":
            return String.valueOf(((BytesMessage)message).readLong());
        case "float":
            // Raw IEEE-754 bits as 8 zero-padded hex digits.
            return String.format("0x%08x", Float.floatToRawIntBits(((BytesMessage)message).readFloat()));
        case "double":
            // Raw IEEE-754 bits as 16 zero-padded hex digits.
            return String.format("0x%016x", Double.doubleToRawLongBits(((BytesMessage)message).readDouble()));
        case "decimal32":
        case "decimal64":
        case "decimal128":
            return ((BigDecimal)((ObjectMessage)message).getObject()).toString();
        case "char":
            return String.format("%c", ((BytesMessage)message).readChar());
        case "uuid":
            return ((UUID)((ObjectMessage)message).getObject()).toString();
        case "binary": {
            BytesMessage bytesMessage = (BytesMessage)message;
            long bodyLength = bytesMessage.getBodyLength();
            if (bodyLength > Integer.MAX_VALUE) {
                throw new Exception("AmqpReceiver: JMS BytesMessage size error: body of " + bodyLength + " bytes is too large");
            }
            // A short read now raises an error instead of silently dropping the value.
            byte[] body = readExactly(bytesMessage, (int)bodyLength);
            // Explicit charset so the result does not depend on the platform default.
            return new String(body, java.nio.charset.StandardCharsets.UTF_8);
        }
        case "string":
            return ((TextMessage)message).getText();
        case "symbol":
            return ((BytesMessage)message).readUTF();
        case "list":
        case "map":
        case "array":
            // Not decoded yet: the message is consumed but nothing is reported.
            return null;
        default:
            // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
            throw new Exception("AmqpReceiver: Internal error: unsupported AMQP type \"" + amqpType + "\"");
        }
    }

    /**
     * Reads exactly {@code length} bytes from the message body.
     *
     * @param message the message to read from
     * @param length  number of bytes required; 0 yields an empty array without
     *                touching the message
     * @return a new array of exactly {@code length} bytes
     * @throws Exception if fewer than {@code length} bytes could be read
     */
    private static byte[] readExactly(BytesMessage message, int length) throws Exception {
        byte[] buffer = new byte[length];
        if (length == 0) {
            return buffer;
        }
        int numBytes = message.readBytes(buffer);
        if (numBytes != length) {
            // readBytes() returns -1 when no bytes remain in the stream.
            String detail = (numBytes == -1) ? " (no bytes remaining in message)" : "";
            throw new Exception("AmqpReceiver: JMS BytesMessage size error: Expected " + length
                    + " bytes, read " + numBytes + detail);
        }
        return buffer;
    }

    /** Interprets up to 7 big-endian bytes as a non-negative number. */
    private static long toUnsigned(byte[] bytes) {
        long value = 0;
        for (byte b : bytes) {
            value = (value << 8) + (b & 0xff);
        }
        return value;
    }

    /** Formats bytes as "0x" followed by two lower-case hex digits per byte. */
    private static String toHex(byte[] bytes) {
        StringBuilder hex = new StringBuilder("0x");
        for (byte b : bytes) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    /** Closes the connection if one was opened, ignoring any failure to do so. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
            // Only used on the error path; the original exception is what gets reported.
        }
    }

    /**
     * Tells whether the given name is one of the AMQP types this shim knows.
     *
     * @param amqpType the type name to look up; {@code null} is never supported
     * @return {@code true} if the name appears in {@code SUPPORTED_AMQP_TYPES}
     */
    protected static boolean isSupportedAmqpType(String amqpType) {
        for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
            if (supportedAmqpType.equals(amqpType)) {
                return true;
            }
        }
        return false;
    }

    /** Connection-level listener: reports the asynchronous failure and exits with status 1. */
    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
// Source: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/AmqpSender.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.qpid.interop_test.shim;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.util.Arrays;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.qpid.jms.JmsConnectionFactory;

/**
 * Command-line sender shim: connects to a broker and sends one JMS message per
 * test value to a queue, encoding each value according to a named AMQP type.
 *
 * <p>Arguments: {@code broker_address queue_name amqp_type test_val [test_val ...]}.
 * The process exits with status 1 on any error.
 */
public class AmqpSender {
    // NOTE(review): these credentials are declared but the connection below is created
    // without them, unlike AmqpReceiver - confirm whether an anonymous connection is intended.
    private static final String USER = "guest";
    private static final String PASSWORD = "guest";
    private static final String[] SUPPORTED_AMQP_TYPES = {"null",
                                                          "boolean",
                                                          "ubyte",
                                                          "ushort",
                                                          "uint",
                                                          "ulong",
                                                          "byte",
                                                          "short",
                                                          "int",
                                                          "long",
                                                          "float",
                                                          "double",
                                                          "decimal32",
                                                          "decimal64",
                                                          "decimal128",
                                                          "char",
                                                          "timestamp",
                                                          "uuid",
                                                          "binary",
                                                          "string",
                                                          "symbol",
                                                          "list",
                                                          "map",
                                                          "array"};

    /**
     * Entry point.
     *
     * @param args broker address (without the {@code amqp://} scheme), queue
     *             name, AMQP type name, then one or more test values
     * @throws Exception declared for compatibility; failures inside the
     *                   messaging section are reported and end the process
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 4) {
            System.out.println("AmqpSender: Insufficient number of arguments");
            System.out.println("AmqpSender: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...");
            System.exit(1);
        }
        String brokerAddress = "amqp://" + args[0];
        String queueName = args[1];
        String amqpType = args[2];
        String[] testValueList = Arrays.copyOfRange(args, 3, args.length); // Use remaining args as test values
        // Declared outside the try block so the error path can close it too.
        Connection connection = null;

        try {
            ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);

            connection = factory.createConnection();
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);
            MessageProducer messageProducer = session.createProducer(queue);

            if (isSupportedAmqpType(amqpType)) {
                for (String testValueStr : testValueList) {
                    Message message = createMessage(session, amqpType, testValueStr);
                    messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                }
            } else {
                System.out.println("ERROR: AmqpSender: AMQP type \"" + amqpType + "\" is not supported");
                connection.close();
                System.exit(1);
            }

            connection.close();
        } catch (Exception exp) {
            closeQuietly(connection);
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    /**
     * Builds the JMS message that carries one test value.
     *
     * @param session      session used to create the message
     * @param amqpType     AMQP type name selecting the encoding
     * @param testValueStr the test value in its command-line text form
     * @return a new, populated message ready to send
     * @throws UnsupportedOperationException for "list", "map" and "array",
     *         which have no encoding yet
     * @throws Exception on an unknown type name, an unparsable value, or a JMS failure
     */
    private static Message createMessage(Session session, String amqpType, String testValueStr) throws Exception {
        switch (amqpType) {
        case "null":
            // An empty bytes message stands for the null value.
            return session.createBytesMessage();
        case "boolean": {
            BytesMessage message = session.createBytesMessage();
            message.writeBoolean(Boolean.parseBoolean(testValueStr));
            return message;
        }
        case "ubyte": {
            // Parsed as a short so values 128..255 are accepted, then narrowed to one byte.
            BytesMessage message = session.createBytesMessage();
            message.writeByte((byte)Short.parseShort(testValueStr));
            return message;
        }
        case "ushort": {
            int testValue = Integer.parseInt(testValueStr);
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(new byte[] {(byte)(testValue >> 8), (byte)testValue});
            return message;
        }
        case "uint": {
            long testValue = Long.parseLong(testValueStr);
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(new byte[] {(byte)(testValue >> 24), (byte)(testValue >> 16),
                                           (byte)(testValue >> 8), (byte)testValue});
            return message;
        }
        case "ulong": {
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(toEightBytes(new BigInteger(testValueStr)));
            return message;
        }
        case "byte": {
            BytesMessage message = session.createBytesMessage();
            message.writeByte(Byte.parseByte(testValueStr));
            return message;
        }
        case "short": {
            BytesMessage message = session.createBytesMessage();
            message.writeShort(Short.parseShort(testValueStr));
            return message;
        }
        case "int": {
            BytesMessage message = session.createBytesMessage();
            message.writeInt(Integer.parseInt(testValueStr));
            return message;
        }
        case "long":
        case "timestamp": {
            BytesMessage message = session.createBytesMessage();
            message.writeLong(Long.parseLong(testValueStr));
            return message;
        }
        case "float": {
            // Value is the raw IEEE-754 bit pattern as hex after a two-character prefix.
            long bits = Long.parseLong(testValueStr.substring(2), 16);
            BytesMessage message = session.createBytesMessage();
            message.writeFloat(Float.intBitsToFloat((int)bits));
            return message;
        }
        case "double": {
            // BigInteger parses the full 64-bit pattern (top bit included) for any digit count;
            // longValue() keeps the low 64 bits.
            long bits = new BigInteger(testValueStr.substring(2), 16).longValue();
            BytesMessage message = session.createBytesMessage();
            message.writeDouble(Double.longBitsToDouble(bits));
            return message;
        }
        case "decimal32":
            return session.createObjectMessage(new BigDecimal(testValueStr, MathContext.DECIMAL32));
        case "decimal64":
            return session.createObjectMessage(new BigDecimal(testValueStr, MathContext.DECIMAL64));
        case "decimal128":
            return session.createObjectMessage(new BigDecimal(testValueStr, MathContext.DECIMAL128));
        case "char": {
            BytesMessage message = session.createBytesMessage();
            message.writeChar(parseChar(testValueStr));
            return message;
        }
        case "uuid":
            return session.createObjectMessage(UUID.fromString(testValueStr));
        case "binary": {
            // Explicit charset so the bytes on the wire do not depend on the platform default.
            byte[] byteArray = testValueStr.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            BytesMessage message = session.createBytesMessage();
            message.writeBytes(byteArray, 0, byteArray.length);
            return message;
        }
        case "string":
            return session.createTextMessage(testValueStr);
        case "symbol": {
            BytesMessage message = session.createBytesMessage();
            message.writeUTF(testValueStr);
            return message;
        }
        case "list":
        case "map":
        case "array":
            // No encoding exists yet; fail clearly rather than handing a null message to send().
            throw new UnsupportedOperationException("AmqpSender: AMQP type \"" + amqpType + "\" is not yet implemented");
        default:
            // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
            throw new Exception("AmqpSender: Internal error: unsupported AMQP type \"" + amqpType + "\"");
        }
    }

    /**
     * Returns the low-order 8 bytes of the value, big-endian, left-padded with zeros.
     *
     * @param value the number to encode (its byte form may be 1 to 9 bytes or more)
     * @return a new 8-byte array
     */
    private static byte[] toEightBytes(BigInteger value) {
        byte[] source = value.toByteArray();
        byte[] result = new byte[8];
        int count = Math.min(source.length, result.length); // Cap length at 8
        System.arraycopy(source, source.length - count, result, result.length - count, count);
        return result;
    }

    /**
     * Converts a "char" test value to a character.
     *
     * @param text a single character, or a 6-character hex code; a leading
     *             "\\u" or "0x" on the hex form is accepted and skipped
     *             (presumably the intended "unicode format" - confirm against the test driver)
     * @return the character, or char 0 when the text has any other length
     */
    private static char parseChar(String text) {
        if (text.length() == 1) { // Single char
            return text.charAt(0);
        }
        if (text.length() == 6) { // unicode format
            boolean prefixed = text.startsWith("\\u") || text.startsWith("0x") || text.startsWith("0X");
            return (char)Integer.parseInt(prefixed ? text.substring(2) : text, 16);
        }
        return 0;
    }

    /** Closes the connection if one was opened, ignoring any failure to do so. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
            // Only used on the error path; the original exception is what gets reported.
        }
    }

    /**
     * Tells whether the given name is one of the AMQP types this shim knows.
     *
     * @param amqpType the type name to look up; {@code null} is never supported
     * @return {@code true} if the name appears in {@code SUPPORTED_AMQP_TYPES}
     */
    protected static boolean isSupportedAmqpType(String amqpType) {
        for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
            if (supportedAmqpType.equals(amqpType)) {
                return true;
            }
        }
        return false;
    }

    /** Connection-level listener: reports the asynchronous failure and exits with status 1. */
    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsReceiverShim.java ---------------------------------------------------------------------- diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsReceiverShim.java b/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsReceiverShim.java new file mode 100644 index 0000000..f567638 --- /dev/null +++ b/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsReceiverShim.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.interop_test.shim; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.json.Json; +import javax.json.JsonArrayBuilder; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.json.JsonReader; +import javax.json.JsonWriter; +import org.apache.qpid.jms.JmsConnectionFactory; + +public class JmsReceiverShim { + private static final String USER = "guest"; + private static final String PASSWORD = "guest"; + private static final int TIMEOUT = 1000; + private static final String[] SUPPORTED_JMS_MESSAGE_TYPES = {"JMS_BYTESMESSAGE_TYPE", + "JMS_MAPMESSAGE_TYPE", + "JMS_OBJECTMESSAGE_TYPE", + "JMS_STREAMMESSAGE_TYPE", + "JMS_TEXTMESSAGE_TYPE"}; + + // args[0]: Broker URL + // args[1]: Queue name + // args[2]: JMS message type + // args[3]: JSON Test number map + public static void main(String[] args) throws Exception { + if (args.length < 4) { + System.out.println("JmsReceiverShim: Insufficient number of arguments"); + System.out.println("JmsReceiverShim: Expected arguments: broker_address, queue_name, amqp_type, num_test_values"); + System.exit(1); + } + String brokerAddress = "amqp://" + args[0]; + String queueName = args[1]; + String jmsMessageType = args[2]; + if (!isSupportedJmsMessageType(jmsMessageType)) { + System.out.println("ERROR: JmsReceiverShim: unknown or unsupported JMS message type \"" + 
jmsMessageType + "\""); + System.exit(1); + } + + JsonReader jsonReader = Json.createReader(new StringReader(args[3])); + JsonObject numTestValuesMap = jsonReader.readObject(); + jsonReader.close(); + + Connection connection = null; + + try { + ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress); + + connection = factory.createConnection(USER, PASSWORD); + connection.setExceptionListener(new MyExceptionListener()); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName); + + MessageConsumer messageConsumer = session.createConsumer(queue); + + List<String> keyList = new ArrayList<String>(numTestValuesMap.keySet()); + Collections.sort(keyList); + + Message message = null; + JsonObjectBuilder job = Json.createObjectBuilder(); + for (String key: keyList) { + JsonArrayBuilder jab = Json.createArrayBuilder(); + for (int i=0; i<numTestValuesMap.getJsonNumber(key).intValue(); ++i) { + message = messageConsumer.receive(TIMEOUT); + if (message == null) break; + switch (jmsMessageType) { + case "JMS_BYTESMESSAGE_TYPE": + switch (key) { + case "boolean": + jab.add(((BytesMessage)message).readBoolean()?"True":"False"); + break; + case "byte": + jab.add(formatByte(((BytesMessage)message).readByte())); + break; + case "bytes": + { + byte[] bytesBuff = new byte[65536]; + int numBytesRead = ((BytesMessage)message).readBytes(bytesBuff); + if (numBytesRead >= 0) { + jab.add(new String(Arrays.copyOfRange(bytesBuff, 0, numBytesRead))); + } else { + // NOTE: For this case, an empty byte array has nothing to return + jab.add(new String()); + } + } + break; + case "char": + jab.add(formatChar(((BytesMessage)message).readChar())); + break; + case "double": + long l = Double.doubleToRawLongBits(((BytesMessage)message).readDouble()); + jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0')); + break; + case "float": + int i0 = 
Float.floatToRawIntBits(((BytesMessage)message).readFloat()); + jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0')); + break; + case "int": + jab.add(formatInt(((BytesMessage)message).readInt())); + break; + case "long": + jab.add(formatLong(((BytesMessage)message).readLong())); + break; + case "object": + { + byte[] bytesBuff = new byte[65536]; + int numBytesRead = ((BytesMessage)message).readBytes(bytesBuff); + if (numBytesRead >= 0) { + ByteArrayInputStream bais = new ByteArrayInputStream(Arrays.copyOfRange(bytesBuff, 0, numBytesRead)); + ObjectInputStream ois = new ObjectInputStream(bais); + Object obj = ois.readObject(); + jab.add(obj.getClass().getName() + ":" + obj.toString()); + } else { + jab.add("<object error>"); + } + } + break; + case "short": + jab.add(formatShort(((BytesMessage)message).readShort())); + break; + case "string": + jab.add(((BytesMessage)message).readUTF()); + break; + default: + throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\""); + } + break; + case "JMS_STREAMMESSAGE_TYPE": + switch (key) { + case "boolean": + jab.add(((StreamMessage)message).readBoolean()?"True":"False"); + break; + case "byte": + jab.add(formatByte(((StreamMessage)message).readByte())); + break; + case "bytes": + byte[] bytesBuff = new byte[65536]; + int numBytesRead = ((StreamMessage)message).readBytes(bytesBuff); + if (numBytesRead >= 0) { + jab.add(new String(Arrays.copyOfRange(bytesBuff, 0, numBytesRead))); + } else { + System.out.println("StreamMessage.readBytes() returned " + numBytesRead); + jab.add("<bytes error>"); + } + break; + case "char": + jab.add(formatChar(((StreamMessage)message).readChar())); + break; + case "double": + long l = Double.doubleToRawLongBits(((StreamMessage)message).readDouble()); + jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0')); + break; + case "float": + int i0 = Float.floatToRawIntBits(((StreamMessage)message).readFloat()); + 
jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0')); + break; + case "int": + jab.add(formatInt(((StreamMessage)message).readInt())); + break; + case "long": + jab.add(formatLong(((StreamMessage)message).readLong())); + break; + case "object": + Object obj = ((StreamMessage)message).readObject(); + jab.add(obj.getClass().getName() + ":" + obj.toString()); + break; + case "short": + jab.add(formatShort(((StreamMessage)message).readShort())); + break; + case "string": + jab.add(((StreamMessage)message).readString()); + break; + default: + throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\""); + } + break; + case "JMS_MAPMESSAGE_TYPE": + String name = String.format("%s%03d", key, i); + switch (key) { + case "boolean": + jab.add(((MapMessage)message).getBoolean(name)?"True":"False"); + break; + case "byte": + jab.add(formatByte(((MapMessage)message).getByte(name))); + break; + case "bytes": + jab.add(new String(((MapMessage)message).getBytes(name))); + break; + case "char": + jab.add(formatChar(((MapMessage)message).getChar(name))); + break; + case "double": + long l = Double.doubleToRawLongBits(((MapMessage)message).getDouble(name)); + jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0')); + break; + case "float": + int i0 = Float.floatToRawIntBits(((MapMessage)message).getFloat(name)); + jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0')); + break; + case "int": + jab.add(formatInt(((MapMessage)message).getInt(name))); + break; + case "long": + jab.add(formatLong(((MapMessage)message).getLong(name))); + break; + case "object": + Object obj = ((MapMessage)message).getObject(name); + jab.add(obj.getClass().getName() + ":" + obj.toString()); + break; + case "short": + jab.add(formatShort(((MapMessage)message).getShort(name))); + break; + case "string": + jab.add(((MapMessage)message).getString(name)); + break; + default: + throw new Exception("JmsReceiverShim: 
Unknown subtype for " + jmsMessageType + ": \"" + key + "\""); + } + break; + case "JMS_OBJECTMESSAGE_TYPE": + jab.add(((ObjectMessage)message).getObject().toString()); + break; + case "JMS_TEXTMESSAGE_TYPE": + jab.add(((TextMessage)message).getText()); + break; + default: + connection.close(); + throw new Exception("JmsReceiverShim: Internal error: unknown or unsupported JMS message type \"" + jmsMessageType + "\""); + } + } + job.add(key, jab); + } + connection.close(); + + System.out.println(jmsMessageType); + StringWriter out = new StringWriter(); + JsonWriter jsonWriter = Json.createWriter(out); + jsonWriter.writeObject(job.build()); + jsonWriter.close(); + System.out.println(out.toString()); + } catch (Exception exp) { + if (connection != null) + connection.close(); + System.out.println("Caught exception, exiting."); + exp.printStackTrace(System.out); + System.exit(1); + } + } + + protected static String formatByte(byte b) { + boolean neg = false; + if (b < 0) { + neg = true; + b = (byte)-b; + } + return String.format("%s0x%x", neg?"-":"", b); + } + + protected static String formatChar(char c) { + if (Character.isLetterOrDigit(c)) { + return String.format("%c", c); + } + char[] ca = {c}; + return new String(ca); + } + + protected static String formatInt(int i) { + boolean neg = false; + if (i < 0) { + neg = true; + i = -i; + } + return String.format("%s0x%x", neg?"-":"", i); + } + + protected static String formatLong(long l) { + boolean neg = false; + if (l < 0) { + neg = true; + l = -l; + } + return String.format("%s0x%x", neg?"-":"", l); + } + + protected static String formatShort(int s) { + boolean neg = false; + if (s < 0) { + neg = true; + s = -s; + } + return String.format("%s0x%x", neg?"-":"", s); + } + + protected static boolean isSupportedJmsMessageType(String jmsMessageType) { + for (String supportedJmsMessageType: SUPPORTED_JMS_MESSAGE_TYPES) { + if (jmsMessageType.equals(supportedJmsMessageType)) + return true; + } + return false; + } + + private 
static class MyExceptionListener implements ExceptionListener { + @Override + public void onException(JMSException exception) { + System.out.println("Connection ExceptionListener fired, exiting."); + exception.printStackTrace(System.out); + System.exit(1); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsSenderShim.java ---------------------------------------------------------------------- diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsSenderShim.java b/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsSenderShim.java new file mode 100644 index 0000000..e22be0a --- /dev/null +++ b/shims/qpid-jms/src/main/java/org/apache/qpid/qpid_interop_test/shim/JmsSenderShim.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.interop_test.shim; + +import java.io.Serializable; +import java.io.StringReader; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonObject; +import javax.json.JsonReader; +import org.apache.qpid.jms.JmsConnectionFactory; + +public class JmsSenderShim { + private static final String USER = "guest"; + private static final String PASSWORD = "guest"; + private static final String[] SUPPORTED_JMS_MESSAGE_TYPES = {"JMS_BYTESMESSAGE_TYPE", + "JMS_MAPMESSAGE_TYPE", + "JMS_OBJECTMESSAGE_TYPE", + "JMS_STREAMMESSAGE_TYPE", + "JMS_TEXTMESSAGE_TYPE"}; + + // args[0]: Broker URL + // args[1]: Queue name + // args[2]: JMS message type + // args[3]: JSON Test value map + public static void main(String[] args) throws Exception { + if (args.length < 4) { + System.out.println("JmsSenderShim: Insufficient number of arguments"); + System.out.println("JmsSenderShim: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ..."); + System.exit(1); + } + String brokerAddress = "amqp://" + args[0]; + String queueName = args[1]; + String jmsMessageType = args[2]; + if (!isSupportedJmsMessageType(jmsMessageType)) { + System.out.println("ERROR: JmsReceiver: unknown or unsupported JMS message type \"" + jmsMessageType + "\""); + System.exit(1); + } + + JsonReader jsonReader = Json.createReader(new 
StringReader(args[3])); + JsonObject testValuesMap = jsonReader.readObject(); + jsonReader.close(); + + try { + ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress); + + Connection connection = factory.createConnection(); + connection.setExceptionListener(new MyExceptionListener()); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue = session.createQueue(queueName); + + MessageProducer messageProducer = session.createProducer(queue); + + Message message = null; + List<String> keyList = new ArrayList<String>(testValuesMap.keySet()); + Collections.sort(keyList); + for (String key: keyList) { + JsonArray testValues = testValuesMap.getJsonArray(key); + for (int i=0; i<testValues.size(); ++i) { + String testValue = testValues.getJsonString(i).getString(); + switch (jmsMessageType) { + case "JMS_BYTESMESSAGE_TYPE": + message = createBytesMessage(session, key, testValue); + break; + case "JMS_MAPMESSAGE_TYPE": + message = createMapMessage(session, key, testValue, i); + break; + case "JMS_OBJECTMESSAGE_TYPE": + message = createObjectMessage(session, key, testValue); + break; + case "JMS_STREAMMESSAGE_TYPE": + message = createStreamMessage(session, key, testValue); + break; + case "JMS_TEXTMESSAGE_TYPE": + message = createTextMessage(session, testValue); + break; + default: + throw new Exception("Internal exception: Unexpected JMS message type \"" + jmsMessageType + "\""); + } + messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } + + connection.close(); + } catch (Exception exp) { + System.out.println("Caught exception, exiting."); + exp.printStackTrace(System.out); + System.exit(1); + } + } + + protected static BytesMessage createBytesMessage(Session session, String testValueType, String testValue) throws Exception, JMSException { + BytesMessage message = session.createBytesMessage(); + switch 
(testValueType) { + case "boolean": + message.writeBoolean(Boolean.parseBoolean(testValue)); + break; + case "byte": + message.writeByte(Byte.decode(testValue)); + break; + case "bytes": + message.writeBytes(testValue.getBytes()); + break; + case "char": + if (testValue.length() == 1) { // Char format: "X" or "\xNN" + message.writeChar(testValue.charAt(0)); + } else { + throw new Exception("JmsSenderShim.createBytesMessage() Malformed char string: \"" + testValue + "\" of length " + testValue.length()); + } + break; + case "double": + Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60; + Long l2 = Long.parseLong(testValue.substring(3), 16); + message.writeDouble(Double.longBitsToDouble(l1 | l2)); + break; + case "float": + Long i = Long.parseLong(testValue.substring(2), 16); + message.writeFloat(Float.intBitsToFloat(i.intValue())); + break; + case "int": + message.writeInt(Integer.decode(testValue)); + break; + case "long": + message.writeLong(Long.decode(testValue)); + break; + case "object": + Object obj = (Object)createObject(testValue); + message.writeObject(obj); + break; + case "short": + message.writeShort(Short.decode(testValue)); + break; + case "string": + message.writeUTF(testValue); + break; + default: + throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\""); + } + return message; + } + + protected static MapMessage createMapMessage(Session session, String testValueType, String testValue, int testValueNum) throws Exception, JMSException { + MapMessage message = session.createMapMessage(); + String name = String.format("%s%03d", testValueType, testValueNum); + switch (testValueType) { + case "boolean": + message.setBoolean(name, Boolean.parseBoolean(testValue)); + break; + case "byte": + message.setByte(name, Byte.decode(testValue)); + break; + case "bytes": + message.setBytes(name, testValue.getBytes()); + break; + case "char": + if (testValue.length() == 1) { // Char format: "X" + 
message.setChar(name, testValue.charAt(0)); + } else if (testValue.length() == 6) { // Char format: "\xNNNN" + message.setChar(name, (char)Integer.parseInt(testValue.substring(2), 16)); + } else { + throw new Exception("JmsSenderShim.createMapMessage() Malformed char string: \"" + testValue + "\""); + } + break; + case "double": + Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60; + Long l2 = Long.parseLong(testValue.substring(3), 16); + message.setDouble(name, Double.longBitsToDouble(l1 | l2)); + break; + case "float": + Long i = Long.parseLong(testValue.substring(2), 16); + message.setFloat(name, Float.intBitsToFloat(i.intValue())); + break; + case "int": + message.setInt(name, Integer.decode(testValue)); + break; + case "long": + message.setLong(name, Long.decode(testValue)); + break; + case "object": + Object obj = (Object)createObject(testValue); + message.setObject(name, obj); + break; + case "short": + message.setShort(name, Short.decode(testValue)); + break; + case "string": + message.setString(name, testValue); + break; + default: + throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\""); + } + return message; + } + + protected static ObjectMessage createObjectMessage(Session session, String className, String testValue) throws Exception, JMSException { + Serializable obj = createJavaObject(className, testValue); + if (obj == null) { + // TODO: Handle error here + System.out.println("createObjectMessage: obj == null"); + return null; + } + ObjectMessage message = session.createObjectMessage(); + message.setObject(obj); + return message; + } + + protected static StreamMessage createStreamMessage(Session session, String testValueType, String testValue) throws Exception, JMSException { + StreamMessage message = session.createStreamMessage(); + switch (testValueType) { + case "boolean": + message.writeBoolean(Boolean.parseBoolean(testValue)); + break; + case "byte": + 
message.writeByte(Byte.decode(testValue)); + break; + case "bytes": + message.writeBytes(testValue.getBytes()); + break; + case "char": + if (testValue.length() == 1) { // Char format: "X" + message.writeChar(testValue.charAt(0)); + } else if (testValue.length() == 6) { // Char format: "\xNNNN" + message.writeChar((char)Integer.parseInt(testValue.substring(2), 16)); + } else { + throw new Exception("JmsSenderShim.createStreamMessage() Malformed char string: \"" + testValue + "\""); + } + break; + case "double": + Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60; + Long l2 = Long.parseLong(testValue.substring(3), 16); + message.writeDouble(Double.longBitsToDouble(l1 | l2)); + break; + case "float": + Long i = Long.parseLong(testValue.substring(2), 16); + message.writeFloat(Float.intBitsToFloat(i.intValue())); + break; + case "int": + message.writeInt(Integer.decode(testValue)); + break; + case "long": + message.writeLong(Long.decode(testValue)); + break; + case "object": + Object obj = (Object)createObject(testValue); + message.writeObject(obj); + break; + case "short": + message.writeShort(Short.decode(testValue)); + break; + case "string": + message.writeString(testValue); + break; + default: + throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\""); + } + return message; + } + + protected static Serializable createJavaObject(String className, String testValue) throws Exception { + Serializable obj = null; + try { + Class<?> c = Class.forName(className); + if (className.compareTo("java.lang.Character") == 0) { + Constructor ctor = c.getConstructor(char.class); + if (testValue.length() == 1) { + // Use first character of string + obj = (Serializable)ctor.newInstance(testValue.charAt(0)); + } else if (testValue.length() == 4 || testValue.length() == 6) { + // Format '\xNN' or '\xNNNN' + obj = (Serializable)ctor.newInstance((char)Integer.parseInt(testValue.substring(2), 16)); + } else { + throw new 
Exception("JmsSenderShim.createStreamMessage() Malformed char string: \"" + testValue + "\""); + } + } else { + // Use string constructor + Constructor ctor = c.getConstructor(String.class); + obj = (Serializable)ctor.newInstance(testValue); + } + } + catch (ClassNotFoundException e) { + e.printStackTrace(System.out); + } + catch (NoSuchMethodException e) { + e.printStackTrace(System.out); + } + catch (InstantiationException e) { + e.printStackTrace(System.out); + } + catch (IllegalAccessException e) { + e.printStackTrace(System.out); + } + catch (InvocationTargetException e) { + e.printStackTrace(System.out); + } + return obj; + } + + // value has format "classname:ctorstrvalue" + protected static Serializable createObject(String value) throws Exception { + Serializable obj = null; + int colonIndex = value.indexOf(":"); + if (colonIndex >= 0) { + String className = value.substring(0, colonIndex); + String testValue = value.substring(colonIndex+1); + obj = createJavaObject(className, testValue); + } else { + throw new Exception("createObject(): Malformed value string"); + } + return obj; + } + + protected static TextMessage createTextMessage(Session session, String valueStr) throws JMSException { + return session.createTextMessage(valueStr); + } + + protected static boolean isSupportedJmsMessageType(String jmsMessageType) { + for (String supportedJmsMessageType: SUPPORTED_JMS_MESSAGE_TYPES) { + if (jmsMessageType.equals(supportedJmsMessageType)) + return true; + } + return false; + } + + private static class MyExceptionListener implements ExceptionListener { + @Override + public void onException(JMSException exception) { + System.out.println("Connection ExceptionListener fired, exiting."); + exception.printStackTrace(System.out); + System.exit(1); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/.gitignore ---------------------------------------------------------------------- diff 
--git a/shims/qpid-proton-cpp/.gitignore b/shims/qpid-proton-cpp/.gitignore new file mode 100644 index 0000000..84c048a --- /dev/null +++ b/shims/qpid-proton-cpp/.gitignore @@ -0,0 +1 @@ +/build/ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp index 0965092..fb76f0e 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp @@ -25,7 +25,8 @@ #include <json/json.h> #include "proton/connection.hpp" #include "proton/container.hpp" -#include <proton/types.hpp> +#include "proton/delivery.hpp" +#include "proton/receiver.hpp" #include "qpidit/QpidItErrors.hpp" namespace qpidit @@ -50,7 +51,7 @@ namespace qpidit } void AmqpReceiver::on_container_start(proton::container &c) { - _receiver = c.open_receiver(_brokerUrl); + /*_receiver = */c.open_receiver(_brokerUrl); } void AmqpReceiver::on_message(proton::delivery &d, proton::message &m) { @@ -148,7 +149,7 @@ namespace qpidit } _received++; if (_received >= _expected) { - d.link().close(); + d.receiver().close(); d.connection().close(); } } @@ -165,8 +166,8 @@ namespace qpidit std::cerr << "AmqpReceiver:on_transport_error()" << std::endl; } - void AmqpReceiver::on_unhandled_error(const proton::condition &c) { - std::cerr << "AmqpReceiver:on_unhandled_error() condition=" << c.name() << std::endl; + void AmqpReceiver::on_unhandled_error(const proton::error_condition &c) { + std::cerr << "AmqpReceiver:on_unhandled_error() name=" << c.name() << " description=" << c.description() << std::endl; } // protected http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp 
---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp index 812043f..c11fbaf 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp @@ -25,7 +25,7 @@ #include <iomanip> #include <json/value.h> #include "proton/handler.hpp" -#include "proton/receiver.hpp" +#include "proton/types.hpp" #include <sstream> namespace qpidit @@ -38,7 +38,6 @@ namespace qpidit protected: const std::string _brokerUrl; const std::string _amqpType; - proton::receiver _receiver; uint32_t _expected; uint32_t _received; Json::Value _receivedValueList; @@ -52,7 +51,7 @@ namespace qpidit void on_connection_error(proton::connection &c); void on_sender_error(proton::sender& l); void on_transport_error(proton::transport &t); - void on_unhandled_error(const proton::condition &c); + void on_unhandled_error(const proton::error_condition &c); protected: static void checkMessageType(const proton::message& msg, proton::type_id msgType); static Json::Value& getMap(Json::Value& jsonMap, const proton::value& val); http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp index a4015eb..960308e 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp @@ -25,7 +25,8 @@ #include <json/json.h> #include "proton/connection.hpp" #include "proton/container.hpp" -#include "proton/decimal.hpp" +#include "proton/sender.hpp" +#include "proton/tracker.hpp" namespace qpidit { @@ -65,10 +66,10 @@ namespace qpidit } } - void AmqpSender::on_delivery_accept(proton::delivery &d) { + void 
AmqpSender::on_tracker_accept(proton::tracker &t) { _msgsConfirmed++; if (_msgsConfirmed == _totalMsgs) { - d.connection().close(); + t.connection().close(); } } @@ -88,8 +89,8 @@ namespace qpidit std::cerr << "AmqpSender:on_transport_error()" << std::endl; } - void AmqpSender::on_unhandled_error(const proton::condition &c) { - std::cerr << "AmqpSender:on_unhandled_error()" << " condition=" << c.name() << std::endl; + void AmqpSender::on_unhandled_error(const proton::error_condition &c) { + std::cerr << "AmqpSender:on_unhandled_error()" << " name=" << c.name() << " description=" << c.description() << std::endl; } // protected http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp index 90ec217..f27c371 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp @@ -47,13 +47,13 @@ namespace qpidit virtual ~AmqpSender(); void on_container_start(proton::container &c); void on_sendable(proton::sender &s); - void on_delivery_accept(proton::delivery &d); + void on_tracker_accept(proton::tracker &t); void on_transport_close(proton::transport &t); void on_connection_error(proton::connection &c); - void on_sender_error(proton::sender& l); + void on_sender_error(proton::sender& s); void on_transport_error(proton::transport &t); - void on_unhandled_error(const proton::condition &c); + void on_unhandled_error(const proton::error_condition &c); protected: proton::message& setMessage(proton::message& msg, const Json::Value& testValue); http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp ---------------------------------------------------------------------- diff --git 
a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp index 0fc700d..f062577 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp @@ -23,9 +23,9 @@ #include <iostream> #include <json/json.h> -#include <map> #include "proton/connection.hpp" #include "proton/container.hpp" +#include "proton/delivery.hpp" #include "qpidit/QpidItErrors.hpp" namespace qpidit @@ -44,7 +44,6 @@ namespace qpidit _brokerUrl(brokerUrl), _jmsMessageType(jmsMessageType), _testNumberMap(testNumberMap), - _receiver(), _subTypeList(testNumberMap.getMemberNames()), _subTypeIndex(0), _expected(getTotalNumExpectedMsgs(testNumberMap)), @@ -60,7 +59,7 @@ namespace qpidit } void JmsReceiver::on_container_start(proton::container &c) { - _receiver = c.open_receiver(_brokerUrl); + c.open_receiver(_brokerUrl); } void JmsReceiver::on_message(proton::delivery &d, proton::message &m) { @@ -97,8 +96,7 @@ namespace qpidit } _received++; if (_received >= _expected) { - _receiver.close(); - d.link().close(); + d.receiver().close(); d.connection().close(); } } http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp index ffaf1ff..198a957 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp @@ -25,7 +25,6 @@ #include <iomanip> #include <json/value.h> #include "proton/handler.hpp" -#include "proton/receiver.hpp" #include "proton/types.hpp" #include <sstream> @@ -43,7 +42,6 @@ namespace qpidit const std::string _brokerUrl; const std::string _jmsMessageType; const Json::Value _testNumberMap; - proton::receiver _receiver; Json::Value::Members _subTypeList; 
int _subTypeIndex; uint32_t _expected; http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp index 3b03ef5..102fc1d 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp @@ -22,12 +22,12 @@ #include "qpidit/shim/JmsSender.hpp" #include <cerrno> -#include <iostream> #include <iomanip> +#include <iostream> #include <json/json.h> #include "proton/connection.hpp" #include "proton/container.hpp" -#include "qpidit/QpidItErrors.hpp" +#include "proton/tracker.hpp" #include <stdio.h> namespace qpidit @@ -72,11 +72,10 @@ namespace qpidit } } - void JmsSender::on_delivery_accept(proton::delivery &d) { + void JmsSender::on_tracker_accept(proton::tracker &t) { _msgsConfirmed++; if (_msgsConfirmed == _totalMsgs) { - d.link().close(); - d.connection().close(); + t.connection().close(); } } http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp index 962f012..f953b43 100644 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp +++ b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp @@ -53,7 +53,7 @@ namespace qpidit virtual ~JmsSender(); void on_container_start(proton::container &c); void on_sendable(proton::sender &s); - void on_delivery_accept(proton::delivery &d); + void on_tracker_accept(proton::tracker &t); void on_transport_close(proton::transport &t); protected: void sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValueMap); 
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java b/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java deleted file mode 100644 index 8c461ce..0000000 --- a/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.qpid.interop_test.obj_util; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.io.Serializable; - -public class BytesToJavaObj { - String hexObjStr = null; - Serializable obj = null; - - public BytesToJavaObj(String hexObjStr) { - this.hexObjStr = hexObjStr; - } - - public String run() { - byte[] bytes = hexStrToByteArray(this.hexObjStr); - this.obj = byteArrayToObject(bytes); - if (this.obj != null) { - return this.obj.getClass().getName() + ":" + this.obj.toString(); - } - return "<null>"; - } - - protected Serializable byteArrayToObject(byte[] bytes) { - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - ObjectInput in = null; - try { - in = new ObjectInputStream(bis); - return (Serializable) in.readObject(); - } catch (ClassNotFoundException e) { - e.printStackTrace(System.out); - } catch (IOException e) { - e.printStackTrace(System.out); - } finally { - try { - bis.close(); - } catch (IOException e) {} // ignore - try { - in.close(); - } catch (IOException e) {} // ignore - } - return null; - } - - protected byte[] hexStrToByteArray(String hexStr) { - int len = hexStr.length(); - byte[] data = new byte[len / 2]; - for(int i=0; i<len; i+=2) { - data[i/2] = (byte)((Character.digit(hexStr.charAt(i), 16) << 4) + Character.digit(hexStr.charAt(i+1), 16)); - } - return data; - } - - // ========= main ========== - - public static void main(String[] args) { - if (args.length != 1) { - System.out.println("BytesToJavaObj: Incorrect argument count"); - System.out.println("BytesToJavaObj: Expected argument: \"<java_serialized_obj_str_hex>\""); - System.exit(1); - } - BytesToJavaObj btjo = new BytesToJavaObj(args[0]); - System.out.println(btjo.run()); - } -} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java b/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java deleted file mode 100644 index 2bfbde0..0000000 --- a/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.qpid.interop_test.obj_util; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -//import java.util.Arrays; - -public class JavaObjToBytes { - String javaClassName = null; - String ctorArgStr = null; - Serializable obj = null; - - public JavaObjToBytes(String javaClassName, String ctorArgStr) { - this.javaClassName = javaClassName; - this.ctorArgStr = ctorArgStr; - } - - public byte[] run() { - createJavaObject(); - return serializeJavaOjbect(); - } - - protected void createJavaObject() { - try { - Class<?> c = Class.forName(this.javaClassName); - if (this.javaClassName.compareTo("java.lang.Character") == 0) { - Constructor ctor = c.getConstructor(char.class); - if (this.ctorArgStr.length() == 1) { - // Use first character of string - obj = (Serializable)ctor.newInstance(this.ctorArgStr.charAt(0)); - } else if (this.ctorArgStr.length() == 4 || this.ctorArgStr.length() == 6) { - // Format '\xNN' or '\xNNNN' - obj = (Serializable)ctor.newInstance((char)Integer.parseInt(this.ctorArgStr.substring(2), 16)); - } else { - throw new Exception("JavaObjToBytes.createJavaObject() Malformed char string: \"" + this.ctorArgStr + "\""); - } - } else { - // Use string constructor - Constructor ctor = c.getConstructor(String.class); - obj = (Serializable)ctor.newInstance(this.ctorArgStr); - } - } - catch (ClassNotFoundException e) { - e.printStackTrace(System.out); - } - catch (NoSuchMethodException e) { - e.printStackTrace(System.out); - } - catch (InstantiationException e) { - e.printStackTrace(System.out); - } - catch (IllegalAccessException e) { - e.printStackTrace(System.out); - } - catch (InvocationTargetException e) { - e.printStackTrace(System.out); - } - catch (Exception e) { - e.printStackTrace(System.out); - } - } - - protected byte[] 
serializeJavaOjbect() { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutput out = null; - try { - out = new ObjectOutputStream(bos); - out.writeObject(this.obj); - return bos.toByteArray(); - } catch (IOException e) { - e.printStackTrace(System.out); - } finally { - try { - if (out != null) { - out.close(); - } - } catch (IOException e) {} // ignore - try { - bos.close(); - } catch (IOException e) {} // ignore - } - return null; - } - - // ========= main ========== - - public static void main(String[] args) { - if (args.length != 1) { - System.out.println("JavaObjToBytes: Incorrect argument count"); - System.out.println("JavaObjToBytes: Expected argument: \"<java_class_name>:<ctor_arg_str>\""); - System.exit(1); - } - int colonIndex = args[0].indexOf(":"); - if (colonIndex < 0) { - System.out.println("Error: Incorect argument format: " + args[0]); - System.exit(-1); - } - String javaClassName = args[0].substring(0, colonIndex); - String ctorArgStr = args[0].substring(colonIndex+1); - JavaObjToBytes jotb = new JavaObjToBytes(javaClassName, ctorArgStr); - byte[] bytes = jotb.run(); - System.out.println(args[0]); - for (byte b: bytes) { - System.out.print(String.format("%02x", b)); - } - System.out.println(); - } -} - http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/src/py/qpid-interop-test/.gitignore ---------------------------------------------------------------------- diff --git a/src/py/qpid-interop-test/.gitignore b/src/py/qpid-interop-test/.gitignore deleted file mode 100644 index b3f6765..0000000 --- a/src/py/qpid-interop-test/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/interop_test_errors.pyc -/shim_utils.pyc http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/src/py/qpid-interop-test/__init__.py ---------------------------------------------------------------------- diff --git a/src/py/qpid-interop-test/__init__.py b/src/py/qpid-interop-test/__init__.py deleted file mode 100644 index 
7b8aee3..0000000 --- a/src/py/qpid-interop-test/__init__.py +++ /dev/null @@ -1,25 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import broker_properties -import interop_test_errors -import test_type_map -import types -import jms - http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/src/py/qpid-interop-test/broker_properties.py ---------------------------------------------------------------------- diff --git a/src/py/qpid-interop-test/broker_properties.py b/src/py/qpid-interop-test/broker_properties.py deleted file mode 100644 index 08cc9cc..0000000 --- a/src/py/qpid-interop-test/broker_properties.py +++ /dev/null @@ -1,54 +0,0 @@ -""" -Module containing a small client which connects to the broker and -gets the broker connection properties so as to identify the broker. -""" - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from proton.handlers import MessagingHandler -from proton.reactor import Container - -class Client(MessagingHandler): - """ - Client to connect to broker and collect connection properties, used to identify the test broker - """ - def __init__(self, url): - super(Client, self).__init__() - self.url = url - self.remote_properties = None - - def on_connection_remote_open(self, event): - self.remote_properties = event.connection.remote_properties - event.connection.close() - - def on_start(self, event): - """Event loop start""" - event.container.connect(url=self.url) - - def get_connection_properties(self): - """Return the connection properties""" - return self.remote_properties - - -def getBrokerProperties(broker_url): - """Start client, then return its connection properties""" - MSG_HANDLER = Client(broker_url) - Container(MSG_HANDLER).run() - return MSG_HANDLER.get_connection_properties() http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/f4b87a1e/src/py/qpid-interop-test/interop_test_errors.py ---------------------------------------------------------------------- diff --git a/src/py/qpid-interop-test/interop_test_errors.py b/src/py/qpid-interop-test/interop_test_errors.py deleted file mode 100644 index 6be8959..0000000 --- a/src/py/qpid-interop-test/interop_test_errors.py +++ /dev/null @@ -1,29 +0,0 @@ -""" -Module containing Error classes for interop testing -""" - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. 
See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -class InteropTestError(StandardError): - """ - Generic simple error class for use in interop tests - """ - def __init__(self, error_message): - super(InteropTestError, self).__init__(error_message) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
