Author: robbie
Date: Tue May 6 12:24:52 2014
New Revision: 1592732
URL: http://svn.apache.org/r1592732
Log:
QPIDJMS-18: initial work on implementing ObjectMessage
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java
- copied, changed from r1592044,
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
Tue May 6 12:24:52 2014
@@ -40,9 +40,9 @@ public class AmqpMessageFactory
{
return new AmqpTextMessage(delivery, message, amqpConnection);
}
- else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+ else if(isContentType(AmqpSerializedObjectMessage.CONTENT_TYPE,
message))
{
- return new AmqpObjectMessage(delivery, message,
amqpConnection);
+ return new AmqpSerializedObjectMessage(delivery, message,
amqpConnection);
}
else if(isContentType(AmqpBytesMessage.CONTENT_TYPE, message) ||
isContentType(null, message))
{
@@ -59,9 +59,9 @@ public class AmqpMessageFactory
{
return new AmqpBytesMessage(delivery, message, amqpConnection);
}
- else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+ else if(isContentType(AmqpSerializedObjectMessage.CONTENT_TYPE,
message))
{
- return new AmqpObjectMessage(delivery, message,
amqpConnection);
+ return new AmqpSerializedObjectMessage(delivery, message,
amqpConnection);
}
}
else if(body instanceof AmqpValue)
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
Tue May 6 12:24:52 2014
@@ -18,13 +18,14 @@
*/
package org.apache.qpid.jms.engine;
+import java.io.IOException;
+import java.io.Serializable;
+
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;
-public class AmqpObjectMessage extends AmqpMessage
+public abstract class AmqpObjectMessage extends AmqpMessage
{
- public static final String CONTENT_TYPE =
"application/x-java-serialized-object";
-
public AmqpObjectMessage()
{
super();
@@ -35,5 +36,7 @@ public class AmqpObjectMessage extends A
super(message, delivery, amqpConnection);
}
- //TODO: methods to access/set content
+ public abstract void setObject(Serializable serializable) throws
IOException;
+
+ public abstract Serializable getObject() throws IOException,
ClassNotFoundException;
}
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java?rev=1592732&view=auto
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
(added)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
Tue May 6 12:24:52 2014
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpSerializedObjectMessage extends AmqpObjectMessage
+{
+ public static final String CONTENT_TYPE =
"application/x-java-serialized-object";
+
+ public AmqpSerializedObjectMessage()
+ {
+ super();
+ setContentType(CONTENT_TYPE);
+ }
+
+ public AmqpSerializedObjectMessage(Delivery delivery, Message message,
AmqpConnection amqpConnection)
+ {
+ super(delivery, message, amqpConnection);
+ }
+
+ /**
+ * Sets the serialized object as a data section in the underlying message,
or
+ * clears the body section if null.
+ */
+ @Override
+ public void setObject(Serializable serializable) throws IOException
+ {
+ if(serializable == null)
+ {
+ //TODO: verify whether not sending a body is ok,
+ //send a serialized null instead if it isn't
+ getMessage().setBody(null);
+ }
+ else
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(serializable);
+ oos.flush();
+ oos.close();
+
+ byte[] bytes = baos.toByteArray();
+
+ getMessage().setBody(new Data(new Binary(bytes)));
+ }
+
+ //TODO: ensure content type is [still] set?
+ }
+
+ /**
+ * Returns the deserialized object, or null if no object data is present.
+ */
+ @Override
+ public Serializable getObject() throws IllegalStateException, IOException,
ClassNotFoundException
+ {
+ Binary bin = null;
+
+ Section body = getMessage().getBody();
+ if(body == null)
+ {
+ return null;
+ }
+ else if(body instanceof Data)
+ {
+ bin = ((Data) body).getValue();
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected body type: " +
body.getClass().getSimpleName());
+ }
+
+ if(bin == null)
+ {
+ return null;
+ }
+ else
+ {
+ ByteArrayInputStream bais = new
ByteArrayInputStream(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+ ObjectInputStream ois = new ObjectInputStream(bais);
+
+ return (Serializable) ois.readObject();
+ }
+ }
+}
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
Tue May 6 12:24:52 2014
@@ -25,13 +25,15 @@ import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import org.apache.qpid.jms.engine.AmqpObjectMessage;
+import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
+//TODO: support requesting to send an AMQP map/list/value instead of
serialized binary data
public class ObjectMessageImpl extends MessageImpl<AmqpObjectMessage>
implements ObjectMessage
{
//message to be sent
public ObjectMessageImpl(SessionImpl sessionImpl, ConnectionImpl
connectionImpl) throws JMSException
{
- super(new AmqpObjectMessage(), sessionImpl, connectionImpl);
+ super(new AmqpSerializedObjectMessage(), sessionImpl, connectionImpl);
}
//message just received
@@ -43,30 +45,53 @@ public class ObjectMessageImpl extends M
@Override
protected AmqpObjectMessage
prepareUnderlyingAmqpMessageForSending(AmqpObjectMessage amqpMessage)
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ //Currently nothing to do, we [de]serialize the bytes direct to/from
the underlying message.
+ return amqpMessage;
+
+ //TODO: verify we haven't been requested to send an AMQP map/list
instead of serialized binary data,
+ //we might need to convert if we have been asked to (or not to).
}
//======= JMS Methods =======
@Override
- public void setObject(Serializable object) throws JMSException
+ public void setObject(Serializable serializable) throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ checkBodyWritable();
+ try
+ {
+ getUnderlyingAmqpMessage(false).setObject(serializable);
+ }
+ catch (Exception e)
+ {
+ throw new QpidJmsMessageFormatException("Exception while setting
Object", e);
+ }
}
@Override
public Serializable getObject() throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ try
+ {
+ return getUnderlyingAmqpMessage(false).getObject();
+ }
+ catch (Exception e)
+ {
+ throw new QpidJmsMessageFormatException("Exception while getting
Object", e);
+ }
}
@Override
public void clearBody() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ try
+ {
+ getUnderlyingAmqpMessage(false).setObject(null);
+ setBodyWritable(true);
+ }
+ catch (Exception e)
+ {
+ throw new QpidJmsMessageFormatException("Exception while clearing
Object body", e);
+ }
}
}
Copied:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java
(from r1592044,
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java)
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java&r1=1592044&r2=1592732&rev=1592732&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java
Tue May 6 12:24:52 2014
@@ -1,4 +1,5 @@
/*
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,25 +16,28 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.message.Message;
+import javax.jms.MessageFormatException;
-public class AmqpObjectMessage extends AmqpMessage
+public class QpidJmsMessageFormatException extends MessageFormatException
{
- public static final String CONTENT_TYPE =
"application/x-java-serialized-object";
+ private static final long serialVersionUID = -6968274239066827833L;
- public AmqpObjectMessage()
+ public QpidJmsMessageFormatException(String reason)
{
- super();
+ this(reason, null);
}
- public AmqpObjectMessage(Delivery delivery, Message message,
AmqpConnection amqpConnection)
+ public QpidJmsMessageFormatException(String reason, Exception cause)
{
- super(message, delivery, amqpConnection);
+ super(reason);
+ if (cause != null)
+ {
+ setLinkedException(cause);
+ initCause(cause);
+ }
}
-
- //TODO: methods to access/set content
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Tue
May 6 12:24:52 2014
@@ -242,8 +242,7 @@ public class SessionImpl implements Sess
@Override
public ObjectMessage createObjectMessage() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ return new ObjectMessageImpl(this, getConnectionImpl());
}
@Override
Added:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java?rev=1592732&view=auto
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
(added)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
Tue May 6 12:24:52 2014
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import
org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import
org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import
org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedDataMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class ObjectMessageIntegrationTest extends QpidJmsTestCase
+{
+ private final IntegrationTestFixture _testFixture = new
IntegrationTestFixture();
+
+ @Test
+ public void testSendBasicObjectMessageWithSerializedContent() throws
Exception
+ {
+ try(TestAmqpPeer testPeer = new
TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ String content = "myObjectString";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(content);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ MessageHeaderSectionMatcher headersMatcher = new
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new
MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propertiesMatcher = new
MessagePropertiesSectionMatcher(true);
+
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE)));
+ TransferPayloadCompositeMatcher messageMatcher = new
TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propertiesMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new
Binary(bytes)));
+
+ testPeer.expectTransfer(messageMatcher);
+
+ ObjectMessage message = session.createObjectMessage();
+ message.setObject(content);
+
+ producer.send(message);
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test
+ public void testReceiveBasicObjectMessageWithSerializedContent() throws
Exception
+ {
+ try(TestAmqpPeer testPeer = new
TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ PropertiesDescribedType properties = new PropertiesDescribedType();
+
properties.setContentType(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE));
+
+ String expectedContent = "expectedContent";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(expectedContent);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ DescribedType dataContent = new DataDescribedType(new
Binary(bytes));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, properties,
null, dataContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof ObjectMessage);
+ ObjectMessage objectMessage = (ObjectMessage)receivedMessage;
+
+ Object object = objectMessage.getObject();
+ assertNotNull("Expected object but got null", object);
+ assertEquals("Message body object was not as expected",
expectedContent, object);
+ }
+ }
+
+ @Test
+ public void
testRecieveAndThenResendBasicObjectMessageWithSerializedContent() throws
Exception
+ {
+ try(TestAmqpPeer testPeer = new
TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ PropertiesDescribedType properties = new PropertiesDescribedType();
+
properties.setContentType(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE));
+
+ String expectedContent = "expectedContent";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(expectedContent);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ DescribedType dataContent = new DataDescribedType(new
Binary(bytes));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, properties,
null, dataContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof ObjectMessage);
+
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ MessageHeaderSectionMatcher headersMatcher = new
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new
MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propertiesMatcher = new
MessagePropertiesSectionMatcher(true);
+
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE)));
+ TransferPayloadCompositeMatcher messageMatcher = new
TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propertiesMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new
Binary(bytes)));
+
+ testPeer.expectTransfer(messageMatcher);
+
+ producer.send(receivedMessage);
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+}
Modified:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
(original)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
Tue May 6 12:24:52 2014
@@ -99,10 +99,10 @@ public class AmqpMessageFactoryTest exte
{
//TODO: this test only required if we decide that not sending a content
body is legal
Message message = Proton.message();
- message.setContentType(AmqpObjectMessage.CONTENT_TYPE);
+ message.setContentType(AmqpSerializedObjectMessage.CONTENT_TYPE);
AmqpMessage amqpMessage =
_amqpMessageFactory.createAmqpMessage(_mockDelivery, message,
_mockAmqpConnection);
- assertEquals(AmqpObjectMessage.class, amqpMessage.getClass());
+ assertEquals(AmqpSerializedObjectMessage.class, amqpMessage.getClass());
}
/**
@@ -180,10 +180,10 @@ public class AmqpMessageFactoryTest exte
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
- message.setContentType(AmqpObjectMessage.CONTENT_TYPE);
+ message.setContentType(AmqpSerializedObjectMessage.CONTENT_TYPE);
AmqpMessage amqpMessage =
_amqpMessageFactory.createAmqpMessage(_mockDelivery, message,
_mockAmqpConnection);
- assertEquals(AmqpObjectMessage.class, amqpMessage.getClass());
+ assertEquals(AmqpSerializedObjectMessage.class, amqpMessage.getClass());
}
/**
Added:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java?rev=1592732&view=auto
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
(added)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
Tue May 6 12:24:52 2014
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AmqpSerializedObjectMessageTest extends QpidJmsTestCase
+{
+ private AmqpConnection _mockAmqpConnection;
+ private Delivery _mockDelivery;
+
+ @Before
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+ _mockDelivery = Mockito.mock(Delivery.class);
+ }
+
+ @Test
+ public void testGetObjectWithNewMessageToSendReturnsNull() throws Exception
+ {
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage();
+
+ assertNull("Expected null object initially",
amqpSerializedObjectMessage.getObject());
+ }
+
+ @Test
+ public void
testGetObjectUsingReceivedMessageWithNoBodySectionReturnsNull() throws Exception
+ {
+ Message message = Proton.message();
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+ assertNull("Expected null object",
amqpSerializedObjectMessage.getObject());
+ }
+
+ @Test
+ public void
testGetObjectUsingReceivedMessageWithDataSectionContainingNothingReturnsNull()
throws Exception
+ {
+ Message message = Proton.message();
+ message.setBody(new Data(null));
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+ assertNull("Expected null object",
amqpSerializedObjectMessage.getObject());
+ }
+
+ @Test
+ public void testGetObjectUsingReceivedMessageWithNonDataSectionThrowsISE()
throws Exception
+ {
+ Message message = Proton.message();
+ message.setBody(new AmqpValue("doesntMatter"));
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+ try
+ {
+ amqpSerializedObjectMessage.getObject();
+ fail("Expected exception to be thrown");
+ }
+ catch(IllegalStateException ise)
+ {
+ //expected
+ }
+ }
+
+ /**
+ * Test that setting an object on a new message results in the expected
+ * content in the body section of the underlying message.
+ */
+ @Test
+ public void testSetObjectOnNewMessage() throws Exception
+ {
+ String content = "myStringContent";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(content);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage();
+ amqpSerializedObjectMessage.setObject(content);
+
+ Message protonMessage = amqpSerializedObjectMessage.getMessage();
+
+ //retrieve the bytes from the underlying message, check they match
expectation
+ Data body = (Data) protonMessage.getBody();
+ assertTrue("Underlying message data section did not contain the
expected bytes", Arrays.equals(bytes, body.getValue().getArray()));
+ }
+
+ /**
+ * Test that setting a null object on a message results in the underlying
+ * body section being cleared, ensuring getObject returns null.
+ */
+ @Test
+ public void testSetObjectWithNullClearsExistingBodySection() throws
Exception
+ {
+ Message message = Proton.message();
+ message.setBody(new Data(new Binary(new byte[0])));
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+ assertNotNull("Expected existing body section to be found",
message.getBody());
+ amqpSerializedObjectMessage.setObject(null);
+ assertNull("Expected existing body section to be cleared",
message.getBody());
+ assertNull("Expected null object",
amqpSerializedObjectMessage.getObject());
+ }
+
+ /**
+ * Test that setting an object on a new message and later getting the
value, returns an
+ * equal but different object that does not pick up intermediate changes
to the set object.
+ */
+ @Test
+ public void testSetThenGetObjectReturnsSnapshot() throws Exception
+ {
+ Map<String,String> origMap = new HashMap<String,String>();
+ origMap.put("key1", "value1");
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage();
+ amqpSerializedObjectMessage.setObject((Serializable) origMap);
+
+ //verify we get a different-but-equal object back
+ Serializable serialized = amqpSerializedObjectMessage.getObject();
+ assertTrue("Unexpected object type returned", serialized instanceof
Map<?,?>);
+ Map<?,?> returnedObject1 = (Map<?,?>) serialized;
+ assertNotSame("Expected different objects, due to snapshot being
taken", origMap, returnedObject1);
+ assertEquals("Expected equal objects, due to snapshot being taken",
origMap, returnedObject1);
+
+ //mutate the original object
+ origMap.put("key2", "value2");
+
+ //verify we get a different-but-equal object back when compared to the
previously retrieved object
+ Serializable serialized2 = amqpSerializedObjectMessage.getObject();
+ assertTrue("Unexpected object type returned", serialized2 instanceof
Map<?,?>);
+ Map<?,?> returnedObject2 = (Map<?,?>) serialized2;
+ assertNotSame("Expected different objects, due to snapshot being
taken", origMap, returnedObject2);
+ assertEquals("Expected equal objects, due to snapshot being taken",
returnedObject1, returnedObject2);
+
+ //verify the mutated map is a different and not equal object
+ assertNotSame("Expected different objects, due to snapshot being
taken", returnedObject1, returnedObject2);
+ assertNotEquals("Expected objects to differ, due to snapshot being
taken", origMap, returnedObject2);
+ }
+}
Added:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java?rev=1592732&view=auto
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
(added)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
Tue May 6 12:24:52 2014
@@ -0,0 +1,260 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.ObjectMessage;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpConnection;
+import org.apache.qpid.jms.engine.AmqpObjectMessage;
+import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ObjectMessageImplTest extends QpidJmsTestCase
+{
+ private Delivery _mockDelivery;
+ private ConnectionImpl _mockConnectionImpl;
+ private SessionImpl _mockSessionImpl;
+ private AmqpConnection _mockAmqpConnection;
+
+ @Before
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+ _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
+ _mockSessionImpl = Mockito.mock(SessionImpl.class);
+ Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new
DestinationHelper());
+ }
+
+ /**
+ * Test that attempting to write bytes to a received message (without
calling {@link ObjectMessage#clearBody()} first)
+ * causes a {@link MessageNotWriteableException} to be thrown due to being
read-only.
+ */
+ @Test
+ public void
testReceivedObjectMessageThrowsMessageNotWriteableExceptionOnSetObject() throws
Exception
+ {
+ String content = "myStringContent";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(content);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ Message message = Proton.message();
+ message.setBody(new Data(new Binary(bytes)));
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
+
+ try
+ {
+ objectMessageImpl.setObject("newObject");
+ fail("Expected exception to be thrown");
+ }
+ catch(MessageNotWriteableException mnwe)
+ {
+ //expected
+ }
+ }
+
+ /**
+ * Test that calling {@link ObjectMessage#clearBody()} causes a received
+ * message to become writable
+ */
+ @Test
+ public void testClearBodyOnReceivedObjectMessageMakesMessageWritable()
throws Exception
+ {
+ String content = "myStringContent";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(content);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ Message message = Proton.message();
+ message.setBody(new Data(new Binary(bytes)));
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
+
+ assertFalse("Message should not be writable",
objectMessageImpl.isBodyWritable());
+
+ objectMessageImpl.clearBody();
+
+ assertTrue("Message should be writable",
objectMessageImpl.isBodyWritable());
+ }
+
+ /**
+ * Test that calling {@link ObjectMessage#clearBody()} of a received
message
+ * causes the body of the underlying {@link AmqpObjectMessage} to be
emptied.
+ */
+ @Test
+ public void
testClearBodyOnReceivedObjectMessageClearsUnderlyingMessageBody() throws
Exception
+ {
+ String content = "myStringContent";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(content);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ Message message = Proton.message();
+ message.setBody(new Data(new Binary(bytes)));
+
+ AmqpSerializedObjectMessage amqpSerializedObjectMessage = new
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+ ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
+
+ assertNotNull("Expected body section but none was present",
message.getBody());
+
+ objectMessageImpl.clearBody();
+
+ //check that the returned object is now null
+ assertNull("Unexpected object value", objectMessageImpl.getObject());
+
+ //verify the underlying message has no body section
+ //TODO: this test assumes we can omit the body section. If we decide
otherwise it
+ //should instead check for e.g. a data section containing a serialized
null object
+ assertNull("Expected no body section", message.getBody());
+ }
+
+ /**
+ * Test that setting an object on a new message and later getting the
value, returns an
+ * equal but different object that does not pick up intermediate changes
to the set object.
+ */
+ @Test
+ public void testSetThenGetObjectReturnsSnapshot() throws Exception
+ {
+ Map<String,String> origMap = new HashMap<String,String>();
+ origMap.put("key1", "value1");
+
+ ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+ objectMessageImpl.setObject((Serializable) origMap);
+
+ //verify we get a different-but-equal object back
+ Serializable serialized = objectMessageImpl.getObject();
+ assertTrue("Unexpected object type returned", serialized instanceof
Map<?,?>);
+ Map<?,?> returnedObject1 = (Map<?,?>) serialized;
+ assertNotSame("Expected different objects, due to snapshot being
taken", origMap, returnedObject1);
+ assertEquals("Expected equal objects, due to snapshot being taken",
origMap, returnedObject1);
+
+ //mutate the original object
+ origMap.put("key2", "value2");
+
+ //verify we get a different-but-equal object back when compared to the
previously retrieved object
+ Serializable serialized2 = objectMessageImpl.getObject();
+ assertTrue("Unexpected object type returned", serialized2 instanceof
Map<?,?>);
+ Map<?,?> returnedObject2 = (Map<?,?>) serialized2;
+ assertNotSame("Expected different objects, due to snapshot being
taken", origMap, returnedObject2);
+ assertEquals("Expected equal objects, due to snapshot being taken",
returnedObject1, returnedObject2);
+
+ //verify the mutated map is a different and not equal object
+ assertNotSame("Expected different objects, due to snapshot being
taken", returnedObject1, returnedObject2);
+ assertNotEquals("Expected objects to differ, due to snapshot being
taken", origMap, returnedObject2);
+ }
+
+ /**
+ * Test that setting an object on a new message which contains
non-serializable content results
+ * in an {@link MessageFormatException} being thrown due to failure to
encode the object.
+ */
+ @Test
+ public void testSetObjectWithNonSerializableThrowsJMSMFE() throws Exception
+ {
+ Map<String,Object> origMap = new HashMap<String,Object>();
+ origMap.put("key1", "value1");
+ origMap.put("notSerializable", new NotSerializable());
+
+ ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+ try
+ {
+ objectMessageImpl.setObject((Serializable) origMap);
+ fail("Expected exception to be thrown");
+ }
+ catch(MessageFormatException mfe)
+ {
+ //expected
+ }
+ }
+
+ //Test class
+ private static class NotSerializable
+ {
+ public NotSerializable()
+ {
+ }
+ }
+
+ /**
+ * Test that failure during deserialization of an object in a message
results
+ * in an {@link MessageFormatException} being throw.
+ */
+ @Test
+ public void testGetObjectWithFailedDeserialisationThrowsJMSMFE() throws
Exception
+ {
+ Map<String,Object> origMap = new HashMap<String,Object>();
+ origMap.put("key1", "value1");
+ origMap.put("notSerializable", new NotSerializable());
+
+ AmqpObjectMessage amqpSerializedObjectMessage =
Mockito.mock(AmqpSerializedObjectMessage.class);
+ Mockito.when(amqpSerializedObjectMessage.getObject()).thenThrow(new
ClassNotFoundException());
+
+ ObjectMessageImpl objectMessageImpl = new
ObjectMessageImpl(amqpSerializedObjectMessage,
_mockSessionImpl,_mockConnectionImpl, null);
+
+ try
+ {
+ objectMessageImpl.getObject();
+ fail("Expected exception to be thrown");
+ }
+ catch(MessageFormatException mfe)
+ {
+ //expected
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]