Repository: qpid-jms Updated Branches: refs/heads/master b5f00d23a -> df2d911d4
QPIDJMS-178 Ensure that the object stored in the ObjectMessage has a snapshot taken on set and a snapshot returned on get to ensure that changes to the outside object do not get reflected in the stored value as pre the spec. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/df2d911d Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/df2d911d Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/df2d911d Branch: refs/heads/master Commit: df2d911d4e75a65b50c23717636b0a6e15be526f Parents: b5f00d2 Author: Timothy Bish <[email protected]> Authored: Fri May 20 17:23:44 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri May 20 17:23:44 2016 -0400 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 10 +-- .../amqp/message/AmqpJmsMessageBuilder.java | 33 ++++++---- .../message/AmqpJmsObjectMessageFacade.java | 20 +++--- .../amqp/message/AmqpMessageSupport.java | 44 +++++++++++++ .../message/AmqpSerializedObjectDelegate.java | 40 ++++++++++-- .../amqp/message/AmqpTypedObjectDelegate.java | 60 +++++++++++++---- .../amqp/message/AmqpJmsMessageBuilderTest.java | 51 ++++++++------- .../message/AmqpJmsMessageTypesTestCase.java | 4 +- .../message/AmqpJmsObjectMessageFacadeTest.java | 69 ++++++++++++++++++++ 9 files changed, 257 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index f7a7200..a162af3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -48,7 +48,6 @@ import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -426,10 +425,9 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private boolean processDelivery(Delivery incoming) throws Exception { incoming.setDefaultDeliveryState(Released.getInstance()); - Message amqpMessage = decodeIncomingMessage(incoming); JmsMessage message = null; try { - message = AmqpJmsMessageBuilder.createJmsMessage(this, amqpMessage); + message = AmqpJmsMessageBuilder.createJmsMessage(this, unwrapIncomingMessage(incoming)); } catch (Exception e) { LOG.warn("Error on transform: {}", e.getMessage()); // TODO - We could signal provider error but not sure we want to fail @@ -531,7 +529,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } - protected Message decodeIncomingMessage(Delivery incoming) { + protected ByteBuf unwrapIncomingMessage(Delivery incoming) { int count; while ((count = getEndpoint().recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) { @@ -542,9 +540,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } try { - Message protonMessage = Message.Factory.create(); - protonMessage.decode(incomingBuffer.array(), 0, incomingBuffer.readableBytes()); - return protonMessage; + return incomingBuffer.duplicate(); } finally { incomingBuffer.clear(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java index 15f6d1d..4385246 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java @@ -25,6 +25,7 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_S import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.isContentType; import java.io.IOException; @@ -47,6 +48,8 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.message.Message; +import io.netty.buffer.ByteBuf; + /** * Builder class used to construct the appropriate JmsMessage / JmsMessageFacade * objects to wrap an incoming AMQP Message. @@ -59,23 +62,25 @@ public class AmqpJmsMessageBuilder { * * @param consumer * The provider AMQP Consumer instance where this message arrived at. - * @param message - * The Proton Message object that will be wrapped. + * @param messageBytes + * The the raw bytes that compose the incoming message. (Read-Only) * * @return a JmsMessage instance properly configured for dispatch to the provider listener. * * @throws IOException if an error occurs while creating the message objects. */ - public static JmsMessage createJmsMessage(AmqpConsumer consumer, Message message) throws IOException { + public static JmsMessage createJmsMessage(AmqpConsumer consumer, ByteBuf messageBytes) throws IOException { + + Message amqpMessage = decodeMessage(messageBytes); // First we try the easy way, if the annotation is there we don't have to work hard. - JmsMessage result = createFromMsgAnnotation(consumer, message); + JmsMessage result = createFromMsgAnnotation(consumer, amqpMessage, messageBytes); if (result != null) { return result; } // Next, match specific section structures and content types - result = createWithoutAnnotation(consumer, message); + result = createWithoutAnnotation(consumer, amqpMessage, messageBytes); if (result != null) { return result; } @@ -83,7 +88,7 @@ public class AmqpJmsMessageBuilder { throw new IOException("Could not create a JMS message from incoming message"); } - private static JmsMessage createFromMsgAnnotation(AmqpConsumer consumer, Message message) throws IOException { + private static JmsMessage createFromMsgAnnotation(AmqpConsumer consumer, Message message, ByteBuf messageBytes) throws IOException { Object annotation = AmqpMessageSupport.getMessageAnnotation(JMS_MSG_TYPE, message); if (annotation != null) { @@ -99,7 +104,7 @@ public class AmqpJmsMessageBuilder { case JMS_STREAM_MESSAGE: return createStreamMessage(consumer, message); case JMS_OBJECT_MESSAGE: - return createObjectMessage(consumer, message); + return createObjectMessage(consumer, message, messageBytes); default: throw new IOException("Invalid JMS Message Type annotation value found in message: " + annotation); } @@ -108,12 +113,12 @@ public class AmqpJmsMessageBuilder { return null; } - private static JmsMessage createWithoutAnnotation(AmqpConsumer consumer, Message message) { + private static JmsMessage createWithoutAnnotation(AmqpConsumer consumer, Message message, ByteBuf messageBytes) { Section body = message.getBody(); if (body == null) { if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - return createObjectMessage(consumer, message); + return createObjectMessage(consumer, message, messageBytes); } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) { return createBytesMessage(consumer, message); } else { @@ -128,7 +133,7 @@ public class AmqpJmsMessageBuilder { if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) { return createBytesMessage(consumer, message); } else if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - return createObjectMessage(consumer, message); + return createObjectMessage(consumer, message, messageBytes); } else { Charset charset = getCharsetForTextualContent(message.getContentType()); if (charset != null) { @@ -145,17 +150,17 @@ public class AmqpJmsMessageBuilder { } else if (value instanceof Binary) { return createBytesMessage(consumer, message); } else { - return createObjectMessage(consumer, message); + return createObjectMessage(consumer, message, messageBytes); } } else if (body instanceof AmqpSequence) { - return createObjectMessage(consumer, message); + return createObjectMessage(consumer, message, messageBytes); } return null; } - private static JmsObjectMessage createObjectMessage(AmqpConsumer consumer, Message message) { - return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(consumer, message)); + private static JmsObjectMessage createObjectMessage(AmqpConsumer consumer, Message message, ByteBuf messageBytes) { + return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(consumer, message, messageBytes.copy())); } private static JmsStreamMessage createStreamMessage(AmqpConsumer consumer, Message message) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java index 6935ea9..4db872a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java @@ -30,6 +30,8 @@ import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.message.Message; +import io.netty.buffer.ByteBuf; + /** * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage * type. @@ -50,7 +52,7 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements super(connection); setMessageAnnotation(JMS_MSG_TYPE, JMS_OBJECT_MESSAGE); - initDelegate(isAmqpTypeEncoded); + initDelegate(isAmqpTypeEncoded, null); } /** @@ -61,12 +63,14 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements * the consumer that received this message. * @param message * the incoming Message instance that is being wrapped. + * @param messageBytes + * a copy of the raw bytes of the incoming message. */ - public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message) { + public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message, ByteBuf messageBytes) { super(consumer, message); boolean javaSerialized = AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(message.getContentType()); - initDelegate(!javaSerialized); + initDelegate(!javaSerialized, messageBytes); } /** @@ -126,9 +130,9 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements AmqpObjectTypeDelegate newDelegate = null; if (useAmqpTypedEncoding) { - newDelegate = new AmqpTypedObjectDelegate(message); + newDelegate = new AmqpTypedObjectDelegate(message, null); } else { - newDelegate = new AmqpSerializedObjectDelegate(message); + newDelegate = new AmqpSerializedObjectDelegate(message, null); } newDelegate.setObject(existingObject); @@ -140,11 +144,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements } } - private void initDelegate(boolean useAmqpTypes) { + private void initDelegate(boolean useAmqpTypes, ByteBuf messageBytes) { if (!useAmqpTypes) { - delegate = new AmqpSerializedObjectDelegate(getAmqpMessage()); + delegate = new AmqpSerializedObjectDelegate(getAmqpMessage(), messageBytes); } else { - delegate = new AmqpTypedObjectDelegate(getAmqpMessage()); + delegate = new AmqpTypedObjectDelegate(getAmqpMessage(), messageBytes); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java index 09c9160..4d93167 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java @@ -21,6 +21,9 @@ import java.util.Map; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.message.Message; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + /** * Support class containing constant values and static methods that are * used to map to / from AMQP Message types being sent or received. @@ -148,4 +151,45 @@ public final class AmqpMessageSupport { return contentType.equals(message.getContentType()); } } + + /** + * Given a byte buffer that represents an encoded AMQP Message instance, + * decode and return the Message. + * + * @param encodedBytes + * the bytes that represent an encoded AMQP Message. + * + * @return a new Message instance with the decoded data. + */ + public static Message decodeMessage(ByteBuf encodedBytes) { + // For now we must fully decode the message to get at the annotations. + Message protonMessage = Message.Factory.create(); + protonMessage.decode(encodedBytes.array(), 0, encodedBytes.readableBytes()); + return protonMessage; + } + + /** + * Given a Message instance, encode the Message to the wire level representation + * of that Message. + * + * @param message + * the Message that is to be encoded into the wire level representation. + * + * @return a buffer containing the wire level representation of the input Message. + */ + public static ByteBuf encodeMessage(Message message) { + final int BUFFER_SIZE = 4096; + byte[] encodedMessage = new byte[BUFFER_SIZE]; + int encodedSize = 0; + while (true) { + try { + encodedSize = message.encode(encodedMessage, 0, encodedMessage.length); + break; + } catch (java.nio.BufferOverflowException e) { + encodedMessage = new byte[encodedMessage.length * 2]; + } + } + + return Unpooled.wrappedBuffer(encodedMessage, 0, encodedSize); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java index d928e58..546060b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java @@ -17,12 +17,14 @@ package org.apache.qpid.jms.provider.amqp.message; import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream; import org.apache.qpid.proton.amqp.Binary; @@ -30,6 +32,8 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.message.Message; +import io.netty.buffer.ByteBuf; + /** * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage * type. @@ -50,21 +54,33 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { } private final Message message; + private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>(); + private ByteBuf messageBytes; /** * Create a new delegate that uses Java serialization to store the message content. * * @param message * the AMQP message instance where the object is to be stored / read. + * @param messageBytes + * the raw bytes that comprise the message when it was received. */ - public AmqpSerializedObjectDelegate(Message message) { + public AmqpSerializedObjectDelegate(Message message, ByteBuf messageBytes) { this.message = message; this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + this.messageBytes = messageBytes; + + // We will decode the body on each access, so clear the current value + // so we don't carry along unneeded bloat. + if (messageBytes != null) { + cachedReceivedBody.set(message.getBody()); + } } private static byte[] getSerializedBytes(Serializable value) throws IOException { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos)) { + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(value); oos.flush(); oos.close(); @@ -77,7 +93,15 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { public Serializable getObject() throws IOException, ClassNotFoundException { Binary bin = null; - Section body = message.getBody(); + Section body = cachedReceivedBody.getAndSet(null); + if (body == null) { + if (messageBytes != null) { + body = decodeMessage(messageBytes).getBody(); + } else { + body = message.getBody(); + } + } + if (body == null || body == NULL_OBJECT_BODY) { return null; } else if (body instanceof Data) { @@ -103,18 +127,22 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { @Override public void setObject(Serializable value) throws IOException { - if(value == null) { + cachedReceivedBody.set(null); + + if (value == null) { message.setBody(NULL_OBJECT_BODY); } else { byte[] bytes = getSerializedBytes(value); message.setBody(new Data(new Binary(bytes))); } + + messageBytes = null; } @Override public void onSend() { - this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); - if(message.getBody() == null) { + message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + if (message.getBody() == null) { message.setBody(NULL_OBJECT_BODY); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java index 483771e..99ab86b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java @@ -16,10 +16,14 @@ */ package org.apache.qpid.jms.provider.amqp.message; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.decodeMessage; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage; + import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; @@ -27,6 +31,8 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.message.Message; +import io.netty.buffer.ByteBuf; + /** * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage * type. @@ -36,24 +42,41 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate { static final AmqpValue NULL_OBJECT_BODY = new AmqpValue(null); private final Message message; + private final AtomicReference<Section> cachedReceivedBody = new AtomicReference<Section>(); + private ByteBuf messageBytes; /** * Create a new delegate that uses Java serialization to store the message content. * * @param message * the AMQP message instance where the object is to be stored / read. + * @param messageBytes + * the raw bytes that comprise the AMQP message that was received. */ - public AmqpTypedObjectDelegate(Message message) { + public AmqpTypedObjectDelegate(Message message, ByteBuf messageBytes) { this.message = message; this.message.setContentType(null); + this.messageBytes = messageBytes; + + // We will decode the body on each access, so clear the current value + // so we don't carry along unneeded bloat. + if (messageBytes != null) { + cachedReceivedBody.set(message.getBody()); + } } @Override public Serializable getObject() throws IOException, ClassNotFoundException { - // TODO: this should actually return a snapshot of the object, so we - // need to save the bytes so we can return an equal/unmodified object later + Section body = cachedReceivedBody.getAndSet(null); + + if (body == null) { + if (messageBytes != null) { + body = decodeMessage(messageBytes).getBody(); + } else { + body = message.getBody(); + } + } - Section body = message.getBody(); if (body == null) { return null; } else if (body instanceof AmqpValue) { @@ -76,19 +99,28 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate { @Override public void setObject(Serializable value) throws IOException { + cachedReceivedBody.set(null); + if (value == null) { message.setBody(NULL_OBJECT_BODY); + messageBytes = null; } else if (isSupportedAmqpValueObjectType(value)) { - // TODO: This is a temporary hack, we actually need to take a snapshot of the object - // at this point in time, not simply set the object itself into the Proton message. - // We will need to encode it now, first to save the snapshot to send, and also to - // verify up front that we can actually send it later. - - // Even if we do that we would currently then need to decode it later to set the - // body to send, unless we augment Proton to allow setting the bytes directly. - // We will always need to decode bytes to return a snapshot from getObject(). We - // will need to save the bytes somehow to support that on received messages. - message.setBody(new AmqpValue(value)); + Message transfer = Message.Factory.create(); + + // Exchange the incoming body value for one that is created from encoding + // and decoding the value. + transfer.setBody(new AmqpValue(value)); + messageBytes = encodeMessage(transfer); + transfer = decodeMessage(messageBytes); + messageBytes = null; + + // This step requires a heavy-weight operation of both encoding and decoding the + // incoming body value in order to create a copy such that changes to the original + // do not affect the stored value. In the future it makes sense to try to enhance + // proton such that we can encode the body and use those bytes directly on the + // message as it is being sent. + + message.setBody(transfer.getBody()); } else { // TODO: Data and AmqpSequence? throw new IllegalArgumentException("Encoding this object type with the AMQP type system is not supported: " + value.getClass().getName()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java index 2ebff2b..a534de0 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilderTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.jms.provider.amqp.message; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -32,6 +33,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.qpid.jms.message.JmsBytesMessage; import org.apache.qpid.jms.message.JmsMessage; @@ -88,7 +90,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { MessageAnnotations messageAnnotations = new MessageAnnotations(map); message.setMessageAnnotations(messageAnnotations); - AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); } /** @@ -108,7 +110,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { MessageAnnotations messageAnnotations = new MessageAnnotations(map); message.setMessageAnnotations(messageAnnotations); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsMessage.class, jmsMessage.getClass()); @@ -134,7 +136,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { MessageAnnotations messageAnnotations = new MessageAnnotations(map); message.setMessageAnnotations(messageAnnotations); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass()); @@ -160,7 +162,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { MessageAnnotations messageAnnotations = new MessageAnnotations(map); message.setMessageAnnotations(messageAnnotations); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass()); @@ -209,7 +211,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); } - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass()); @@ -234,6 +236,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { */ @Test public void testCreateStreamMessageFromMessageTypeAnnotation() throws Exception { + Message message = Proton.message(); Map<Symbol, Object> map = new HashMap<Symbol, Object>(); @@ -242,7 +245,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { MessageAnnotations messageAnnotations = new MessageAnnotations(map); message.setMessageAnnotations(messageAnnotations); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsStreamMessage.class, jmsMessage.getClass()); @@ -268,7 +271,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Message message = Proton.message(); message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass()); @@ -289,7 +292,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { assertNull(message.getContentType()); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass()); @@ -310,7 +313,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Message message = Proton.message(); message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass()); @@ -327,7 +330,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Message message = Proton.message(); message.setContentType("text/plain"); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass()); @@ -347,7 +350,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Message message = Proton.message(); message.setContentType("unknown-content-type"); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsMessage.class, jmsMessage.getClass()); @@ -372,7 +375,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { message.setBody(new Data(binary)); message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass()); @@ -394,7 +397,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { message.setBody(new Data(binary)); message.setContentType("unknown-content-type"); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass()); @@ -418,7 +421,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { assertNull(message.getContentType()); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass()); @@ -441,7 +444,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { message.setBody(new Data(binary)); message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass()); @@ -554,7 +557,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { message.setBody(new Data(binary)); message.setContentType(contentType); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass()); @@ -579,7 +582,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Message message = Proton.message(); message.setBody(new AmqpValue("content")); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass()); @@ -599,7 +602,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Message message = Proton.message(); message.setBody(new AmqpValue(null)); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsTextMessage.class, jmsMessage.getClass()); @@ -620,7 +623,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Map<String, String> map = new HashMap<String,String>(); message.setBody(new AmqpValue(map)); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass()); @@ -644,7 +647,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { List<String> list = new ArrayList<String>(); message.setBody(new AmqpValue(list)); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass()); @@ -668,7 +671,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { Binary binary = new Binary(new byte[0]); message.setBody(new AmqpValue(binary)); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsBytesMessage.class, jmsMessage.getClass()); @@ -686,9 +689,9 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { @Test public void testCreateObjectMessageFromAmqpValueWithUncategorisedContent() throws Exception { Message message = Proton.message(); - message.setBody(new AmqpValue(new Object()));// This obviously shouldn't happen in practice + message.setBody(new AmqpValue(UUID.randomUUID())); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass()); @@ -715,7 +718,7 @@ public class AmqpJmsMessageBuilderTest extends QpidJmsTestCase { List<String> list = new ArrayList<String>(); message.setBody(new AmqpSequence(list)); - JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, message); + JmsMessage jmsMessage = AmqpJmsMessageBuilder.createJmsMessage(mockConsumer, encodeMessage(message)); assertNotNull("Message should not be null", jmsMessage); assertEquals("Unexpected message class type", JmsObjectMessage.class, jmsMessage.getClass()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java index c49e97d..3775269 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.jms.provider.amqp.message; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.encodeMessage; + import java.nio.charset.StandardCharsets; import org.apache.qpid.jms.JmsDestination; @@ -85,7 +87,7 @@ public class AmqpJmsMessageTypesTestCase extends QpidJmsTestCase { } protected AmqpJmsObjectMessageFacade createReceivedObjectMessageFacade(AmqpConsumer amqpConsumer, Message message) { - return new AmqpJmsObjectMessageFacade(amqpConsumer, message); + return new AmqpJmsObjectMessageFacade(amqpConsumer, message, encodeMessage(message)); } protected AmqpConsumer createMockAmqpConsumer() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df2d911d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java index bd0c24e..f95c685 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java @@ -285,4 +285,73 @@ public class AmqpJmsObjectMessageFacadeTest extends AmqpJmsMessageTypesTestCase // expected } } + + /** + * Test that setting an object on a received message and later getting the value, returns an + * equal but different object that does not pick up intermediate changes to the set object. + * + * @throws Exception if an error occurs during the test. + */ + @Test + public void testSetThenGetObjectOnSerializedReceivedMessageNoContentTypeReturnsSnapshot() throws Exception { + doTestSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot(false); + } + + @Test + public void testSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot() throws Exception { + doTestSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot(true); + } + + @SuppressWarnings("unchecked") + private void doTestSetThenGetObjectOnSerializedReceivedMessageReturnsSnapshot(boolean contentType) throws Exception { + + HashMap<String, String> origMap = new HashMap<String, String>(); + origMap.put("key1", "value1"); + + Message message = Message.Factory.create(); + if (contentType) { + message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + message.setBody(new Data(new Binary(getSerializedBytes(origMap)))); + } else { + message.setBody(new AmqpValue(origMap)); + } + AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createReceivedObjectMessageFacade(createMockAmqpConsumer(), message); + + // verify we get a different-but-equal object back + Serializable serialized = amqpObjectMessageFacade.getObject(); + assertTrue("Unexpected object type returned", serialized instanceof Map<?, ?>); + Map<String, String> returnedObject1 = (Map<String, String>) serialized; + if (contentType) { + assertNotSame("Expected different objects, due to snapshot being taken", origMap, returnedObject1); + } else { + assertSame("Expected same objects, due to initial snapshot of delivered value", origMap, returnedObject1); + } + assertEquals("Expected equal objects, due to snapshot being taken", origMap, returnedObject1); + + // verify we get a different-but-equal object back when compared to the previously retrieved object + Serializable serialized2 = amqpObjectMessageFacade.getObject(); + assertTrue("Unexpected object type returned", serialized2 instanceof Map<?, ?>); + Map<String, String> returnedObject2 = (Map<String, String>) serialized2; + assertNotSame("Expected different objects, due to snapshot being taken", returnedObject1, returnedObject2); + assertEquals("Expected equal objects, due to snapshot being taken", returnedObject1, returnedObject2); + + // mutate the first returned object + returnedObject1.put("key2", "value2"); + + // verify the mutated map is a different and not equal object + assertNotSame("Expected different objects, due to snapshot being taken", returnedObject1, returnedObject2); + assertNotEquals("Expected objects to differ, due to snapshot being taken", returnedObject1, returnedObject2); + } + + private static byte[] getSerializedBytes(Serializable value) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + + oos.writeObject(value); + oos.flush(); + oos.close(); + + return baos.toByteArray(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
