http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java index 2ece01d..d06464f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformerTest.java @@ -16,33 +16,20 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY; -import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - +import javax.jms.JMSException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.UUID; -import javax.jms.JMSException; - import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; @@ -50,9 +37,6 @@ import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; @@ -64,16 +48,18 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class JMSMappingOutboundTransformerTest { private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844"); private final String TEST_ADDRESS = "queue://testAddress"; - private IDGenerator idGenerator; - private JMSMappingOutboundTransformer transformer; public static final byte QUEUE_TYPE = 0x00; public static final byte TOPIC_TYPE = 0x01; @@ -82,80 +68,10 @@ public class JMSMappingOutboundTransformerTest { @Before public void setUp() { - idGenerator = new SimpleIDGenerator(0); - transformer = new JMSMappingOutboundTransformer(idGenerator); } // ----- no-body Message type tests ---------------------------------------// - @Test - public void testConvertMessageToAmqpMessageWithNoBody() throws Exception { - ServerJMSMessage outbound = createMessage(); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNull(amqp.getBody()); - } - - @Test - public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception { - ServerJMSTextMessage outbound = createTextMessage(); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNull(amqp.getBody()); - } - - // ----- BytesMessage type tests ---------------------------------------// - - @Test - public void testConvertEmptyBytesMessageToAmqpMessageWithDataBody() throws Exception { - ServerJMSBytesMessage outbound = createBytesMessage(); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof Data); - assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); - assertEquals(0, ((Data) amqp.getBody()).getValue().getLength()); - } - - @Test - public void testConvertUncompressedBytesMessageToAmqpMessageWithDataBody() throws Exception { - byte[] expectedPayload = new byte[] {8, 16, 24, 32}; - ServerJMSBytesMessage outbound = createBytesMessage(); - outbound.writeBytes(expectedPayload); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof Data); - assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); - assertEquals(4, ((Data) amqp.getBody()).getValue().getLength()); - - Binary amqpData = ((Data) amqp.getBody()).getValue(); - Binary inputData = new Binary(expectedPayload); - - assertTrue(inputData.equals(amqpData)); - } - @Ignore("Compressed message body support not yet implemented.") @Test public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception { @@ -164,10 +80,7 @@ public class JMSMappingOutboundTransformerTest { outbound.writeBytes(expectedPayload); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -183,13 +96,9 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { ServerJMSBytesMessage outbound = createBytesMessage(); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -201,14 +110,10 @@ public class JMSMappingOutboundTransformerTest { public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { byte[] expectedPayload = new byte[] {8, 16, 24, 32}; ServerJMSBytesMessage outbound = createBytesMessage(); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.writeBytes(expectedPayload); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -226,14 +131,10 @@ public class JMSMappingOutboundTransformerTest { public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { byte[] expectedPayload = new byte[] {8, 16, 24, 32}; ServerJMSBytesMessage outbound = createBytesMessage(true); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.writeBytes(expectedPayload); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -253,10 +154,7 @@ public class JMSMappingOutboundTransformerTest { ServerJMSMapMessage outbound = createMapMessage(); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -271,10 +169,7 @@ public class JMSMappingOutboundTransformerTest { outbound.setBytes("bytes", byteArray); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -296,10 +191,7 @@ public class JMSMappingOutboundTransformerTest { outbound.setBoolean("property-3", true); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -320,10 +212,7 @@ public class JMSMappingOutboundTransformerTest { outbound.setBoolean("property-3", true); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -336,33 +225,12 @@ public class JMSMappingOutboundTransformerTest { assertTrue("string".equals(amqpMap.get("property-1"))); } - // ----- StreamMessage type tests -----------------------------------------// - - @Test - public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception { - ServerJMSStreamMessage outbound = createStreamMessage(); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof AmqpValue); - assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List); - } - @Test public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception { ServerJMSStreamMessage outbound = createStreamMessage(); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpSequence); @@ -376,17 +244,15 @@ public class JMSMappingOutboundTransformerTest { outbound.writeString("test"); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof AmqpValue); - assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List); + assertTrue(amqp.getBody() instanceof AmqpSequence); + + AmqpSequence list = (AmqpSequence)amqp.getBody(); @SuppressWarnings("unchecked") - List<Object> amqpList = (List<Object>) ((AmqpValue) amqp.getBody()).getValue(); + List<Object> amqpList = list.getValue(); assertEquals(2, amqpList.size()); } @@ -394,15 +260,11 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception { ServerJMSStreamMessage outbound = createStreamMessage(true); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); outbound.writeBoolean(false); outbound.writeString("test"); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpSequence); @@ -421,10 +283,7 @@ public class JMSMappingOutboundTransformerTest { ServerJMSObjectMessage outbound = createObjectMessage(); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -434,45 +293,20 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { ServerJMSObjectMessage outbound = createObjectMessage(); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); assertEquals(5, ((Data) amqp.getBody()).getValue().getLength()); } - - @Test - public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { - ServerJMSObjectMessage outbound = createObjectMessage(); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof AmqpValue); - assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); - assertEquals(5, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); - } - @Test public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception { ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -486,13 +320,9 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -504,35 +334,11 @@ public class JMSMappingOutboundTransformerTest { } @Test - public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { - ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof AmqpValue); - assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); - assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); - - Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray()); - assertNotNull(value); - assertTrue(value instanceof UUID); - } - - @Test public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception { ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -546,13 +352,9 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -566,20 +368,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { ServerJMSObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof AmqpValue); - assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary); - assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength()); + assertTrue(amqp.getBody() instanceof Data); + assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + assertFalse(0 == ((Binary) ((Data) amqp.getBody()).getValue()).getLength()); - Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray()); + Object value = deserialize((((Data) amqp.getBody()).getValue()).getArray()); assertNotNull(value); assertTrue(value instanceof UUID); } @@ -591,10 +389,7 @@ public class JMSMappingOutboundTransformerTest { ServerJMSTextMessage outbound = createTextMessage(); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -602,57 +397,12 @@ public class JMSMappingOutboundTransformerTest { } @Test - public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception { - String contentString = "myTextMessageContent"; - ServerJMSTextMessage outbound = createTextMessage(contentString); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof Data); - assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); - - Binary data = ((Data) amqp.getBody()).getValue(); - String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8); - assertEquals(contentString, contents); - } - - @Test - public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception { - String contentString = "myTextMessageContent"; - ServerJMSTextMessage outbound = createTextMessage(contentString); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); - outbound.encode(); - - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); - - assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof Data); - assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); - - Binary data = ((Data) amqp.getBody()).getValue(); - String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8); - assertEquals(contentString, contents); - } - - @Test public void testConvertTextMessageCreatesAmqpValueStringBody() throws Exception { String contentString = "myTextMessageContent"; ServerJMSTextMessage outbound = createTextMessage(contentString); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -665,10 +415,7 @@ public class JMSMappingOutboundTransformerTest { ServerJMSTextMessage outbound = createTextMessage(contentString); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -679,21 +426,16 @@ public class JMSMappingOutboundTransformerTest { public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception { String contentString = "myTextMessageContent"; ServerJMSTextMessage outbound = createTextMessage(contentString, true); - outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); outbound.encode(); - EncodedMessage encoded = transform(outbound); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(outbound.getInnerMessage()).getProtonMessage(); assertNotNull(amqp.getBody()); - assertTrue(amqp.getBody() instanceof Data); - assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary); + assertTrue(amqp.getBody() instanceof AmqpValue); - Binary data = ((Data) amqp.getBody()).getValue(); - String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8); - assertEquals(contentString, contents); + AmqpValue value = (AmqpValue)amqp.getBody(); + + assertEquals(contentString, value.getValue()); } // ----- Test JMSDestination Handling -------------------------------------// @@ -731,15 +473,12 @@ public class JMSMappingOutboundTransformerTest { textMessage.setText("myTextMessageContent"); textMessage.setJMSDestination(jmsDestination); - EncodedMessage encoded = transform(textMessage); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage(); MessageAnnotations ma = amqp.getMessageAnnotations(); Map<Symbol, Object> maMap = ma == null ? null : ma.getValue(); if (maMap != null) { - Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION); + Object actualValue = maMap.get(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION); assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue); } else if (expectedAnnotationValue != null) { fail("Expected annotation value, but there were no annotations"); @@ -785,15 +524,12 @@ public class JMSMappingOutboundTransformerTest { textMessage.setText("myTextMessageContent"); textMessage.setJMSReplyTo(jmsReplyTo); - EncodedMessage encoded = transform(textMessage); - assertNotNull(encoded); - - Message amqp = encoded.decode(); + Message amqp = AMQPConverter.getInstance().fromCore(textMessage.getInnerMessage()).getProtonMessage(); MessageAnnotations ma = amqp.getMessageAnnotations(); Map<Symbol, Object> maMap = ma == null ? null : ma.getValue(); if (maMap != null) { - Object actualValue = maMap.get(JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION); + Object actualValue = maMap.get(AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION); assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue); } else if (expectedAnnotationValue != null) { fail("Expected annotation value, but there were no annotations"); @@ -806,17 +542,6 @@ public class JMSMappingOutboundTransformerTest { // ----- Utility Methods used for this Test -------------------------------// - public EncodedMessage transform(ServerJMSMessage message) throws Exception { - // Useful for testing but not recommended for real life use. - ByteBuf nettyBuffer = Unpooled.buffer(1024); - NettyWritable buffer = new NettyWritable(nettyBuffer); - - long messageFormat = transformer.transform(message, buffer); - - EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); - - return encoded; - } private ServerDestination createDestination(byte destType) { ServerDestination destination = null; @@ -841,7 +566,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSMessage createMessage() { - return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0); + return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE)); } private ServerJMSBytesMessage createBytesMessage() { @@ -849,7 +574,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSBytesMessage createBytesMessage(boolean compression) { - ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0); + ServerJMSBytesMessage message = new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE)); if (compression) { // TODO @@ -863,7 +588,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSMapMessage createMapMessage(boolean compression) { - ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0); + ServerJMSMapMessage message = new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE)); if (compression) { // TODO @@ -877,7 +602,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSStreamMessage createStreamMessage(boolean compression) { - ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0); + ServerJMSStreamMessage message = new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE)); if (compression) { // TODO @@ -895,7 +620,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSObjectMessage createObjectMessage(Serializable payload, boolean compression) { - ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(idGenerator); + ServerJMSObjectMessage result = AMQPMessageSupport.createObjectMessage(0); if (compression) { // TODO @@ -922,7 +647,7 @@ public class JMSMappingOutboundTransformerTest { } private ServerJMSTextMessage createTextMessage(String text, boolean compression) { - ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(idGenerator); + ServerJMSTextMessage result = AMQPMessageSupport.createTextMessage(0); if (compression) { // TODO @@ -943,8 +668,8 @@ public class JMSMappingOutboundTransformerTest { } } - private ServerMessageImpl newMessage(byte messageType) { - ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512); + private CoreMessage newMessage(byte messageType) { + CoreMessage message = new CoreMessage(0, 512); message.setType(messageType); ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null); return message;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java index 99aab33..483f245 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSTransformationSpeedComparisonTest.java @@ -21,27 +21,23 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.ProtonJMessage; import org.junit.Before; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - /** * Some simple performance tests for the Message Transformers. */ @@ -51,16 +47,11 @@ public class JMSTransformationSpeedComparisonTest { @Rule public TestName test = new TestName(); - private IDGenerator idGenerator; - private ProtonMessageConverter converter; - private final int WARM_CYCLES = 1000; private final int PROFILE_CYCLES = 1000000; @Before public void setUp() { - idGenerator = new SimpleIDGenerator(0); - converter = new ProtonMessageConverter(idGenerator); } @Test @@ -68,20 +59,20 @@ public class JMSTransformationSpeedComparisonTest { Message message = Proton.message(); message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(message); + AMQPMessage encoded = new AMQPMessage(message); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } long totalDuration = 0; long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } totalDuration += System.nanoTime() - startTime; @@ -99,20 +90,20 @@ public class JMSTransformationSpeedComparisonTest { message.setContentType("text/plain"); message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(message); + AMQPMessage encoded = new AMQPMessage(message); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } long totalDuration = 0; long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } totalDuration += System.nanoTime() - startTime; @@ -122,20 +113,20 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testTypicalQpidJMSMessage() throws Exception { - EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); + AMQPMessage encoded = new AMQPMessage(createTypicalQpidJMSMessage()); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } long totalDuration = 0; long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } totalDuration += System.nanoTime() - startTime; @@ -145,20 +136,20 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testComplexQpidJMSMessage() throws Exception { - EncodedMessage encoded = encode(createComplexQpidJMSMessage()); + AMQPMessage encoded = encode(createComplexQpidJMSMessage()); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } long totalDuration = 0; long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ServerMessage intermediate = converter.inbound(encoded); - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } totalDuration += System.nanoTime() - startTime; @@ -168,18 +159,20 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testTypicalQpidJMSMessageInBoundOnly() throws Exception { - EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); + AMQPMessage encoded = encode(createTypicalQpidJMSMessage()); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - converter.inbound(encoded); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } long totalDuration = 0; long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - converter.inbound(encoded); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } totalDuration += System.nanoTime() - startTime; @@ -190,19 +183,20 @@ public class JMSTransformationSpeedComparisonTest { @Test public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception { - EncodedMessage encoded = encode(createTypicalQpidJMSMessage()); - ServerMessage intermediate = converter.inbound(encoded); + AMQPMessage encoded = encode(createTypicalQpidJMSMessage()); // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } long totalDuration = 0; long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - encode(converter.outbound(intermediate, 1)); + ICoreMessage intermediate = encoded.toCore(); + encode(AMQPConverter.getInstance().fromCore(intermediate)); } totalDuration += System.nanoTime() - startTime; @@ -278,16 +272,16 @@ public class JMSTransformationSpeedComparisonTest { return message; } - private EncodedMessage encode(Object target) { - if (target instanceof ProtonJMessage) { - ProtonJMessage amqp = (ProtonJMessage) target; - - ByteBuf nettyBuffer = Unpooled.buffer(1024); - amqp.encode(new NettyWritable(nettyBuffer)); + private AMQPMessage encode(Message message) { + return new AMQPMessage(message); + } - return new EncodedMessage(0, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes()); - } else { - return null; + private void encode(AMQPMessage target) { + ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + try { + target.sendBuffer(buf, 1); + } finally { + buf.release(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java index a5a2168..a73d29f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java @@ -16,36 +16,28 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; -import org.apache.activemq.artemis.utils.IDGenerator; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.CompositeWritableBuffer; -import org.apache.qpid.proton.codec.DroppingWritableBuffer; -import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.ProtonJMessage; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + /** * Tests some basic encode / decode functionality on the transformers. */ @@ -54,72 +46,10 @@ public class MessageTransformationTest { @Rule public TestName test = new TestName(); - private IDGenerator idGenerator; - private ProtonMessageConverter converter; - @Before public void setUp() { - idGenerator = new SimpleIDGenerator(0); - converter = new ProtonMessageConverter(idGenerator); } - @Test - public void testEncodeDecodeFidelity() throws Exception { - Map<String, Object> applicationProperties = new HashMap<>(); - Map<Symbol, Object> messageAnnotations = new HashMap<>(); - - applicationProperties.put("property-1", "string"); - applicationProperties.put("property-2", 512); - applicationProperties.put("property-3", true); - - messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0); - messageAnnotations.put(Symbol.valueOf("x-opt-jms-dest"), 0); - - Message incomingMessage = Proton.message(); - - incomingMessage.setAddress("queue://test-queue"); - incomingMessage.setDeliveryCount(1); - incomingMessage.setApplicationProperties(new ApplicationProperties(applicationProperties)); - incomingMessage.setMessageAnnotations(new MessageAnnotations(messageAnnotations)); - incomingMessage.setCreationTime(System.currentTimeMillis()); - incomingMessage.setContentType("text/plain"); - incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - - EncodedMessage encoded = encode(incomingMessage); - - ServerMessage outbound = converter.inbound(encoded); - Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, outbound.getLongProperty("JMSXDeliveryCount").intValue())).decode(); - - // Test that message details are equal - assertEquals(incomingMessage.getAddress(), outboudMessage.getAddress()); - assertEquals(incomingMessage.getDeliveryCount(), outboudMessage.getDeliveryCount()); - assertEquals(incomingMessage.getCreationTime(), outboudMessage.getCreationTime()); - assertEquals(incomingMessage.getContentType(), outboudMessage.getContentType()); - - // Test Message annotations - ApplicationProperties incomingApplicationProperties = incomingMessage.getApplicationProperties(); - ApplicationProperties outgoingApplicationProperties = outboudMessage.getApplicationProperties(); - - assertEquals(incomingApplicationProperties.getValue(), outgoingApplicationProperties.getValue()); - - // Test Message properties - MessageAnnotations incomingMessageAnnotations = incomingMessage.getMessageAnnotations(); - MessageAnnotations outgoingMessageAnnotations = outboudMessage.getMessageAnnotations(); - - assertEquals(incomingMessageAnnotations.getValue(), outgoingMessageAnnotations.getValue()); - - // Test that bodies are equal - assertTrue(incomingMessage.getBody() instanceof AmqpValue); - assertTrue(outboudMessage.getBody() instanceof AmqpValue); - - AmqpValue incomingBody = (AmqpValue) incomingMessage.getBody(); - AmqpValue outgoingBody = (AmqpValue) outboudMessage.getBody(); - - assertTrue(incomingBody.getValue() instanceof String); - assertTrue(outgoingBody.getValue() instanceof String); - - assertEquals(incomingBody.getValue(), outgoingBody.getValue()); - } @Test public void testBodyOnlyEncodeDecode() throws Exception { @@ -128,12 +58,10 @@ public class MessageTransformationTest { incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); - Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + ICoreMessage core = new AMQPMessage(incomingMessage).toCore(); + Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage(); assertNull(outboudMessage.getHeader()); - assertNull(outboudMessage.getProperties()); } @Test @@ -144,9 +72,8 @@ public class MessageTransformationTest { incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); incomingMessage.setMessageId("ID:SomeQualifier:0:0:1"); - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); - Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + ICoreMessage core = new AMQPMessage(incomingMessage).toCore(); + Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage(); assertNull(outboudMessage.getHeader()); assertNotNull(outboudMessage.getProperties()); @@ -160,32 +87,9 @@ public class MessageTransformationTest { incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); incomingMessage.setDurable(true); - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); - Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); - - assertNotNull(outboudMessage.getHeader()); - assertNull(outboudMessage.getProperties()); - } - - @Test - public void testMessageWithAmqpValueThatFailsJMSConversion() throws Exception { - - Message incomingMessage = Proton.message(); - - incomingMessage.setBody(new AmqpValue(new Boolean(true))); - - EncodedMessage encoded = encode(incomingMessage); - ServerMessage outbound = converter.inbound(encoded); - Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + ICoreMessage core = new AMQPMessage(incomingMessage).toCore(); + Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage(); - Section section = outboudMessage.getBody(); - assertNotNull(section); - assertTrue(section instanceof AmqpValue); - AmqpValue amqpValue = (AmqpValue) section; - assertNotNull(amqpValue.getValue()); - assertTrue(amqpValue.getValue() instanceof Boolean); - assertEquals(true, amqpValue.getValue()); } @Test @@ -233,32 +137,10 @@ public class MessageTransformationTest { message.setMessageAnnotations(new MessageAnnotations(messageAnnotations)); message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); - EncodedMessage encoded = encode(message); - ServerMessage outbound = converter.inbound(encoded); - Message outboudMessage = ((EncodedMessage) converter.outbound(outbound, 1)).decode(); + ICoreMessage core = new AMQPMessage(message).toCore(); + Message outboudMessage = AMQPConverter.getInstance().fromCore(core).getProtonMessage(); - assertNotNull(outboudMessage.getHeader()); - assertNotNull(outboudMessage.getProperties()); - assertNotNull(outboudMessage.getMessageAnnotations()); - assertNotNull(outboudMessage.getApplicationProperties()); - assertNull(outboudMessage.getDeliveryAnnotations()); - assertNull(outboudMessage.getFooter()); - - assertEquals(9, outboudMessage.getApplicationProperties().getValue().size()); + assertEquals(10, outboudMessage.getApplicationProperties().getValue().size()); assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size()); } - - private EncodedMessage encode(Message message) { - ProtonJMessage amqp = (ProtonJMessage) message; - - ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]); - final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); - int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); - if (overflow.position() > 0) { - buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); - c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); - } - - return new EncodedMessage(1, buffer.array(), 0, c); - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java new file mode 100644 index 0000000..db40a8e --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.message; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.commons.collections.map.HashedMap; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.Assert; +import org.junit.Test; + +public class AMQPMessageTest { + + @Test + public void testVerySimple() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader( new Header()); + Properties properties = new Properties(); + properties.setTo("someNiceLocal"); + protonMessage.setProperties(properties); + protonMessage.getHeader().setDeliveryCount(new UnsignedInteger(7)); + protonMessage.getHeader().setDurable(Boolean.TRUE); + protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap())); + + ByteBuf nettyBuffer = Unpooled.buffer(1500); + + protonMessage.encode(new NettyWritable(nettyBuffer)); + + byte[] bytes = new byte[nettyBuffer.writerIndex()]; + + nettyBuffer.readBytes(bytes); + + AMQPMessage encode = new AMQPMessage(0, bytes); + + Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue()); + Assert.assertEquals(true, encode.getHeader().getDurable()); + Assert.assertEquals("someNiceLocal", encode.getAddress()); + + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 1f435ff..f4cba64 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; @@ -132,11 +131,6 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter } @Override - public MessageConverter getConverter() { - return null; - } - - @Override public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index f0385dc..553521b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -28,17 +28,19 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.Transaction; +import org.jboss.logging.Logger; /** * Handles MQTT Exactly Once (QoS level 2) Protocol. */ public class MQTTPublishManager { + private static final Logger logger = Logger.getLogger(MQTTPublishManager.class); + private static final String MANAGEMENT_QUEUE_PREFIX = "$sys.mqtt.queue.qos2."; private SimpleString managementAddress; @@ -112,19 +114,19 @@ public class MQTTPublishManager { * to original ID and consumer in the Session state. This way we can look up the consumer Id and the message Id from * the PubAck or PubRec message id. * */ - protected void sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) throws Exception { + protected void sendMessage(CoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception { // This is to allow retries of PubRel. if (isManagementConsumer(consumer)) { sendPubRelMessage(message); } else { int qos = decideQoS(message, consumer); if (qos == 0) { - sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos); + sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos); session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); } else if (qos == 1 || qos == 2) { int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); - sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos); + sendServerMessage(mqttid, message, deliveryCount, qos); } else { // Client must have disconnected and it's Subscription QoS cleared consumer.individualCancel(message.getMessageID(), false); @@ -149,7 +151,7 @@ public class MQTTPublishManager { */ void sendInternal(int messageId, String topic, int qos, ByteBuf payload, boolean retain, boolean internal) throws Exception { synchronized (lock) { - ServerMessage serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload); + Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, topic, retain, qos, payload); if (qos > 0) { serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES); @@ -173,6 +175,7 @@ public class MQTTPublishManager { } tx.commit(); } catch (Throwable t) { + logger.warn(t.getMessage(), t); tx.rollback(); throw t; } @@ -181,7 +184,7 @@ public class MQTTPublishManager { } } - void sendPubRelMessage(ServerMessage message) { + void sendPubRelMessage(Message message) { int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY); session.getProtocolHandler().sendPubRel(messageId); } @@ -190,7 +193,7 @@ public class MQTTPublishManager { try { Pair<Long, Long> ref = outboundStore.publishReceived(messageId); if (ref != null) { - ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); + Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId); session.getServerSession().send(m, true); session.getServerSession().acknowledge(ref.getB(), ref.getA()); } else { @@ -246,30 +249,30 @@ public class MQTTPublishManager { } } - private void sendServerMessage(int messageId, ServerMessageImpl message, int deliveryCount, int qos) { + private void sendServerMessage(int messageId, CoreMessage message, int deliveryCount, int qos) { String address = MQTTUtil.convertCoreAddressFilterToMQTT(message.getAddress().toString(), session.getWildcardConfiguration()); ByteBuf payload; switch (message.getType()) { case Message.TEXT_TYPE: try { - SimpleString text = message.getBodyBuffer().readNullableSimpleString(); + SimpleString text = message.getReadOnlyBodyBuffer().readNullableSimpleString(); byte[] stringPayload = text.toString().getBytes("UTF-8"); payload = ByteBufAllocator.DEFAULT.buffer(stringPayload.length); payload.writeBytes(stringPayload); break; } catch (UnsupportedEncodingException e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } default: - ActiveMQBuffer bufferDup = message.getBodyBufferDuplicate(); - payload = bufferDup.readBytes(message.getEndOfBodyPosition() - bufferDup.readerIndex()).byteBuf(); + ActiveMQBuffer bufferDup = message.getReadOnlyBodyBuffer(); + payload = bufferDup.readBytes(bufferDup.writerIndex()).byteBuf(); break; } session.getProtocolHandler().send(messageId, address, qos, payload, deliveryCount); } - private int decideQoS(ServerMessage message, ServerConsumer consumer) { + private int decideQoS(Message message, ServerConsumer consumer) { int subscriptionQoS = -1; try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 596670b..0b52a0b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -17,12 +17,12 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingContext; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.LinkedListIterator; @@ -44,7 +44,7 @@ public class MQTTRetainMessageManager { * the subscription queue for the consumer. When a new retained message is received the message will be sent to * the retained queue and the previous retain message consumed to remove it from the queue. */ - void handleRetainedMessage(ServerMessage message, String address, boolean reset, Transaction tx) throws Exception { + void handleRetainedMessage(Message message, String address, boolean reset, Transaction tx) throws Exception { SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address, session.getWildcardConfiguration())); Queue queue = session.getServer().locateQueue(retainAddress); @@ -82,7 +82,7 @@ public class MQTTRetainMessageManager { Queue retainedQueue = session.getServer().locateQueue(retainedQueueName); try (LinkedListIterator<MessageReference> i = retainedQueue.iterator()) { if (i.hasNext()) { - ServerMessage message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID()); + Message message = i.next().getMessage().copy(session.getServer().getStorageManager().generateID()); sendToQueue(message, queue, tx); } } @@ -95,7 +95,7 @@ public class MQTTRetainMessageManager { tx.commit(); } - private void sendToQueue(ServerMessage message, Queue queue, Transaction tx) throws Exception { + private void sendToQueue(Message message, Queue queue, Transaction tx) throws Exception { RoutingContext context = new RoutingContextImpl(tx); queue.route(message, context); session.getServer().getPostOffice().processRoute(message, context, false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 548b62c..a5b908f 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -17,10 +17,12 @@ package org.apache.activemq.artemis.core.protocol.mqtt; + +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.ServerConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -43,13 +45,13 @@ public class MQTTSessionCallback implements SessionCallback { @Override public int sendMessage(MessageReference reference, - ServerMessage message, + Message message, ServerConsumer consumer, int deliveryCount) { try { - session.getMqttPublishManager().sendMessage(message, consumer, deliveryCount); + session.getMqttPublishManager().sendMessage((CoreMessage)message, consumer, deliveryCount); } catch (Exception e) { - log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage()); + log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e); } return 1; } @@ -70,7 +72,7 @@ public class MQTTSessionCallback implements SessionCallback { @Override public int sendLargeMessage(MessageReference reference, - ServerMessage message, + Message message, ServerConsumer consumer, long bodySize, int deliveryCount) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 7bc6b84..613fef3 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -24,12 +24,11 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.config.WildcardConfiguration; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; /** * A Utility Class for creating Server Side objects and converting MQTT concepts to/from Artemis. @@ -93,13 +92,13 @@ public class MQTTUtil { return MQTT_RETAIN_ADDRESS_PREFIX + MQTT_WILDCARD.convert(filter, wildcardConfiguration); } - private static ServerMessage createServerMessage(MQTTSession session, + private static ICoreMessage createServerMessage(MQTTSession session, SimpleString address, boolean retain, int qos) { long id = session.getServer().getStorageManager().generateID(); - ServerMessageImpl message = new ServerMessageImpl(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); + CoreMessage message = new CoreMessage(id, DEFAULT_SERVER_MESSAGE_BUFFER_SIZE); message.setAddress(address); message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain); message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos); @@ -107,21 +106,20 @@ public class MQTTUtil { return message; } - public static ServerMessage createServerMessageFromByteBuf(MQTTSession session, + public static Message createServerMessageFromByteBuf(MQTTSession session, String topic, boolean retain, int qos, ByteBuf payload) { String coreAddress = convertMQTTAddressFilterToCore(topic, session.getWildcardConfiguration()); - ServerMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); + ICoreMessage message = createServerMessage(session, new SimpleString(coreAddress), retain, qos); - // FIXME does this involve a copy? - message.getBodyBuffer().writeBytes(new ChannelBufferWrapper(payload), payload.readableBytes()); + message.getBodyBuffer().writeBytes(payload, 0, payload.readableBytes()); return message; } - public static ServerMessage createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { - ServerMessage message = createServerMessage(session, address, false, 1); + public static Message createPubRelMessage(MQTTSession session, SimpleString address, int messageId) { + Message message = createServerMessage(session, address, false, 1); message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_ID_KEY), messageId); message.putIntProperty(new SimpleString(MQTTUtil.MQTT_MESSAGE_TYPE_KEY), MqttMessageType.PUBREL.value()); return message; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 5f408a6..46fe372 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -187,7 +187,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private AtomicBoolean disableTtl = new AtomicBoolean(false); - // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, ActiveMQServer server, Executor executor, @@ -1060,8 +1059,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processRemoveProducer(ProducerId id) throws Exception { - - // TODO-now: proper implement this method return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 9b27b81..3808363 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -35,12 +35,12 @@ import java.util.zip.InflaterOutputStream; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil; import org.apache.activemq.artemis.core.server.MessageReference; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.utils.DataConstants; @@ -69,7 +69,7 @@ import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.UTF8Buffer; -public class OpenWireMessageConverter implements MessageConverter { +public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> { public static final String AMQ_PREFIX = "__HDR_"; public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause"; @@ -102,16 +102,26 @@ public class OpenWireMessageConverter implements MessageConverter { } @Override - public Object outbound(ServerMessage message, int deliveryCount) { - // TODO: implement this + public OpenwireMessage fromCore(ICoreMessage coreMessage) throws Exception { return null; } @Override - public ServerMessage inbound(Object message) throws Exception { + public ICoreMessage toCore(OpenwireMessage pureMessage) throws Exception { + return null; + } + + // @Override + public Object outbound(org.apache.activemq.artemis.api.core.Message message, int deliveryCount) { + // TODO: implement this + return null; + } + +// @Override + public org.apache.activemq.artemis.api.core.Message inbound(Object message) throws Exception { Message messageSend = (Message) message; - ServerMessageImpl coreMessage = new ServerMessageImpl(-1, messageSend.getSize()); + CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize()); String type = messageSend.getType(); if (type != null) { @@ -157,7 +167,7 @@ public class OpenWireMessageConverter implements MessageConverter { mdataIn.close(); TypedProperties props = new TypedProperties(); loadMapIntoProperties(props, map); - props.encode(body); + props.encode(body.byteBuf()); break; case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE: if (messageCompressed) { @@ -415,7 +425,7 @@ public class OpenWireMessageConverter implements MessageConverter { } public static MessageDispatch createMessageDispatch(MessageReference reference, - ServerMessage message, + ICoreMessage message, AMQConsumer consumer) throws IOException, JMSException { ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getMarshaller(), consumer.getOpenwireDestination()); @@ -433,7 +443,7 @@ public class OpenWireMessageConverter implements MessageConverter { } private static ActiveMQMessage toAMQMessage(MessageReference reference, - ServerMessage coreMessage, + ICoreMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException { ActiveMQMessage amqMsg = null; @@ -476,7 +486,7 @@ public class OpenWireMessageConverter implements MessageConverter { } amqMsg.setBrokerInTime(brokerInTime); - ActiveMQBuffer buffer = coreMessage.getBodyBufferDuplicate(); + ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer(); Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED); boolean isCompressed = compressProp == null ? false : compressProp.booleanValue(); amqMsg.setCompressed(isCompressed); @@ -503,7 +513,7 @@ public class OpenWireMessageConverter implements MessageConverter { TypedProperties mapData = new TypedProperties(); //it could be a null map if (buffer.readableBytes() > 0) { - mapData.decode(buffer); + mapData.decode(buffer.byteBuf()); Map<String, Object> map = mapData.getMap(); ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize()); OutputStream os = out; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 5b62e3e..c0affb6 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -35,6 +35,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.api.core.client.TopologyMember; @@ -44,12 +45,10 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -137,7 +136,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl final ClusterManager clusterManager = this.server.getClusterManager(); - // TODO-NOW: use a property name for the cluster connection ClusterConnection cc = clusterManager.getDefaultConnection(null); if (cc != null) { @@ -236,11 +234,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl } @Override - public MessageConverter getConverter() { - return messageConverter; - } - - @Override public void removeHandler(String name) { }
