Repository: activemq-artemis Updated Branches: refs/heads/master 6f6d9845f -> 67f804054
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java new file mode 100644 index 0000000..2ece01d --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java @@ -0,0 +1,952 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter.message; + +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.jms.JMSException; + +import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; +import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.message.Message; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +public class JMSMappingOutboundTransformerTest { + + private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844"); + private final String TEST_ADDRESS = "queue://testAddress"; + + private IDGenerator idGenerator; + private JMSMappingOutboundTransformer transformer; + + public static final byte QUEUE_TYPE = 0x00; + public static final byte TOPIC_TYPE = 0x01; + public static final byte TEMP_QUEUE_TYPE = 0x02; + public static final byte TEMP_TOPIC_TYPE = 0x03; + + @Before + public void setUp() { + idGenerator = new SimpleIDGenerator(0); + transformer = new JMSMappingOutboundTransformer(idGenerator); + } + + // ----- no-body Message type tests ---------------------------------------// + + @Test + public void testConvertMessageToAmqpMessageWithNoBody() throws Exception { + ServerJMSMessage outbound = createMessage(); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNull(amqp.getBody()); + } + + @Test + public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception { + ServerJMSTextMessage outbound = createTextMessage(); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNull(amqp.getBody()); + } + + // ----- BytesMessage type tests ---------------------------------------// + + @Test + public void testConvertEmptyBytesMessageToAmqpMessageWithDataBody() throws Exception { + ServerJMSBytesMessage outbound = createBytesMessage(); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + assertEquals(0, ((Data) amqp.getBody()).getValue().getLength()); + } + + @Test + public void testConvertUncompressedBytesMessageToAmqpMessageWithDataBody() throws Exception { + byte[] expectedPayload = new byte[] {8, 16, 24, 32}; + ServerJMSBytesMessage outbound = createBytesMessage(); + outbound.writeBytes(expectedPayload); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + assertEquals(4, ((Data) amqp.getBody()).getValue().getLength()); + + Binary amqpData = ((Data) amqp.getBody()).getValue(); + Binary inputData = new Binary(expectedPayload); + + assertTrue(inputData.equals(amqpData)); + } + + @Ignore("Compressed message body support not yet implemented.") + @Test + public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception { + byte[] expectedPayload = new byte[] {8, 16, 24, 32}; + ServerJMSBytesMessage outbound = createBytesMessage(true); + outbound.writeBytes(expectedPayload); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + assertEquals(4, ((Data) amqp.getBody()).getValue().getLength()); + + Binary amqpData = ((Data) amqp.getBody()).getValue(); + Binary inputData = new Binary(expectedPayload); + + assertTrue(inputData.equals(amqpData)); + } + + @Test + public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { + ServerJMSBytesMessage outbound = createBytesMessage(); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); + assertEquals(0, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); + } + + @Test + public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { + byte[] expectedPayload = new byte[] {8, 16, 24, 32}; + ServerJMSBytesMessage outbound = createBytesMessage(); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); + outbound.writeBytes(expectedPayload); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); + assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); + + Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue(); + Binary inputData = new Binary(expectedPayload); + + assertTrue(inputData.equals(amqpData)); + } + + @Ignore("Compressed message body support not yet implemented.") + @Test + public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { + byte[] expectedPayload = new byte[] {8, 16, 24, 32}; + ServerJMSBytesMessage outbound = createBytesMessage(true); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); + outbound.writeBytes(expectedPayload); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); + assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); + + Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue(); + Binary inputData = new Binary(expectedPayload); + + assertTrue(inputData.equals(amqpData)); + } + + // ----- MapMessage type tests --------------------------------------------// + + @Test + public void testConvertMapMessageToAmqpMessageWithNoBody() throws Exception { + ServerJMSMapMessage outbound = createMapMessage(); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); + } + + @Test + public void testConvertMapMessageToAmqpMessageWithByteArrayValueInBody() throws Exception { + final byte[] byteArray = new byte[] {1, 2, 3, 4, 5}; + + ServerJMSMapMessage outbound = createMapMessage(); + outbound.setBytes("bytes", byteArray); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue(); + + assertEquals(1, amqpMap.size()); + Binary readByteArray = (Binary) amqpMap.get("bytes"); + assertNotNull(readByteArray); + } + + @Test + public void testConvertMapMessageToAmqpMessage() throws Exception { + ServerJMSMapMessage outbound = createMapMessage(); + outbound.setString("property-1", "string"); + outbound.setInt("property-2", 1); + outbound.setBoolean("property-3", true); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue(); + + assertEquals(3, amqpMap.size()); + assertTrue("string".equals(amqpMap.get("property-1"))); + } + + @Test + public void testConvertCompressedMapMessageToAmqpMessage() throws Exception { + ServerJMSMapMessage outbound = createMapMessage(true); + outbound.setString("property-1", "string"); + outbound.setInt("property-2", 1); + outbound.setBoolean("property-3", true); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map); + + @SuppressWarnings("unchecked") + Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue(); + + assertEquals(3, amqpMap.size()); + assertTrue("string".equals(amqpMap.get("property-1"))); + } + + // ----- StreamMessage type tests -----------------------------------------// + + @Test + public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception { + ServerJMSStreamMessage outbound = createStreamMessage(); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List); + } + + @Test + public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception { + ServerJMSStreamMessage outbound = createStreamMessage(); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpSequence); + assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List); + } + + @Test + public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception { + ServerJMSStreamMessage outbound = createStreamMessage(true); + outbound.writeBoolean(false); + outbound.writeString("test"); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List); + + @SuppressWarnings("unchecked") + List<Object> amqpList = (List<Object>) ((AmqpValue) amqp.getBody()).getValue(); + + assertEquals(2, amqpList.size()); + } + + @Test + public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception { + ServerJMSStreamMessage outbound = createStreamMessage(true); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); + outbound.writeBoolean(false); + outbound.writeString("test"); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpSequence); + assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List); + + @SuppressWarnings("unchecked") + List<Object> amqpList = ((AmqpSequence) amqp.getBody()).getValue(); + + assertEquals(2, amqpList.size()); + } + + // ----- ObjectMessage type tests -----------------------------------------// + + @Test + public void testConvertEmptyObjectMessageToAmqpMessageWithDataBody() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertEquals(5, ((Data) amqp.getBody()).getValue().getLength()); + } + + @Test + public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertEquals(5, ((Data) amqp.getBody()).getValue().getLength()); + } + + @Test + public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); + assertEquals(5, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); + } + + @Test + public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength()); + + Object value = deserialize(((Data) amqp.getBody()).getValue().getArray()); + assertNotNull(value); + assertTrue(value instanceof UUID); + } + + @Test + public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength()); + + Object value = deserialize(((Data) amqp.getBody()).getValue().getArray()); + assertNotNull(value); + assertTrue(value instanceof UUID); + } + + @Test + public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); + assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); + + Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray()); + assertNotNull(value); + assertTrue(value instanceof UUID); + } + + @Test + public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength()); + + Object value = deserialize(((Data) amqp.getBody()).getValue().getArray()); + assertNotNull(value); + assertTrue(value instanceof UUID); + } + + @Test + public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength()); + + Object value = deserialize(((Data) amqp.getBody()).getValue().getArray()); + assertNotNull(value); + assertTrue(value instanceof UUID); + } + + @Test + public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { + ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); + assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); + + Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray()); + assertNotNull(value); + assertTrue(value instanceof UUID); + } + + // ----- TextMessage type tests -------------------------------------------// + + @Test + public void testConvertTextMessageToAmqpMessageWithNoBody() throws Exception { + ServerJMSTextMessage outbound = createTextMessage(); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertNull(((AmqpValue) amqp.getBody()).getValue()); + } + + @Test + public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception { + String contentString = "myTextMessageContent"; + ServerJMSTextMessage outbound = createTextMessage(contentString); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + + Binary data = ((Data) amqp.getBody()).getValue(); + String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8); + assertEquals(contentString, contents); + } + + @Test + public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception { + String contentString = "myTextMessageContent"; + ServerJMSTextMessage outbound = createTextMessage(contentString); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + + Binary data = ((Data) amqp.getBody()).getValue(); + String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8); + assertEquals(contentString, contents); + } + + @Test + public void testConvertTextMessageCreatesAmqpValueStringBody() throws Exception { + String contentString = "myTextMessageContent"; + ServerJMSTextMessage outbound = createTextMessage(contentString); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue()); + } + + @Test + public void testConvertTextMessageContentNotStoredCreatesAmqpValueStringBody() throws Exception { + String contentString = "myTextMessageContent"; + ServerJMSTextMessage outbound = createTextMessage(contentString); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue()); + } + + @Test + public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception { + String contentString = "myTextMessageContent"; + ServerJMSTextMessage outbound = createTextMessage(contentString, true); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); + outbound.encode(); + + EncodedMessage encoded = transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof Data); + assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + + Binary data = ((Data) amqp.getBody()).getValue(); + String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8); + assertEquals(contentString, contents); + } + + // ----- Test JMSDestination Handling -------------------------------------// + + @Test + public void testConvertMessageWithJMSDestinationNull() throws Exception { + doTestConvertMessageWithJMSDestination(null, null); + } + + @Test + public void testConvertMessageWithJMSDestinationQueue() throws Exception { + doTestConvertMessageWithJMSDestination(createDestination(QUEUE_TYPE), QUEUE_TYPE); + } + + @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP") + @Test + public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception { + doTestConvertMessageWithJMSDestination(createDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE); + } + + @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP") + @Test + public void testConvertMessageWithJMSDestinationTopic() throws Exception { + doTestConvertMessageWithJMSDestination(createDestination(TOPIC_TYPE), TOPIC_TYPE); + } + + @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP") + @Test + public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception { + doTestConvertMessageWithJMSDestination(createDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE); + } + + private void doTestConvertMessageWithJMSDestination(ServerDestination jmsDestination, Object expectedAnnotationValue) throws Exception { + ServerJMSTextMessage textMessage = createTextMessage(); + textMessage.setText("myTextMessageContent"); + textMessage.setJMSDestination(jmsDestination); + + EncodedMessage encoded = transform(textMessage); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + MessageAnnotations ma = amqp.getMessageAnnotations(); + Map<Symbol, Object> maMap = ma == null ? null : ma.getValue(); + if (maMap != null) { + Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION); + assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue); + } else if (expectedAnnotationValue != null) { + fail("Expected annotation value, but there were no annotations"); + } + + if (jmsDestination != null) { + assertEquals("Unexpected 'to' address", jmsDestination.getAddress(), amqp.getAddress()); + } + } + + // ----- Test JMSReplyTo Handling -----------------------------------------// + + @Test + public void testConvertMessageWithJMSReplyToNull() throws Exception { + doTestConvertMessageWithJMSReplyTo(null, null); + } + + @Test + public void testConvertMessageWithJMSReplyToQueue() throws Exception { + doTestConvertMessageWithJMSReplyTo(createDestination(QUEUE_TYPE), QUEUE_TYPE); + } + + @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP") + @Test + public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception { + doTestConvertMessageWithJMSReplyTo(createDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE); + } + + @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP") + @Test + public void testConvertMessageWithJMSReplyToTopic() throws Exception { + doTestConvertMessageWithJMSReplyTo(createDestination(TOPIC_TYPE), TOPIC_TYPE); + } + + @Ignore("Artemis code doesn't provide a means of supplying a typed destination to AMQP") + @Test + public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception { + doTestConvertMessageWithJMSReplyTo(createDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE); + } + + private void doTestConvertMessageWithJMSReplyTo(ServerDestination jmsReplyTo, Object expectedAnnotationValue) throws Exception { + ServerJMSTextMessage textMessage = createTextMessage(); + textMessage.setText("myTextMessageContent"); + textMessage.setJMSReplyTo(jmsReplyTo); + + EncodedMessage encoded = transform(textMessage); + assertNotNull(encoded); + + Message amqp = encoded.decode(); + + MessageAnnotations ma = amqp.getMessageAnnotations(); + Map<Symbol, Object> maMap = ma == null ? null : ma.getValue(); + if (maMap != null) { + Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION); + assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue); + } else if (expectedAnnotationValue != null) { + fail("Expected annotation value, but there were no annotations"); + } + + if (jmsReplyTo != null) { + assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getAddress(), amqp.getReplyTo()); + } + } + + // ----- Utility Methods used for this Test -------------------------------// + + public EncodedMessage transform(ServerJMSMessage message) throws Exception { + // Useful for testing but not recommended for real life use. + ByteBuf nettyBuffer = Unpooled.buffer(1024); + NettyWritable buffer = new NettyWritable(nettyBuffer); + + long messageFormat = transformer.transform(message, buffer); + + EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); + + return encoded; + } + + private ServerDestination createDestination(byte destType) { + ServerDestination destination = null; + switch (destType) { + case QUEUE_TYPE: + destination = new ServerDestination(TEST_ADDRESS); + break; + case TOPIC_TYPE: + destination = new ServerDestination(TEST_ADDRESS); + break; + case TEMP_QUEUE_TYPE: + destination = new ServerDestination(TEST_ADDRESS); + break; + case TEMP_TOPIC_TYPE: + destination = new ServerDestination(TEST_ADDRESS); + break; + default: + throw new IllegalArgumentException("Invliad Destination Type given/"); + } + + return destination; + } + + private ServerJMSMessage createMessage() { + return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0); + } + + private ServerJMSBytesMessage createBytesMessage() { + return createBytesMessage(false); + } + + private ServerJMSBytesMessage createBytesMessage(boolean compression) { + ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0); + + if (compression) { + // TODO + } + + return message; + } + + private ServerJMSMapMessage createMapMessage() { + return createMapMessage(false); + } + + private ServerJMSMapMessage createMapMessage(boolean compression) { + ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0); + + if (compression) { + // TODO + } + + return message; + } + + private ServerJMSStreamMessage createStreamMessage() { + return createStreamMessage(false); + } + + private ServerJMSStreamMessage createStreamMessage(boolean compression) { + ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0); + + if (compression) { + // TODO + } + + return message; + } + + private ServerJMSObjectMessage createObjectMessage() { + return createObjectMessage(null); + } + + private ServerJMSObjectMessage createObjectMessage(Serializable payload) { + return createObjectMessage(payload, false); + } + + private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) { + ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(idGenerator); + + if (compression) { + // TODO + } + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos);) { + + oos.writeObject(payload); + byte[] data = baos.toByteArray(); + result.setSerializedForm(new Binary(data)); + } catch (Exception ex) { + throw new AssertionError("Should not fail to setObject in this test"); + } + + return result; + } + + private ServerJMSTextMessage createTextMessage() { + return createTextMessage(null); + } + + private ServerJMSTextMessage createTextMessage(String text) { + return createTextMessage(text, false); + } + + private ServerJMSTextMessage createTextMessage(String text, boolean compression) { + ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(idGenerator); + + if (compression) { + // TODO + } + + try { + result.setText(text); + } catch (JMSException e) { + } + + return result; + } + + private Object deserialize(byte[] payload) throws Exception { + try (ByteArrayInputStream bis = new ByteArrayInputStream(payload); ObjectInputStream ois = new ObjectInputStream(bis);) { + + return ois.readObject(); + } + } + + private ServerMessageImpl newMessage(byte messageType) { + ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512); + message.setType(messageType); + ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); + return message; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java new file mode 100644 index 0000000..99aab33 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter.message; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * Some simple performance tests for the Message Transformers. + */ +@Ignore("Useful for profiling but slow and not meant as a unit test") +public class JMSTransformationSpeedComparisonTest { + + @Rule + public TestName test = new TestName(); + + private IDGenerator idGenerator; + private ProtonMessageConverter converter; + + private final int WARM_CYCLES = 1000; + private final int PROFILE_CYCLES = 1000000; + + @Before + public void setUp() { + idGenerator = new SimpleIDGenerator(0); + converter = new ProtonMessageConverter(idGenerator); + } + + @Test + public void testBodyOnlyMessage() throws Exception { + + Message message = Proton.message(); + message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + EncodedMessage encoded = encode(message); + + // Warm up + for (int i = 0; i < WARM_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + + long totalDuration = 0; + + long startTime = System.nanoTime(); + for (int i = 0; i < PROFILE_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + totalDuration += System.nanoTime() - startTime; + + LOG_RESULTS(totalDuration); + } + + @Test + public void testMessageWithNoPropertiesOrAnnotations() throws Exception { + + Message message = Proton.message(); + + message.setAddress("queue://test-queue"); + message.setDeliveryCount(1); + message.setCreationTime(System.currentTimeMillis()); + message.setContentType("text/plain"); + message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + + EncodedMessage encoded = encode(message); + + // Warm up + for (int i = 0; i < WARM_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + + long totalDuration = 0; + + long startTime = System.nanoTime(); + for (int i = 0; i < PROFILE_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + totalDuration += System.nanoTime() - startTime; + + LOG_RESULTS(totalDuration); + } + + @Test + public void testTypicalQpidJMSMessage() throws Exception { + + EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); + + // Warm up + for (int i = 0; i < WARM_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + + long totalDuration = 0; + + long startTime = System.nanoTime(); + for (int i = 0; i < PROFILE_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + totalDuration += System.nanoTime() - startTime; + + LOG_RESULTS(totalDuration); + } + + @Test + public void testComplexQpidJMSMessage() throws Exception { + + EncodedMessage encoded = encode(createComplexQpidJMSMessage()); + + // Warm up + for (int i = 0; i < WARM_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + + long totalDuration = 0; + + long startTime = System.nanoTime(); + for (int i = 0; i < PROFILE_CYCLES; ++i) { + ServerMessage intermediate = converter.inbound(encoded); + encode(converter.outbound(intermediate, 1)); + } + totalDuration += System.nanoTime() - startTime; + + LOG_RESULTS(totalDuration); + } + + @Test + public void testTypicalQpidJMSMessageInBoundOnly() throws Exception { + + EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); + + // Warm up + for (int i = 0; i < WARM_CYCLES; ++i) { + converter.inbound(encoded); + } + + long totalDuration = 0; + + long startTime = System.nanoTime(); + for (int i = 0; i < PROFILE_CYCLES; ++i) { + converter.inbound(encoded); + } + + totalDuration += System.nanoTime() - startTime; + + LOG_RESULTS(totalDuration); + } + + @Test + public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception { + + EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); + ServerMessage intermediate = converter.inbound(encoded); + + // Warm up + for (int i = 0; i < WARM_CYCLES; ++i) { + encode(converter.outbound(intermediate, 1)); + } + + long totalDuration = 0; + + long startTime = System.nanoTime(); + for (int i = 0; i < PROFILE_CYCLES; ++i) { + encode(converter.outbound(intermediate, 1)); + } + + totalDuration += System.nanoTime() - startTime; + + LOG_RESULTS(totalDuration); + } + + private Message createTypicalQpidJMSMessage() { + Map<String, Object> applicationProperties = new HashMap<>(); + Map<Symbol, Object> messageAnnotations = new HashMap<>(); + + applicationProperties.put("property-1", "string"); + applicationProperties.put("property-2", 512); + applicationProperties.put("property-3", true); + + messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0); + messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0); + + Message message = Proton.message(); + + message.setAddress("queue://test-queue"); + message.setDeliveryCount(1); + message.setApplicationProperties(new ApplicationProperties(applicationProperties)); + message.setMessageAnnotations(new MessageAnnotations(messageAnnotations)); + message.setCreationTime(System.currentTimeMillis()); + message.setContentType("text/plain"); + message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + + return message; + } + + private Message createComplexQpidJMSMessage() { + Map<String, Object> applicationProperties = new HashMap<>(); + Map<Symbol, Object> messageAnnotations = new HashMap<>(); + + applicationProperties.put("property-1", "string-1"); + applicationProperties.put("property-2", 512); + applicationProperties.put("property-3", true); + applicationProperties.put("property-4", "string-2"); + applicationProperties.put("property-5", 512); + applicationProperties.put("property-6", true); + applicationProperties.put("property-7", "string-3"); + applicationProperties.put("property-8", 512); + applicationProperties.put("property-9", true); + + messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0); + messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0); + + Message message = Proton.message(); + + // Header Values + message.setPriority((short) 9); + message.setDurable(true); + message.setDeliveryCount(2); + message.setTtl(5000); + + // Properties + message.setMessageId("ID:SomeQualifier:0:0:1"); + message.setGroupId("Group-ID-1"); + message.setGroupSequence(15); + message.setAddress("queue://test-queue"); + message.setReplyTo("queue://reply-queue"); + message.setCreationTime(System.currentTimeMillis()); + message.setContentType("text/plain"); + message.setCorrelationId("ID:SomeQualifier:0:7:9"); + message.setUserId("username".getBytes(StandardCharsets.UTF_8)); + + // Application Properties / Message Annotations / Body + message.setApplicationProperties(new ApplicationProperties(applicationProperties)); + message.setMessageAnnotations(new MessageAnnotations(messageAnnotations)); + message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + + return message; + } + + private EncodedMessage encode(Object target) { + if (target instanceof ProtonJMessage) { + ProtonJMessage amqp = (ProtonJMessage) target; + + ByteBuf nettyBuffer = Unpooled.buffer(1024); + amqp.encode(new NettyWritable(nettyBuffer)); + + return new EncodedMessage(0, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); + } else { + return null; + } + } + + private void LOG_RESULTS(long duration) { + String result = "[JMS] Total time for " + PROFILE_CYCLES + " cycles of transforms = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms -> " + + test.getMethodName(); + + System.out.println(result); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java new file mode 100644 index 0000000..a5a2168 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter.message; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; +import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.CompositeWritableBuffer; +import org.apache.qpid.proton.codec.DroppingWritableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Tests some basic encode / decode functionality on the transformers. + */ +public class MessageTransformationTest { + + @Rule + public TestName test = new TestName(); + + private IDGenerator idGenerator; + private ProtonMessageConverter converter; + + @Before + public void setUp() { + idGenerator = new SimpleIDGenerator(0); + converter = new ProtonMessageConverter(idGenerator); + } + + @Test + public void testEncodeDecodeFidelity() throws Exception { + Map<String, Object> applicationProperties = new HashMap<>(); + Map<Symbol, Object> messageAnnotations = new HashMap<>(); + + applicationProperties.put("property-1", "string"); + applicationProperties.put("property-2", 512); + applicationProperties.put("property-3", true); + + messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0); + messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0); + + Message incomingMessage = Proton.message(); + + incomingMessage.setAddress("queue://test-queue"); + incomingMessage.setDeliveryCount(1); + incomingMessage.setApplicationProperties(new ApplicationProperties(applicationProperties)); + incomingMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations)); + incomingMessage.setCreationTime(System.currentTimeMillis()); + incomingMessage.setContentType("text/plain"); + incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + + EncodedMessage encoded = encode(incomingMessage); + + ServerMessage outbound = converter.inbound(encoded); + Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode(); + + // Test that message details are equal + assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress()); + assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount()); + assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime()); + assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType()); + + // Test Message annotations + ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties(); + ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties(); + + assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue()); + + // Test Message properties + MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations(); + MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations(); + + assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue()); + + // Test that bodies are equal + assertTrue(incomingMessage.getBody() instanceof AmqpValue); + assertTrue(outboudMessage.getBody() instanceof AmqpValue); + + AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody(); + AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody(); + + assertTrue(incomingBody.getValue() instanceof String); + assertTrue(outgoingBody.getValue() instanceof String); + + assertEquals(incomingBody.getValue(), outgoingBody.getValue()); + } + + @Test + public void testBodyOnlyEncodeDecode() throws Exception { + + Message incomingMessage = Proton.message(); + + incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + + EncodedMessage encoded = encode(incomingMessage); + ServerMessage outbound = converter.inbound(encoded); + Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + + assertNull(outboudMessage.getHeader()); + assertNull(outboudMessage.getProperties()); + } + + @Test + public void testPropertiesButNoHeadersEncodeDecode() throws Exception { + + Message incomingMessage = Proton.message(); + + incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + incomingMessage.setMessageId("ID:SomeQualifier:0:0:1"); + + EncodedMessage encoded = encode(incomingMessage); + ServerMessage outbound = converter.inbound(encoded); + Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + + assertNull(outboudMessage.getHeader()); + assertNotNull(outboudMessage.getProperties()); + } + + @Test + public void testHeaderButNoPropertiesEncodeDecode() throws Exception { + + Message incomingMessage = Proton.message(); + + incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + incomingMessage.setDurable(true); + + EncodedMessage encoded = encode(incomingMessage); + ServerMessage outbound = converter.inbound(encoded); + Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + + assertNotNull(outboudMessage.getHeader()); + assertNull(outboudMessage.getProperties()); + } + + @Test + public void testMessageWithAmqpValueThatFailsJMSConversion() throws Exception { + + Message incomingMessage = Proton.message(); + + incomingMessage.setBody(new AmqpValue(new Boolean(true))); + + EncodedMessage encoded = encode(incomingMessage); + ServerMessage outbound = converter.inbound(encoded); + Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + + Section section = outboudMessage.getBody(); + assertNotNull(section); + assertTrue(section instanceof AmqpValue); + AmqpValue amqpValue = (AmqpValue) section; + assertNotNull(amqpValue.getValue()); + assertTrue(amqpValue.getValue() instanceof Boolean); + assertEquals(true, amqpValue.getValue()); + } + + @Test + public void testComplexQpidJMSMessageEncodeDecode() throws Exception { + + Map<String, Object> applicationProperties = new HashMap<>(); + Map<Symbol, Object> messageAnnotations = new HashMap<>(); + + applicationProperties.put("property-1", "string-1"); + applicationProperties.put("property-2", 512); + applicationProperties.put("property-3", true); + applicationProperties.put("property-4", "string-2"); + applicationProperties.put("property-5", 512); + applicationProperties.put("property-6", true); + applicationProperties.put("property-7", "string-3"); + applicationProperties.put("property-8", 512); + applicationProperties.put("property-9", true); + + messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0); + messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0); + messageAnnotations.put(Symbol.valueOf("x-opt-jms-reply-to"), 0); + messageAnnotations.put(Symbol.valueOf("x-opt-delivery-delay"), 2000); + + Message message = Proton.message(); + + // Header Values + message.setPriority((short) 9); + message.setDurable(true); + message.setDeliveryCount(2); + message.setTtl(5000); + + // Properties + message.setMessageId("ID:SomeQualifier:0:0:1"); + message.setGroupId("Group-ID-1"); + message.setGroupSequence(15); + message.setAddress("queue://test-queue"); + message.setReplyTo("queue://reply-queue"); + message.setCreationTime(System.currentTimeMillis()); + message.setContentType("text/plain"); + message.setCorrelationId("ID:SomeQualifier:0:7:9"); + message.setUserId("username".getBytes(StandardCharsets.UTF_8)); + + // Application Properties / Message Annotations / Body + message.setApplicationProperties(new ApplicationProperties(applicationProperties)); + message.setMessageAnnotations(new MessageAnnotations(messageAnnotations)); + message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + + EncodedMessage encoded = encode(message); + ServerMessage outbound = converter.inbound(encoded); + Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + + assertNotNull(outboudMessage.getHeader()); + assertNotNull(outboudMessage.getProperties()); + assertNotNull(outboudMessage.getMessageAnnotations()); + assertNotNull(outboudMessage.getApplicationProperties()); + assertNull(outboudMessage.getDeliveryAnnotations()); + assertNull(outboudMessage.getFooter()); + + assertEquals(9, outboudMessage.getApplicationProperties().getValue().size()); + assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size()); + } + + private EncodedMessage encode(Message message) { + ProtonJMessage amqp = (ProtonJMessage) message; + + ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]); + final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); + int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); + if (overflow.position() > 0) { + buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); + c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); + } + + return new EncodedMessage(1, buffer.array(), 0, c); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index f39a9c5..e3e9681 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -204,6 +204,30 @@ public class AmqpMessage { } /** + * Sets the replyTo address which is applied to the AMQP message reply-to field in the message properties + * + * @param address The replyTo address that should be applied in the Message To field. + */ + public void setReplyToAddress(String address) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setReplyTo(address); + } + + /** + * Return the set replyTo address that was set in the Message To field. + * + * @return the set replyTo address String form or null if not set. + */ + public String getReplyToAddress() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getReplyTo(); + } + + /** * Sets the MessageId property on an outbound message using the provided String * * @param messageId the String message ID value to set. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/62627bf2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 2a1e8c9..15271f6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -16,6 +16,25 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Enumeration; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -36,21 +55,9 @@ import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TextMessage; import javax.jms.Topic; +import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; -import java.io.IOException; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Enumeration; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -83,11 +90,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION; -import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains; - @RunWith(Parameterized.class) public class ProtonTest extends ProtonTestBase { @@ -180,6 +182,31 @@ public class ProtonTest extends ProtonTestBase { } @Test + public void testSendAndReceiveOnTopic() throws Exception { + Connection connection = createConnection("myClientId"); + try { + TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("amqp_testtopic"); + TopicSubscriber consumer = session.createSubscriber(topic); + TopicPublisher producer = session.createPublisher(topic); + + TextMessage message = session.createTextMessage("test-message"); + producer.send(message); + producer.close(); + + connection.start(); + + message = (TextMessage) consumer.receive(1000); + assertNotNull(message); + assertNotNull(message.getText()); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + @Test public void testDurableSubscriptionUnsubscribe() throws Exception { Connection connection = createConnection("myClientId"); try { @@ -495,7 +522,7 @@ public class ProtonTest extends ProtonTestBase { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); javax.jms.Queue queue = createQueue(address); MessageProducer p = session.createProducer(queue); - ArrayList list = new ArrayList(); + ArrayList<String> list = new ArrayList<>(); list.add("aString"); ObjectMessage objectMessage = session.createObjectMessage(list); p.send(objectMessage); @@ -507,7 +534,7 @@ public class ProtonTest extends ProtonTestBase { objectMessage = (ObjectMessage) cons.receive(5000); assertNotNull(objectMessage); - list = (ArrayList) objectMessage.getObject(); + list = (ArrayList<String>) objectMessage.getObject(); assertEquals(list.get(0), "aString"); connection.close(); } @@ -586,7 +613,7 @@ public class ProtonTest extends ProtonTestBase { fillAddress(destinationAddress); AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = amqpConnection = client.connect(); + AmqpConnection amqpConnection = client.connect(); try { AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(destinationAddress); @@ -860,7 +887,7 @@ public class ProtonTest extends ProtonTestBase { AmqpMessage request = new AmqpMessage(); request.setApplicationProperty("_AMQ_ResourceName", "core.server"); request.setApplicationProperty("_AMQ_OperationName", "getQueueNames"); - request.setApplicationProperty("JMSReplyTo", destinationAddress); + request.setReplyToAddress(destinationAddress); request.setText("[]"); sender.send(request);
