Repository: qpid-jms Updated Branches: refs/heads/master 07d1637d9 -> 4db955b21
QPIDJMS-62: update handling of ObjectMessage to ensure a body section is sent when the object is null Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4db955b2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4db955b2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4db955b2 Branch: refs/heads/master Commit: 4db955b2117328ff64e3534bc356d7b5a291d74f Parents: 07d1637 Author: Robert Gemmell <[email protected]> Authored: Mon Jun 1 12:42:07 2015 +0100 Committer: Robert Gemmell <[email protected]> Committed: Mon Jun 1 12:42:07 2015 +0100 ---------------------------------------------------------------------- .../message/AmqpSerializedObjectDelegate.java | 51 +++++++++++++------- .../amqp/message/AmqpTypedObjectDelegate.java | 9 ++-- .../ObjectMessageIntegrationTest.java | 22 +++++++-- .../message/AmqpJmsObjectMessageFacadeTest.java | 38 ++++++++++++--- 4 files changed, 91 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4db955b2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java index 647286e..1b3d3e9 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.jms.provider.amqp.message; +import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -34,7 +36,18 @@ import org.apache.qpid.proton.message.Message; */ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { - public static final String CONTENT_TYPE = "application/x-java-serialized-object"; + static final Data NULL_OBJECT_BODY; + static + { + byte[] bytes; + try { + bytes = getSerializedBytes(null); + } catch (IOException e) { + throw new RuntimeException("Failed to initialise null object body", e); + } + + NULL_OBJECT_BODY = new Data(new Binary(bytes)); + } private final Message message; @@ -46,7 +59,18 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { */ public AmqpSerializedObjectDelegate(Message message) { this.message = message; - this.message.setContentType(CONTENT_TYPE); + this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + } + + private static byte[] getSerializedBytes(Serializable value) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(value); + oos.flush(); + oos.close(); + + return baos.toByteArray(); + } } @Override @@ -54,7 +78,7 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { Binary bin = null; Section body = message.getBody(); - if (body == null) { + if (body == null || body == NULL_OBJECT_BODY) { return null; } else if (body instanceof Data) { bin = ((Data) body).getValue(); @@ -80,26 +104,19 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate { @Override public void setObject(Serializable value) throws IOException { if(value == null) { - // TODO: verify whether not sending a body is ok, - // send a serialized null instead if it isn't - message.setBody(null); + message.setBody(NULL_OBJECT_BODY); } else { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos)) { - - oos.writeObject(value); - oos.flush(); - oos.close(); - - byte[] bytes = baos.toByteArray(); - message.setBody(new Data(new Binary(bytes))); - } + byte[] bytes = getSerializedBytes(value); + message.setBody(new Data(new Binary(bytes))); } } @Override public void onSend() { - this.message.setContentType(CONTENT_TYPE); + this.message.setContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + if(message.getBody() == null) { + message.setBody(NULL_OBJECT_BODY); + } } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4db955b2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java index 860e5d3..5c9bc68 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java @@ -33,6 +33,8 @@ import org.apache.qpid.proton.message.Message; */ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate { + static final AmqpValue NULL_OBJECT_BODY = new AmqpValue(null); + private final Message message; /** @@ -75,9 +77,7 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate { @Override public void setObject(Serializable value) throws IOException { if (value == null) { - // TODO: verify whether not sending a body is OK, send some form of - // null (AmqpValue containing null) instead if it isn't? - message.setBody(null); + message.setBody(NULL_OBJECT_BODY); } else if (isSupportedAmqpValueObjectType(value)) { // TODO: This is a temporary hack, we actually need to take a snapshot of the object // at this point in time, not simply set the object itself into the Proton message. @@ -98,6 +98,9 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate { @Override public void onSend() { message.setContentType(null); + if (message.getBody() == null) { + message.setBody(NULL_OBJECT_BODY); + } } private boolean isSupportedAmqpValueObjectType(Serializable serializable) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4db955b2/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java index 8a97af7..cc3b29f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java @@ -25,10 +25,12 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.ObjectOutputStream; import java.util.HashMap; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -62,6 +64,20 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase @Test(timeout = 5000) public void testSendBasicObjectMessageWithSerializedContent() throws Exception { + doSendBasicObjectMessageWithSerializedContentTestImpl("myObjectString", false); + } + + @Test(timeout = 5000) + public void testSendBasicObjectMessageWithSerializedContentExplicitNull() throws Exception { + doSendBasicObjectMessageWithSerializedContentTestImpl(null, true); + } + + @Test(timeout = 5000) + public void testSendBasicObjectMessageWithSerializedContentImplicitNull() throws Exception { + doSendBasicObjectMessageWithSerializedContentTestImpl(null, false); + } + + private void doSendBasicObjectMessageWithSerializedContentTestImpl(String content, boolean setObjectIfNull) throws JMSException, IOException, InterruptedException, Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); testPeer.expectBegin(true); @@ -71,8 +87,6 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase Queue queue = session.createQueue("myQueue"); MessageProducer producer = session.createProducer(queue); - String content = "myObjectString"; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(content); @@ -94,7 +108,9 @@ public class ObjectMessageIntegrationTest extends QpidJmsTestCase testPeer.expectTransfer(messageMatcher); ObjectMessage message = session.createObjectMessage(); - message.setObject(content); + if (content != null || setObjectIfNull) { + message.setObject(content); + } producer.send(message); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4db955b2/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java index 5a2aa2b..5b456c3 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacadeTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; @@ -82,6 +83,31 @@ public class AmqpJmsObjectMessageFacadeTest extends AmqpJmsMessageTypesTestCase assertNull(amqpObjectMessageFacade.getObject()); } + // ---------- Test state of messages prepared to send -----------------// + + @Test + public void testNewMessageToSendHasBodySectionRepresentingNull() throws Exception { + doNewMessageToSendHasBodySectionRepresentingNull(false); + } + + @Test + public void testNewAmqpTypedMessageToSendHasBodySectionRepresentingNull() throws Exception { + doNewMessageToSendHasBodySectionRepresentingNull(true); + } + + private void doNewMessageToSendHasBodySectionRepresentingNull(boolean amqpTyped) throws Exception { + AmqpJmsObjectMessageFacade amqpObjectMessageFacade = createNewObjectMessageFacade(amqpTyped); + amqpObjectMessageFacade.onSend(false, false, 0); + + Message protonMessage = amqpObjectMessageFacade.getAmqpMessage(); + assertNotNull("Message body should be presents", protonMessage.getBody()); + if(amqpTyped) { + assertSame("Expected existing body section to be replaced", AmqpTypedObjectDelegate.NULL_OBJECT_BODY, protonMessage.getBody()); + } else { + assertSame("Expected existing body section to be replaced", AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, protonMessage.getBody()); + } + } + // ---------- test for normal message operations -------------------------// /** @@ -132,8 +158,8 @@ public class AmqpJmsObjectMessageFacadeTest extends AmqpJmsMessageTypesTestCase } /** - * Test that setting a null object on a message results in the underlying - * body section being cleared, ensuring getObject returns null. + * Test that setting a null object on a message results in the underlying body + * section being set with the null object body, ensuring getObject returns null. */ @Test public void testSetObjectWithNullClearsExistingBodySection() throws Exception { @@ -145,13 +171,13 @@ public class AmqpJmsObjectMessageFacadeTest extends AmqpJmsMessageTypesTestCase assertNotNull("Expected existing body section to be found", protonMessage.getBody()); amqpObjectMessageFacade.setObject(null); - assertNull("Expected existing body section to be cleared", protonMessage.getBody()); + assertSame("Expected existing body section to be replaced", AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, protonMessage.getBody()); assertNull("Expected null object", amqpObjectMessageFacade.getObject()); } /** - * Test that setting a null object on a message results in the underlying - * body section being cleared, ensuring getObject returns null. + * Test that clearing the body on a message results in the underlying body + * section being set with the null object body, ensuring getObject returns null. */ @Test public void testClearBodyWithExistingSerializedBodySection() throws Exception { @@ -163,7 +189,7 @@ public class AmqpJmsObjectMessageFacadeTest extends AmqpJmsMessageTypesTestCase assertNotNull("Expected existing body section to be found", protonMessage.getBody()); amqpObjectMessageFacade.clearBody(); - assertNull("Expected existing body section to be cleared", protonMessage.getBody()); + assertSame("Expected existing body section to be replaced", AmqpSerializedObjectDelegate.NULL_OBJECT_BODY, protonMessage.getBody()); assertNull("Expected null object", amqpObjectMessageFacade.getObject()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
