Author: robbie
Date: Tue May  6 12:24:52 2014
New Revision: 1592732

URL: http://svn.apache.org/r1592732
Log:
QPIDJMS-18: initial work on implementing ObjectMessage

Added:
    
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
    
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java
      - copied, changed from r1592044, 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
    
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
    
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
    
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
Modified:
    
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
    
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java 
Tue May  6 12:24:52 2014
@@ -40,9 +40,9 @@ public class AmqpMessageFactory
             {
                 return new AmqpTextMessage(delivery, message, amqpConnection);
             }
-            else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+            else if(isContentType(AmqpSerializedObjectMessage.CONTENT_TYPE, 
message))
             {
-                return new AmqpObjectMessage(delivery, message, 
amqpConnection);
+                return new AmqpSerializedObjectMessage(delivery, message, 
amqpConnection);
             }
             else if(isContentType(AmqpBytesMessage.CONTENT_TYPE, message) || 
isContentType(null, message))
             {
@@ -59,9 +59,9 @@ public class AmqpMessageFactory
             {
                 return new AmqpBytesMessage(delivery, message, amqpConnection);
             }
-            else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+            else if(isContentType(AmqpSerializedObjectMessage.CONTENT_TYPE, 
message))
             {
-                return new AmqpObjectMessage(delivery, message, 
amqpConnection);
+                return new AmqpSerializedObjectMessage(delivery, message, 
amqpConnection);
             }
         }
         else if(body instanceof AmqpValue)

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java 
Tue May  6 12:24:52 2014
@@ -18,13 +18,14 @@
  */
 package org.apache.qpid.jms.engine;
 
+import java.io.IOException;
+import java.io.Serializable;
+
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.message.Message;
 
-public class AmqpObjectMessage extends AmqpMessage
+public abstract class AmqpObjectMessage extends AmqpMessage
 {
-    public static final String CONTENT_TYPE = 
"application/x-java-serialized-object";
-
     public AmqpObjectMessage()
     {
         super();
@@ -35,5 +36,7 @@ public class AmqpObjectMessage extends A
         super(message, delivery, amqpConnection);
     }
 
-    //TODO: methods to access/set content
+    public abstract void setObject(Serializable serializable) throws 
IOException;
+
+    public abstract Serializable getObject() throws IOException, 
ClassNotFoundException;
 }

Added: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java?rev=1592732&view=auto
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
 (added)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessage.java
 Tue May  6 12:24:52 2014
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpSerializedObjectMessage extends AmqpObjectMessage
+{
+    public static final String CONTENT_TYPE = 
"application/x-java-serialized-object";
+
+    public AmqpSerializedObjectMessage()
+    {
+        super();
+        setContentType(CONTENT_TYPE);
+    }
+
+    public AmqpSerializedObjectMessage(Delivery delivery, Message message, 
AmqpConnection amqpConnection)
+    {
+        super(delivery, message, amqpConnection);
+    }
+
+    /**
+     * Sets the serialized object as a data section in the underlying message, 
or
+     * clears the body section if null.
+     */
+    @Override
+    public void setObject(Serializable serializable) throws IOException
+    {
+        if(serializable == null)
+        {
+            //TODO: verify whether not sending a body is ok,
+            //send a serialized null instead if it isn't
+            getMessage().setBody(null);
+        }
+        else
+        {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(serializable);
+            oos.flush();
+            oos.close();
+
+            byte[] bytes = baos.toByteArray();
+
+            getMessage().setBody(new Data(new Binary(bytes)));
+        }
+
+        //TODO: ensure content type is [still] set?
+    }
+
+    /**
+     * Returns the deserialized object, or null if no object data is present.
+     */
+    @Override
+    public Serializable getObject() throws IllegalStateException, IOException, 
ClassNotFoundException
+    {
+        Binary bin = null;
+
+        Section body = getMessage().getBody();
+        if(body == null)
+        {
+            return null;
+        }
+        else if(body instanceof Data)
+        {
+            bin = ((Data) body).getValue();
+        }
+        else
+        {
+            throw new IllegalStateException("Unexpected body type: " + 
body.getClass().getSimpleName());
+        }
+
+        if(bin == null)
+        {
+            return null;
+        }
+        else
+        {
+            ByteArrayInputStream bais = new 
ByteArrayInputStream(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+            ObjectInputStream ois = new ObjectInputStream(bais);
+
+            return (Serializable) ois.readObject();
+        }
+    }
+}

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java 
Tue May  6 12:24:52 2014
@@ -25,13 +25,15 @@ import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
 
 import org.apache.qpid.jms.engine.AmqpObjectMessage;
+import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
 
+//TODO: support requesting to send an AMQP map/list/value instead of 
serialized binary data
 public class ObjectMessageImpl extends MessageImpl<AmqpObjectMessage> 
implements ObjectMessage
 {
     //message to be sent
     public ObjectMessageImpl(SessionImpl sessionImpl, ConnectionImpl 
connectionImpl) throws JMSException
     {
-        super(new AmqpObjectMessage(), sessionImpl, connectionImpl);
+        super(new AmqpSerializedObjectMessage(), sessionImpl, connectionImpl);
     }
 
     //message just received
@@ -43,30 +45,53 @@ public class ObjectMessageImpl extends M
     @Override
     protected AmqpObjectMessage 
prepareUnderlyingAmqpMessageForSending(AmqpObjectMessage amqpMessage)
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        //Currently nothing to do, we [de]serialize the bytes direct to/from 
the underlying message.
+        return amqpMessage;
+
+        //TODO: verify we haven't been requested to send an AMQP map/list 
instead of serialized binary data,
+        //we might need to convert if we have been asked to (or not to).
     }
 
     //======= JMS Methods =======
 
     @Override
-    public void setObject(Serializable object) throws JMSException
+    public void setObject(Serializable serializable) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        checkBodyWritable();
+        try
+        {
+            getUnderlyingAmqpMessage(false).setObject(serializable);
+        }
+        catch (Exception e)
+        {
+            throw new QpidJmsMessageFormatException("Exception while setting 
Object", e);
+        }
     }
 
     @Override
     public Serializable getObject() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        try
+        {
+            return getUnderlyingAmqpMessage(false).getObject();
+        }
+        catch (Exception e)
+        {
+            throw new QpidJmsMessageFormatException("Exception while getting 
Object", e);
+        }
     }
 
     @Override
     public void clearBody() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        try
+        {
+            getUnderlyingAmqpMessage(false).setObject(null);
+            setBodyWritable(true);
+        }
+        catch (Exception e)
+        {
+            throw new QpidJmsMessageFormatException("Exception while clearing 
Object body", e);
+        }
     }
 }

Copied: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java
 (from r1592044, 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java)
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java&r1=1592044&r2=1592732&rev=1592732&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/QpidJmsMessageFormatException.java
 Tue May  6 12:24:52 2014
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,25 +16,28 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
-package org.apache.qpid.jms.engine;
+package org.apache.qpid.jms.impl;
 
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.message.Message;
+import javax.jms.MessageFormatException;
 
-public class AmqpObjectMessage extends AmqpMessage
+public class QpidJmsMessageFormatException extends MessageFormatException
 {
-    public static final String CONTENT_TYPE = 
"application/x-java-serialized-object";
+    private static final long serialVersionUID = -6968274239066827833L;
 
-    public AmqpObjectMessage()
+    public QpidJmsMessageFormatException(String reason)
     {
-        super();
+        this(reason, null);
     }
 
-    public AmqpObjectMessage(Delivery delivery, Message message, 
AmqpConnection amqpConnection)
+    public QpidJmsMessageFormatException(String reason, Exception cause)
     {
-        super(message, delivery, amqpConnection);
+        super(reason);
+        if (cause != null)
+        {
+            setLinkedException(cause);
+            initCause(cause);
+        }
     }
-
-    //TODO: methods to access/set content
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Tue 
May  6 12:24:52 2014
@@ -242,8 +242,7 @@ public class SessionImpl implements Sess
     @Override
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        return new ObjectMessageImpl(this, getConnectionImpl());
     }
 
     @Override

Added: 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java?rev=1592732&view=auto
==============================================================================
--- 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
 (added)
+++ 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/ObjectMessageIntegrationTest.java
 Tue May  6 12:24:52 2014
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedDataMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class ObjectMessageIntegrationTest extends QpidJmsTestCase
+{
+    private final IntegrationTestFixture _testFixture = new 
IntegrationTestFixture();
+
+    @Test
+    public void testSendBasicObjectMessageWithSerializedContent() throws 
Exception
+    {
+        try(TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            String content = "myObjectString";
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(content);
+            oos.flush();
+            oos.close();
+            byte[] bytes = baos.toByteArray();
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propertiesMatcher = new 
MessagePropertiesSectionMatcher(true);
+            
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE)));
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propertiesMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new 
Binary(bytes)));
+
+            testPeer.expectTransfer(messageMatcher);
+
+            ObjectMessage message = session.createObjectMessage();
+            message.setObject(content);
+
+            producer.send(message);
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test
+    public void testReceiveBasicObjectMessageWithSerializedContent() throws 
Exception
+    {
+        try(TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            PropertiesDescribedType properties = new PropertiesDescribedType();
+            
properties.setContentType(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE));
+
+            String expectedContent = "expectedContent";
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(expectedContent);
+            oos.flush();
+            oos.close();
+            byte[] bytes = baos.toByteArray();
+
+            DescribedType dataContent = new DataDescribedType(new 
Binary(bytes));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, 
null, dataContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof ObjectMessage);
+            ObjectMessage objectMessage = (ObjectMessage)receivedMessage;
+
+            Object object = objectMessage.getObject();
+            assertNotNull("Expected object but got null", object);
+            assertEquals("Message body object was not as expected", 
expectedContent, object);
+        }
+    }
+
+    @Test
+    public void 
testRecieveAndThenResendBasicObjectMessageWithSerializedContent() throws 
Exception
+    {
+        try(TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            PropertiesDescribedType properties = new PropertiesDescribedType();
+            
properties.setContentType(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE));
+
+            String expectedContent = "expectedContent";
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(expectedContent);
+            oos.flush();
+            oos.close();
+            byte[] bytes = baos.toByteArray();
+
+            DescribedType dataContent = new DataDescribedType(new 
Binary(bytes));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, 
null, dataContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof ObjectMessage);
+
+            testPeer.expectSenderAttach();
+            MessageProducer producer = session.createProducer(queue);
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propertiesMatcher = new 
MessagePropertiesSectionMatcher(true);
+            
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpSerializedObjectMessage.CONTENT_TYPE)));
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propertiesMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new 
Binary(bytes)));
+
+            testPeer.expectTransfer(messageMatcher);
+
+            producer.send(receivedMessage);
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+}

Modified: 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java?rev=1592732&r1=1592731&r2=1592732&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
 (original)
+++ 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
 Tue May  6 12:24:52 2014
@@ -99,10 +99,10 @@ public class AmqpMessageFactoryTest exte
    {
        //TODO: this test only required if we decide that not sending a content 
body is legal
        Message message = Proton.message();
-       message.setContentType(AmqpObjectMessage.CONTENT_TYPE);
+       message.setContentType(AmqpSerializedObjectMessage.CONTENT_TYPE);
 
        AmqpMessage amqpMessage = 
_amqpMessageFactory.createAmqpMessage(_mockDelivery, message, 
_mockAmqpConnection);
-       assertEquals(AmqpObjectMessage.class, amqpMessage.getClass());
+       assertEquals(AmqpSerializedObjectMessage.class, amqpMessage.getClass());
    }
 
    /**
@@ -180,10 +180,10 @@ public class AmqpMessageFactoryTest exte
        Message message = Proton.message();
        Binary binary = new Binary(new byte[0]);
        message.setBody(new Data(binary));
-       message.setContentType(AmqpObjectMessage.CONTENT_TYPE);
+       message.setContentType(AmqpSerializedObjectMessage.CONTENT_TYPE);
 
        AmqpMessage amqpMessage = 
_amqpMessageFactory.createAmqpMessage(_mockDelivery, message, 
_mockAmqpConnection);
-       assertEquals(AmqpObjectMessage.class, amqpMessage.getClass());
+       assertEquals(AmqpSerializedObjectMessage.class, amqpMessage.getClass());
    }
 
    /**

Added: 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java?rev=1592732&view=auto
==============================================================================
--- 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
 (added)
+++ 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpSerializedObjectMessageTest.java
 Tue May  6 12:24:52 2014
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AmqpSerializedObjectMessageTest extends QpidJmsTestCase
+{
+    private AmqpConnection _mockAmqpConnection;
+    private Delivery _mockDelivery;
+
+    @Before
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+        _mockDelivery = Mockito.mock(Delivery.class);
+    }
+
+    @Test
+    public void testGetObjectWithNewMessageToSendReturnsNull() throws Exception
+    {
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage();
+
+        assertNull("Expected null object initially", 
amqpSerializedObjectMessage.getObject());
+    }
+
+    @Test
+    public void 
testGetObjectUsingReceivedMessageWithNoBodySectionReturnsNull() throws Exception
+    {
+        Message message = Proton.message();
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        assertNull("Expected null object", 
amqpSerializedObjectMessage.getObject());
+    }
+
+    @Test
+    public void 
testGetObjectUsingReceivedMessageWithDataSectionContainingNothingReturnsNull() 
throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new Data(null));
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        assertNull("Expected null object", 
amqpSerializedObjectMessage.getObject());
+    }
+
+    @Test
+    public void testGetObjectUsingReceivedMessageWithNonDataSectionThrowsISE() 
throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new AmqpValue("doesntMatter"));
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        try
+        {
+            amqpSerializedObjectMessage.getObject();
+            fail("Expected exception to be thrown");
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected
+        }
+    }
+
+    /**
+     * Test that setting an object on a new message results in the expected
+     * content in the body section of the underlying message.
+     */
+    @Test
+    public void testSetObjectOnNewMessage() throws Exception
+    {
+        String content = "myStringContent";
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(content);
+        oos.flush();
+        oos.close();
+        byte[] bytes = baos.toByteArray();
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage();
+        amqpSerializedObjectMessage.setObject(content);
+
+        Message protonMessage = amqpSerializedObjectMessage.getMessage();
+
+        //retrieve the bytes from the underlying message, check they match 
expectation
+        Data body = (Data) protonMessage.getBody();
+        assertTrue("Underlying message data section did not contain the 
expected bytes", Arrays.equals(bytes, body.getValue().getArray()));
+    }
+
+    /**
+     * Test that setting a null object on a message results in the underlying
+     * body section being cleared, ensuring getObject returns null.
+     */
+    @Test
+    public void testSetObjectWithNullClearsExistingBodySection() throws 
Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new Data(new Binary(new byte[0])));
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+
+        assertNotNull("Expected existing body section to be found", 
message.getBody());
+        amqpSerializedObjectMessage.setObject(null);
+        assertNull("Expected existing body section to be cleared", 
message.getBody());
+        assertNull("Expected null object", 
amqpSerializedObjectMessage.getObject());
+    }
+
+    /**
+     * Test that setting an object on a new message and later getting the 
value, returns an
+     * equal but different object that does not pick up intermediate changes 
to the set object.
+     */
+    @Test
+    public void testSetThenGetObjectReturnsSnapshot() throws Exception
+    {
+        Map<String,String> origMap = new HashMap<String,String>();
+        origMap.put("key1", "value1");
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage();
+        amqpSerializedObjectMessage.setObject((Serializable) origMap);
+
+        //verify we get a different-but-equal object back
+        Serializable serialized = amqpSerializedObjectMessage.getObject();
+        assertTrue("Unexpected object type returned", serialized instanceof 
Map<?,?>);
+        Map<?,?> returnedObject1 = (Map<?,?>) serialized;
+        assertNotSame("Expected different objects, due to snapshot being 
taken", origMap, returnedObject1);
+        assertEquals("Expected equal objects, due to snapshot being taken", 
origMap, returnedObject1);
+
+        //mutate the original object
+        origMap.put("key2", "value2");
+
+        //verify we get a different-but-equal object back when compared to the 
previously retrieved object
+        Serializable serialized2 = amqpSerializedObjectMessage.getObject();
+        assertTrue("Unexpected object type returned", serialized2 instanceof 
Map<?,?>);
+        Map<?,?> returnedObject2 = (Map<?,?>) serialized2;
+        assertNotSame("Expected different objects, due to snapshot being 
taken", origMap, returnedObject2);
+        assertEquals("Expected equal objects, due to snapshot being taken", 
returnedObject1, returnedObject2);
+
+        //verify the mutated map is a different and not equal object
+        assertNotSame("Expected different objects, due to snapshot being 
taken", returnedObject1, returnedObject2);
+        assertNotEquals("Expected objects to differ, due to snapshot being 
taken", origMap, returnedObject2);
+    }
+}

Added: 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java?rev=1592732&view=auto
==============================================================================
--- 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
 (added)
+++ 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ObjectMessageImplTest.java
 Tue May  6 12:24:52 2014
@@ -0,0 +1,260 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.ObjectMessage;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpConnection;
+import org.apache.qpid.jms.engine.AmqpObjectMessage;
+import org.apache.qpid.jms.engine.AmqpSerializedObjectMessage;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ObjectMessageImplTest extends QpidJmsTestCase
+{
+    private Delivery _mockDelivery;
+    private ConnectionImpl _mockConnectionImpl;
+    private SessionImpl _mockSessionImpl;
+    private AmqpConnection _mockAmqpConnection;
+
+    @Before
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+        _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
+        _mockSessionImpl = Mockito.mock(SessionImpl.class);
+        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new 
DestinationHelper());
+    }
+
+    /**
+     * Test that attempting to write bytes to a received message (without 
calling {@link ObjectMessage#clearBody()} first)
+     * causes a {@link MessageNotWriteableException} to be thrown due to being 
read-only.
+     */
+    @Test
+    public void 
testReceivedObjectMessageThrowsMessageNotWriteableExceptionOnSetObject() throws 
Exception
+    {
+        String content = "myStringContent";
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(content);
+        oos.flush();
+        oos.close();
+        byte[] bytes = baos.toByteArray();
+
+        Message message = Proton.message();
+        message.setBody(new Data(new Binary(bytes)));
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+        ObjectMessageImpl objectMessageImpl = new 
ObjectMessageImpl(amqpSerializedObjectMessage, 
_mockSessionImpl,_mockConnectionImpl, null);
+
+        try
+        {
+            objectMessageImpl.setObject("newObject");
+            fail("Expected exception to be thrown");
+        }
+        catch(MessageNotWriteableException mnwe)
+        {
+            //expected
+        }
+    }
+
+    /**
+     * Test that calling {@link ObjectMessage#clearBody()} causes a received
+     * message to become writable
+     */
+    @Test
+    public void testClearBodyOnReceivedObjectMessageMakesMessageWritable() 
throws Exception
+    {
+        String content = "myStringContent";
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(content);
+        oos.flush();
+        oos.close();
+        byte[] bytes = baos.toByteArray();
+
+        Message message = Proton.message();
+        message.setBody(new Data(new Binary(bytes)));
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+        ObjectMessageImpl objectMessageImpl = new 
ObjectMessageImpl(amqpSerializedObjectMessage, 
_mockSessionImpl,_mockConnectionImpl, null);
+
+        assertFalse("Message should not be writable", 
objectMessageImpl.isBodyWritable());
+
+        objectMessageImpl.clearBody();
+
+        assertTrue("Message should be writable", 
objectMessageImpl.isBodyWritable());
+    }
+
+    /**
+     * Test that calling {@link ObjectMessage#clearBody()} of a received 
message
+     * causes the body of the underlying {@link AmqpObjectMessage} to be 
emptied.
+     */
+    @Test
+    public void 
testClearBodyOnReceivedObjectMessageClearsUnderlyingMessageBody() throws 
Exception
+    {
+        String content = "myStringContent";
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(content);
+        oos.flush();
+        oos.close();
+        byte[] bytes = baos.toByteArray();
+
+        Message message = Proton.message();
+        message.setBody(new Data(new Binary(bytes)));
+
+        AmqpSerializedObjectMessage amqpSerializedObjectMessage = new 
AmqpSerializedObjectMessage(_mockDelivery, message, _mockAmqpConnection);
+        ObjectMessageImpl objectMessageImpl = new 
ObjectMessageImpl(amqpSerializedObjectMessage, 
_mockSessionImpl,_mockConnectionImpl, null);
+
+        assertNotNull("Expected body section but none was present", 
message.getBody());
+
+        objectMessageImpl.clearBody();
+
+        //check that the returned object is now null
+        assertNull("Unexpected object value", objectMessageImpl.getObject());
+
+        //verify the underlying message has no body section
+        //TODO: this test assumes we can omit the body section. If we decide 
otherwise it
+        //should instead check for e.g. a data section containing a serialized 
null object
+        assertNull("Expected no body section", message.getBody());
+    }
+
+    /**
+     * Test that setting an object on a new message and later getting the 
value, returns an
+     * equal but different object that does not pick up intermediate changes 
to the set object.
+     */
+    @Test
+    public void testSetThenGetObjectReturnsSnapshot() throws Exception
+    {
+        Map<String,String> origMap = new HashMap<String,String>();
+        origMap.put("key1", "value1");
+
+        ObjectMessageImpl objectMessageImpl = new 
ObjectMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+        objectMessageImpl.setObject((Serializable) origMap);
+
+        //verify we get a different-but-equal object back
+        Serializable serialized = objectMessageImpl.getObject();
+        assertTrue("Unexpected object type returned", serialized instanceof 
Map<?,?>);
+        Map<?,?> returnedObject1 = (Map<?,?>) serialized;
+        assertNotSame("Expected different objects, due to snapshot being 
taken", origMap, returnedObject1);
+        assertEquals("Expected equal objects, due to snapshot being taken", 
origMap, returnedObject1);
+
+        //mutate the original object
+        origMap.put("key2", "value2");
+
+        //verify we get a different-but-equal object back when compared to the 
previously retrieved object
+        Serializable serialized2 = objectMessageImpl.getObject();
+        assertTrue("Unexpected object type returned", serialized2 instanceof 
Map<?,?>);
+        Map<?,?> returnedObject2 = (Map<?,?>) serialized2;
+        assertNotSame("Expected different objects, due to snapshot being 
taken", origMap, returnedObject2);
+        assertEquals("Expected equal objects, due to snapshot being taken", 
returnedObject1, returnedObject2);
+
+        //verify the mutated map is a different and not equal object
+        assertNotSame("Expected different objects, due to snapshot being 
taken", returnedObject1, returnedObject2);
+        assertNotEquals("Expected objects to differ, due to snapshot being 
taken", origMap, returnedObject2);
+    }
+
+    /**
+     * Test that setting an object on a new message which contains 
non-serializable content results
+     * in an {@link MessageFormatException} being thrown due to failure to 
encode the object.
+     */
+    @Test
+    public void testSetObjectWithNonSerializableThrowsJMSMFE() throws Exception
+    {
+        Map<String,Object> origMap = new HashMap<String,Object>();
+        origMap.put("key1", "value1");
+        origMap.put("notSerializable", new NotSerializable());
+
+        ObjectMessageImpl objectMessageImpl = new 
ObjectMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        try
+        {
+            objectMessageImpl.setObject((Serializable) origMap);
+            fail("Expected exception to be thrown");
+        }
+        catch(MessageFormatException mfe)
+        {
+            //expected
+        }
+    }
+
+    //Test class
+    private static class NotSerializable
+    {
+        public NotSerializable()
+        {
+        }
+    }
+
+    /**
+     * Test that failure during deserialization of an object in a message 
results
+     * in an {@link MessageFormatException} being throw.
+     */
+    @Test
+    public void testGetObjectWithFailedDeserialisationThrowsJMSMFE() throws 
Exception
+    {
+        Map<String,Object> origMap = new HashMap<String,Object>();
+        origMap.put("key1", "value1");
+        origMap.put("notSerializable", new NotSerializable());
+
+        AmqpObjectMessage amqpSerializedObjectMessage = 
Mockito.mock(AmqpSerializedObjectMessage.class);
+        Mockito.when(amqpSerializedObjectMessage.getObject()).thenThrow(new 
ClassNotFoundException());
+
+        ObjectMessageImpl objectMessageImpl = new 
ObjectMessageImpl(amqpSerializedObjectMessage, 
_mockSessionImpl,_mockConnectionImpl, null);
+
+        try
+        {
+            objectMessageImpl.getObject();
+            fail("Expected exception to be thrown");
+        }
+        catch(MessageFormatException mfe)
+        {
+            //expected
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to