Author: robbie
Date: Wed Jul 23 16:00:35 2014
New Revision: 1612856

URL: http://svn.apache.org/r1612856
Log:
QPIDJMS-22: initial work on implementing StreamMessage

Added:
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java 
Wed Jul 23 16:00:35 2014
@@ -24,12 +24,14 @@ import org.apache.qpid.proton.message.Me
 
 public class AmqpGenericMessage extends AmqpMessage
 {
+    //message to be sent
     public AmqpGenericMessage()
     {
         super();
         setMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE, 
ClientProperties.GENERIC_MESSAGE_TYPE);
     }
 
+    //message just received
     public AmqpGenericMessage(Delivery delivery, Message message, 
AmqpConnection amqpConnection)
     {
         super(message, delivery, amqpConnection);

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java 
Wed Jul 23 16:00:35 2014
@@ -18,20 +18,130 @@
  */
 package org.apache.qpid.jms.engine;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.jms.impl.ClientProperties;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.message.Message;
 
 public class AmqpListMessage extends AmqpMessage
 {
+    private List<Object> _list;
+    private int _position = 0;
+
+    //message to be sent
     public AmqpListMessage()
     {
         super();
+        _list = createListBody(getMessage());
+        setMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE, 
ClientProperties.STREAM_MESSAGE_TYPE);
     }
 
+    //message just received
     public AmqpListMessage(Message message, Delivery delivery, AmqpConnection 
amqpConnection)
     {
         super(message, delivery, amqpConnection);
+        _list = processBodyList(message);
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<Object> processBodyList(Message message)
+    {
+        Section body = getMessage().getBody();
+
+        if(body == null)
+        {
+            return createListBody(message);
+        }
+        else if(body instanceof AmqpValue)
+        {
+            Object value = ((AmqpValue) body).getValue();
+
+            if(value == null)
+            {
+                return createListBody(message);
+            }
+            else if(value instanceof List<?>)
+            {
+                return (List<Object>) value;
+            }
+            else
+            {
+                throw new IllegalStateException("Unexpected amqp-value body 
content type: " + value.getClass().getSimpleName());
+            }
+        }
+        else
+        {
+            throw new IllegalStateException("Unexpected message body type: " + 
body.getClass().getSimpleName());
+        }
+    }
+
+    private List<Object> createListBody(Message message)
+    {
+        List<Object> list = new ArrayList<Object>();
+        message.setBody(new AmqpValue(list));
+
+        return list;
+    }
+
+    /**
+     * Adds the provided Object to the list.
+     *
+     * If the value provided is a byte[] its entry in the list is as an AMQP 
Binary, and the
+     * underlying array is NOT copied and MUST NOT be subsequently altered.
+     */
+    public void add(final Object o)
+    {
+        Object val = o;
+        if(val instanceof byte[])
+        {
+            val = new Binary((byte[]) o);
+        }
+
+        _list.add(val);
+    }
+
+    /**
+     *
+     * Returns the next value in the list, or throws {@link 
IndexOutOfBoundsException} if there are no unread entries remaining.
+     *
+     * If the value being returned is a byte[] representing AMQP binary, the 
array returned IS a copy.
+     *
+     * @throws IndexOutOfBoundsException
+     */
+    public Object get() throws IndexOutOfBoundsException
+    {
+        Object object = _list.get(_position++);
+        if(object instanceof Binary)
+        {
+            //We will return a byte[]. It is possibly only part of the 
underlying array, copy that bit.
+            Binary bin = ((Binary) object);
+
+            return Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), 
bin.getLength());
+        }
+
+        return object;
+    }
+
+    public void clear()
+    {
+        _list.clear();
+        resetPosition();
+    }
+
+    public void resetPosition()
+    {
+        _position = 0;
+    }
+
+    public void decrementPosition()
+    {
+        _position--;
     }
 
-    //TODO: methods to access/set content
 }

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java 
Wed Jul 23 16:00:35 2014
@@ -34,6 +34,7 @@ public class AmqpMapMessage extends Amqp
 {
     private volatile Map<String,Object> _messageBodyMap;
 
+    //message to be sent
     public AmqpMapMessage()
     {
         super();
@@ -41,6 +42,7 @@ public class AmqpMapMessage extends Amqp
         setMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE, 
ClientProperties.MAP_MESSAGE_TYPE);
     }
 
+    //message just received
     @SuppressWarnings("unchecked")
     public AmqpMapMessage(Message message, Delivery delivery, AmqpConnection 
amqpConnection)
     {
@@ -92,7 +94,8 @@ public class AmqpMapMessage extends Amqp
      *
      * If a previous mapping for the key exists, the old value is replaced by 
the specified value.
      *
-     * To be clear, if the value provided is a byte[] then it is NOT copied 
and MUST NOT be subsequently altered.
+     * If the value provided is a byte[] its entry in the map is as an AMQP 
Binary, and the
+     * underlying array is NOT copied and MUST NOT be subsequently altered.
      *
      * @param key the key for the mapping
      * @param value the value for the mapping
@@ -111,7 +114,7 @@ public class AmqpMapMessage extends Amqp
     /**
      * Returns the value to which the specified key is mapped, or null if this 
map contains no mapping for the key.
      *
-     * If the value being returned is a byte[], the array returned IS a copy.
+     * If the value being returned is a byte[] representing AMQP binary, the 
array returned IS a copy.
      *
      * @param key the key for the mapping
      * @return the value if one exists for this key, or null if there was none.

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java 
Wed Jul 23 16:00:35 2014
@@ -32,6 +32,7 @@ public class AmqpObjectMessage extends A
     private AmqpObjectMessageDelegate _delegate;
     private boolean _useAmqpTypeEncoding = false;
 
+    //message to be sent
     public AmqpObjectMessage()
     {
         super();
@@ -40,6 +41,7 @@ public class AmqpObjectMessage extends A
         initDelegate(false);
     }
 
+    //message just received
     public AmqpObjectMessage(Message message, Delivery delivery, 
AmqpConnection amqpConnection, boolean useAmqpTypes)
     {
         super(message, delivery, amqpConnection);

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java 
Wed Jul 23 16:00:35 2014
@@ -46,12 +46,14 @@ public class AmqpTextMessage extends Amq
 
     private CharsetDecoder _decoder =  Charset.forName(UTF_8).newDecoder();
 
+    //message to be sent
     public AmqpTextMessage()
     {
         super();
         setText(null);
     }
 
+    //message just received
     public AmqpTextMessage(Message message, Delivery delivery, AmqpConnection 
amqpConnection)
     {
         super(message, delivery, amqpConnection);

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java 
(original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java 
Wed Jul 23 16:00:35 2014
@@ -18,14 +18,26 @@
  */
 package org.apache.qpid.jms.impl;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageNotReadableException;
 import javax.jms.StreamMessage;
 
 import org.apache.qpid.jms.engine.AmqpListMessage;
 
 public class StreamMessageImpl extends MessageImpl<AmqpListMessage> implements 
StreamMessage
 {
+    //Value classes accepted by checkObjectType(Object); a non-null value of any other class is rejected.
+    private static final Set<Class<?>> SUPPORTED_TYPES =  new 
HashSet<Class<?>>(Arrays.asList(
+            Boolean.class, Byte.class, Short.class, Character.class, 
Integer.class, Long.class, Float.class, Double.class, String.class, 
byte[].class));
+
+    //Marker value for _remainingBytes meaning no readBytes(byte[] dest) retrieval is in progress.
+    private static final int NO_BYTES_IN_FLIGHT = -1;
+    //Bytes of the current bytes entry not yet handed out by readBytes(byte[] dest), or NO_BYTES_IN_FLIGHT.
+    private int _remainingBytes = NO_BYTES_IN_FLIGHT;
+
     //message to be sent
     public StreamMessageImpl(SessionImpl sessionImpl, ConnectionImpl 
connectionImpl) throws JMSException
     {
@@ -45,13 +57,60 @@ public class StreamMessageImpl extends M
         throw new UnsupportedOperationException("Not Implemented");
     }
 
+    /**
+     * Verifies that the given value may be stored as a stream entry.
+     *
+     * @param value the candidate value; null is always accepted
+     * @throws QpidJmsMessageFormatException if the value's class is not one of SUPPORTED_TYPES
+     */
+    private void checkObjectType(Object value) throws QpidJmsMessageFormatException
+    {
+        if(value == null)
+        {
+            return;
+        }
+
+        Class<?> type = value.getClass();
+        if(SUPPORTED_TYPES.contains(type))
+        {
+            return;
+        }
+
+        throw new QpidJmsMessageFormatException("Invalid object value type: " + type);
+    }
+
+    /**
+     * Guards read operations against a body that is still writable.
+     *
+     * @throws MessageNotReadableException if isBodyWritable() reports true
+     */
+    void checkBodyReadable() throws MessageNotReadableException
+    {
+        final boolean writeOnly = isBodyWritable();
+        if(!writeOnly)
+        {
+            return;
+        }
+
+        throw new MessageNotReadableException("Message body is currently in write-only mode");
+    }
+
+    /**
+     * Reads the next entry from the underlying AMQP list message.
+     *
+     * @param checkExistingReadBytesUsage if true, refuse to read while a bytes entry is still
+     *        being retrieved piecemeal using readBytes(byte[] dest)
+     * @return the next entry, which may be null
+     * @throws MessageEOFException if there are no unread entries remaining
+     * @throws QpidJmsMessageFormatException if the check is enabled and a bytes entry is only partially read
+     */
+    private Object readObjectInternal(boolean checkExistingReadBytesUsage) throws MessageEOFException, QpidJmsMessageFormatException
+    {
+        if(checkExistingReadBytesUsage && _remainingBytes != NO_BYTES_IN_FLIGHT)
+        {
+            throw new QpidJmsMessageFormatException("Partially read bytes entry still being retrieved using readBytes(byte[] dest)");
+        }
+
+        try
+        {
+            return getUnderlyingAmqpMessage(false).get();
+        }
+        catch(IndexOutOfBoundsException ioobe)
+        {
+            //Keep the original failure attached as the cause instead of discarding it.
+            MessageEOFException eof = new MessageEOFException("No more data in message stream");
+            eof.initCause(ioobe);
+            throw eof;
+        }
+    }
+
     //======= JMS Methods =======
 
     @Override
     public boolean readBoolean() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Boolean)
+        {
+            return (Boolean) o;
+        }
+        else if(o instanceof String || o == null)
+        {
+            return Boolean.valueOf((String)o);
+        }
+        else
+        {
+            throw new QpidJmsMessageFormatException("Stream entry of type " + 
o.getClass().getName() + " cannot be converted to boolean.");
+        }
     }
 
     @Override
@@ -111,24 +170,81 @@ public class StreamMessageImpl extends M
     }
 
+    /**
+     * Copies the current bytes entry into dest, possibly over several calls.
+     *
+     * Each call copies at most dest.length bytes and returns the number copied. While the
+     * entry is not yet finished the read position is moved back so the next call continues
+     * with the same entry; progress is tracked in _remainingBytes.
+     *
+     * @param dest the array to copy into
+     * @return the number of bytes copied; 0 for an empty bytes entry; -1 if the entry is null,
+     *         or if the previous call handed out the last bytes while exactly filling dest
+     * @throws JMSException if the entry is not a byte[] (the position is moved back first),
+     *         or if there are no unread entries remaining
+     */
     @Override
-    public int readBytes(byte[] value) throws JMSException
+    public int readBytes(byte[] dest) throws JMSException
     {
         //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        //Pass false: being part-way through a bytes entry is expected here.
+        Object o = readObjectInternal(false);
+
+        if(o == null)
+        {
+            return -1;
+        }
+
+        if(o instanceof byte[])
+        {
+            byte[] src = (byte[]) o;
+
+            if(src.length == 0)
+            {
+                return 0;
+            }
+
+            if(_remainingBytes == 0)
+            {
+                //We previously read all the bytes, but must have filled the dest array.
+                //Clear the remaining marker and signal completion via return value.
+                _remainingBytes = NO_BYTES_IN_FLIGHT;
+                return -1;
+            }
+
+            if(_remainingBytes == NO_BYTES_IN_FLIGHT)
+            {
+                //The field is non-null and non-empty, and this is the first read attempt.
+                //Set the remaining marker to the full size
+                _remainingBytes = src.length;
+            }
+
+            //Resume from where the previous call stopped.
+            int previouslyRead = src.length - _remainingBytes;
+            int lengthToCopy = Math.min(dest.length, _remainingBytes);
+
+            if(lengthToCopy > 0)
+            {
+                System.arraycopy(src, previouslyRead, dest, 0, lengthToCopy);
+            }
+
+            _remainingBytes -= lengthToCopy;
+
+            if(_remainingBytes == 0 && lengthToCopy < dest.length)
+            {
+                //All bytes have been read and dest array was not filled on this call, so the return
+                //will enable the caller to determine completion. Clear the remaining marker.
+                _remainingBytes = NO_BYTES_IN_FLIGHT;
+            }
+            else
+            {
+                //More work to do to complete reading this field, move the position back.
+                getUnderlyingAmqpMessage(false).decrementPosition();
+            }
+
+            return lengthToCopy;
+        }
+        else
+        {
+            //Not a bytes entry: move the position back so the entry is not consumed by the failed read.
+            getUnderlyingAmqpMessage(false).decrementPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + 
o.getClass().getName() + " cannot be converted to bytes.");
+        }
     }
 
     @Override
     public Object readObject() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        return readObjectInternal(true);
     }
 
     @Override
     public void writeBoolean(boolean value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
@@ -190,36 +306,48 @@ public class StreamMessageImpl extends M
     @Override
     public void writeBytes(byte[] value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeBytes(value, 0, value.length);
     }
 
     @Override
-    public void writeBytes(byte[] value, int offset, int length)
-            throws JMSException
+    public void writeBytes(byte[] value, int offset, int length) throws 
JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        checkBodyWritable();
+
+        byte[] dest = new byte[length];
+        System.arraycopy(value, offset, dest, 0, length);
+
+        getUnderlyingAmqpMessage(false).add(dest);
     }
 
     @Override
     public void writeObject(Object value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        if(value instanceof byte[])
+        {
+            writeBytes((byte[]) value);
+            return;
+        }
+
+        checkBodyWritable();
+        checkObjectType(value);
+
+        getUnderlyingAmqpMessage(false).add(value);
     }
 
     @Override
     public void reset() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        getUnderlyingAmqpMessage(false).resetPosition();
+        setBodyWritable(false);
+        _remainingBytes = -1;
     }
 
     @Override
     public void clearBody() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        getUnderlyingAmqpMessage(false).clear();
+        setBodyWritable(true);
+        _remainingBytes = -1;
     }
 }

Added: 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java?rev=1612856&view=auto
==============================================================================
--- 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
 (added)
+++ 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
 Wed Jul 23 16:00:35 2014
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.impl.ClientProperties;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link AmqpListMessage}, covering the message type annotation,
+ * sequential reads, position reset, clearing, and handling of received body sections.
+ */
+public class AmqpListMessageTest extends QpidJmsTestCase
+{
+    private AmqpConnection _mockAmqpConnection;
+    private Delivery _mockDelivery;
+
+    @Before
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+        _mockDelivery = Mockito.mock(Delivery.class);
+    }
+
+    /** A message created for sending must carry the stream message type annotation. */
+    @Test
+    public void testNewMessageToSendContainsMessageTypeAnnotation() throws Exception
+    {
+        AmqpMessage amqpListMessage = new AmqpListMessage();
+        assertTrue("expected message type annotation to be present",
+                amqpListMessage.messageAnnotationExists(ClientProperties.X_OPT_JMS_MSG_TYPE));
+        assertEquals("unexpected value for message type annotation value",
+                ClientProperties.STREAM_MESSAGE_TYPE,
+                amqpListMessage.getMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE));
+    }
+
+    /** Reading from a new, empty message must fail with IndexOutOfBoundsException. */
+    @Test
+    public void testGetWithNewMessageToSendThrowsIOOBE() throws Exception
+    {
+        AmqpListMessage amqpListMessage = new AmqpListMessage();
+
+        checkForIOOBE(amqpListMessage);
+    }
+
+    /** An entry in the list body of a received message must be returned by get(). */
+    @Test
+    public void testGetUsingReceivedMessageReturnsExpectedValue() throws Exception
+    {
+        Message message = Proton.message();
+        List<Object> list = new ArrayList<Object>();
+        list.add(Boolean.FALSE);
+        message.setBody(new AmqpValue(list));
+
+        AmqpListMessage amqpListMessage = new AmqpListMessage(message, _mockDelivery, _mockAmqpConnection);
+
+        assertEquals("Unexpected value retrived", Boolean.FALSE, amqpListMessage.get());
+    }
+
+    /** After resetPosition() the already-read entry must be readable again. */
+    @Test
+    public void testResetPosition() throws Exception
+    {
+        AmqpListMessage amqpListMessage = new AmqpListMessage();
+
+        //add something
+        amqpListMessage.add(Boolean.TRUE);
+
+        //check we can access it, before IOOBE
+        checkSingleAccessWorksBeforeIOOBE(amqpListMessage, Boolean.TRUE);
+
+        //reset position
+        amqpListMessage.resetPosition();
+
+        //check it once more, should work again before another IOOBE
+        checkSingleAccessWorksBeforeIOOBE(amqpListMessage, Boolean.TRUE);
+    }
+
+    /** Asserts that exactly one more entry, equal to expected, can be read from the message. */
+    private void checkSingleAccessWorksBeforeIOOBE(AmqpListMessage amqpListMessage, Object expected)
+    {
+        //check the value is as expected
+        assertEquals("unexpected value", expected, amqpListMessage.get());
+
+        //verify getting again results in IOOBE as there is nothing left
+        checkForIOOBE(amqpListMessage);
+    }
+
+    /** Asserts that the next get() on the message throws IndexOutOfBoundsException. */
+    private void checkForIOOBE(AmqpListMessage amqpListMessage)
+    {
+        try
+        {
+            amqpListMessage.get();
+            fail("Expected exception to be thrown");
+        }
+        catch(IndexOutOfBoundsException ioobe)
+        {
+            //expected
+        }
+    }
+
+    /** clear() must discard existing entries and move the position back to the start. */
+    @Test
+    public void testClear() throws Exception
+    {
+        AmqpListMessage amqpListMessage = new AmqpListMessage();
+
+        //add some stuff
+        amqpListMessage.add(Boolean.TRUE);
+        amqpListMessage.add(Boolean.FALSE);
+
+        //retrieve only some of it, leaving some unread
+        assertEquals("unexpected value", Boolean.TRUE, amqpListMessage.get());
+
+        //clear
+        amqpListMessage.clear();
+
+        //add something else
+        amqpListMessage.add(Character.valueOf('c'));
+
+        //check we can get it alone before another IOOBE (i.e position was reset, other contents cleared)
+        checkSingleAccessWorksBeforeIOOBE(amqpListMessage, Character.valueOf('c'));
+    }
+
+    /** A received body section that is not an amqp-value must be rejected. */
+    @Test
+    public void testCreateAmqpListMessageWithUnexpectedBodySectionTypeThrowsISE() throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new AmqpSequence(new ArrayList<Object>()));
+
+        try
+        {
+            new AmqpListMessage(message, _mockDelivery, _mockAmqpConnection);
+            fail("expected exception to be thrown");
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected
+        }
+    }
+
+    /** A received amqp-value body holding something other than a list must be rejected. */
+    @Test
+    public void testCreateAmqpListMessageWithAmqpValueBodySectionContainingUnexpectedValueThrowsISE() throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new AmqpValue("not-a-list"));
+
+        try
+        {
+            new AmqpListMessage(message, _mockDelivery, _mockAmqpConnection);
+            fail("expected exception to be thrown");
+        }
+        catch(IllegalStateException ise)
+        {
+            //expected
+        }
+    }
+
+    /** A received amqp-value body holding null must yield a usable, empty message. */
+    @Test
+    public void testCreateAmqpListMessageWithEmptyAmqpValueBodySection() throws Exception
+    {
+        Message message = Proton.message();
+        message.setBody(new AmqpValue(null));
+
+        AmqpListMessage amqpListMessage = new AmqpListMessage(message, _mockDelivery, _mockAmqpConnection);
+
+        //Should be able to use the message, e.g clearing it and adding to it.
+        amqpListMessage.clear();
+        amqpListMessage.add("myString");
+
+        //Verify the added entry is actually readable, and is the only entry.
+        checkSingleAccessWorksBeforeIOOBE(amqpListMessage, "myString");
+    }
+}
\ No newline at end of file

Added: 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java?rev=1612856&view=auto
==============================================================================
--- 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
 (added)
+++ 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
 Wed Jul 23 16:00:35 2014
@@ -0,0 +1,445 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class StreamMessageImplTest extends QpidJmsTestCase
+{
+    private ConnectionImpl _mockConnectionImpl;
+    private SessionImpl _mockSessionImpl;
+
+    @Before
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
+        _mockSessionImpl = Mockito.mock(SessionImpl.class);
+        Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new 
DestinationHelper());
+    }
+
+    // ======= general =========
+
+    @Test
+    public void testReadWithEmptyStreamThrowsMEOFE() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+        //make it readable
+        streamMessageImpl.reset();
+
+        try
+        {
+            streamMessageImpl.readBoolean();
+            fail("Expected exception to be thrown as message has no content");
+        }
+        catch(MessageEOFException meofe)
+        {
+            //expected
+        }
+    }
+
+    @Test
+    public void testClearBodyOnNewMessageRemovesExistingValues() throws 
Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+        streamMessageImpl.writeBoolean(true);
+
+        streamMessageImpl.clearBody();
+
+        streamMessageImpl.writeBoolean(false);
+        streamMessageImpl.reset();
+
+        //check we get only the value added after the clear
+        assertFalse("expected value added after the clear", 
streamMessageImpl.readBoolean());
+
+        try
+        {
+            streamMessageImpl.readBoolean();
+            fail("Expected exception to be thrown");
+        }
+        catch(MessageEOFException meofe)
+        {
+            //expected
+        }
+    }
+
+    // ======= object =========
+
+    @Test
+    public void testWriteObjectWithIllegalTypeThrowsMFE() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+        try
+        {
+            streamMessageImpl.writeObject(new Exception());
+            fail("Expected exception to be thrown");
+        }
+        catch(MessageFormatException mfe)
+        {
+            //expected
+        }
+    }
+
+    // ======= bytes =========
+
+    /**
+     * Write bytes, then retrieve them as all of the legal type combinations
+     */
+    @Test
+    public void testWriteBytesReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] value =  new byte[]{(byte)0, (byte)255, (byte)78};
+
+        streamMessageImpl.writeBytes(value);
+        streamMessageImpl.reset();
+
+        byte[] dest = new byte[value.length];
+
+        int readBytesLength = streamMessageImpl.readBytes(dest);
+        assertEquals("Number of bytes read did not match expectation", 
value.length, readBytesLength);
+        assertArrayEquals("value not as expected", value, dest);
+    }
+
+    /**
+     * Write bytes, then retrieve them as all of the illegal type combinations 
to verify it fails as expected
+     */
+    @Test
+    public void testWriteBytesReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] value =  new byte[]{(byte)0, (byte)255, (byte)78};
+
+        streamMessageImpl.writeBytes(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Boolean.class);
+        /* TODO: enable when implementing
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Integer.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Long.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
String.class);
+        */
+    }
+
+    @Test
+    public void testReadObjectForBytesReturnsNewArray() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] bytes = new byte[]{(byte)11, (byte)44, (byte)99};
+        streamMessageImpl.writeBytes(bytes);
+
+        streamMessageImpl.reset();
+
+        byte[] retrievedBytes = (byte[]) streamMessageImpl.readObject();
+
+        assertNotSame("Expected different array objects", bytes, 
retrievedBytes);
+        assertArrayEquals("Expected arrays to be equal", bytes, 
retrievedBytes);
+    }
+
+    @Test
+    public void 
testReadBytesFullWithUndersizedDestinationArrayUsingMultipleReads() throws 
Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] bytes = new byte[]{(byte)3, (byte)78, (byte)253, (byte) 26, 
(byte) 8};
+        assertEquals("bytes should be odd length", 1, bytes.length % 2);
+        int undersizedLength = 2;
+        int remaining = 1;
+
+        streamMessageImpl.writeBytes(bytes);
+        streamMessageImpl.reset();
+
+        byte[] undersizedDestination = new byte[undersizedLength];
+        byte[] fullRetrievedBytes = new byte[bytes.length];
+
+        assertEquals("Number of bytes read did not match destination array 
length", undersizedLength, streamMessageImpl.readBytes(undersizedDestination));
+        int read = undersizedLength;
+        System.arraycopy(undersizedDestination, 0, fullRetrievedBytes, 0, 
undersizedLength);
+        assertEquals("Number of bytes read did not match destination array 
length", undersizedLength, streamMessageImpl.readBytes(undersizedDestination));
+        System.arraycopy(undersizedDestination, 0, fullRetrievedBytes, read, 
undersizedLength);
+        read += undersizedLength;
+        assertEquals("Number of bytes read did not match expectation", 
remaining, streamMessageImpl.readBytes(undersizedDestination));
+        System.arraycopy(undersizedDestination, 0, fullRetrievedBytes, read, 
remaining);
+        read += remaining;
+        assertArrayEquals("Expected array to equal retrieved bytes", bytes, 
fullRetrievedBytes);
+    }
+
+    @Test
+    public void testReadBytesFullWithPreciselySizedDestinationArray() throws 
Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] bytes = new byte[]{(byte)11, (byte)44, (byte)99};
+        streamMessageImpl.writeBytes(bytes);
+
+        streamMessageImpl.reset();
+
+        byte[] retrievedByteArray = new byte[bytes.length];
+        int readBytesLength = streamMessageImpl.readBytes(retrievedByteArray);
+
+        assertEquals("Number of bytes read did not match original array 
length", bytes.length, readBytesLength);
+        assertArrayEquals("Expected array to equal retrieved bytes", bytes, 
retrievedByteArray);
+        assertEquals("Expected completion return value", -1, 
streamMessageImpl.readBytes(retrievedByteArray));
+    }
+
+    @Test
+    public void testReadBytesFullWithOversizedDestinationArray() throws 
Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] bytes = new byte[]{(byte)4, (byte)115, (byte)255};
+        streamMessageImpl.writeBytes(bytes);
+
+        streamMessageImpl.reset();
+
+        byte[] oversizedDestination = new byte[bytes.length + 1];
+        int readBytesLength = 
streamMessageImpl.readBytes(oversizedDestination);
+
+        assertEquals("Number of bytes read did not match original array 
length", bytes.length, readBytesLength);
+        assertArrayEquals("Expected array subset to equal retrieved bytes", 
bytes, Arrays.copyOfRange(oversizedDestination, 0, readBytesLength));
+    }
+
+    /**
+     * {@link StreamMessage#readBytes(byte[])} indicates:
+     *
+     * "Once the first readBytes call on a byte[] field value has been made, 
the full value of the field must be read
+     * before it is valid to read the next field. An attempt to read the next 
field before that has been done will
+     * throw a MessageFormatException."
+     *
+     * {@link StreamMessage#readObject()} indicates:
+     * "An attempt to call readObject to read a byte field value into a new 
byte[] object before the full value
+     * of the byte field has been read will throw a MessageFormatException."
+     *
+     * Test that these restrictions are met, and don't interfere with 
completing the readBytes usage.
+     */
+    @Test
+    public void testReadObjectAfterPartialReadBytesThrowsMFE() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] bytes = new byte[]{(byte)11, (byte)44, (byte)99};
+        streamMessageImpl.writeBytes(bytes);
+
+        streamMessageImpl.reset();
+
+        //start reading via readBytes
+        int partialLength = 2;
+        byte[] retrievedByteArray = new byte[partialLength];
+        int readBytesLength = streamMessageImpl.readBytes(retrievedByteArray);
+
+        assertEquals(partialLength, readBytesLength);
+        assertArrayEquals("Expected array subset to equal retrieved bytes", 
Arrays.copyOf(bytes, partialLength), retrievedByteArray);
+
+        //check that using readObject does not return the full/remaining bytes 
as a new array
+        try
+        {
+            streamMessageImpl.readObject();
+            fail("expected exception to be thrown");
+        }
+        catch(MessageFormatException mfe)
+        {
+            //expected
+        }
+
+        //finish reading via readBytes to ensure it can be completed
+        readBytesLength = streamMessageImpl.readBytes(retrievedByteArray);
+        assertEquals(bytes.length - partialLength, readBytesLength);
+        assertArrayEquals("Expected array subset to equal retrieved bytes",
+                Arrays.copyOfRange(bytes, partialLength, bytes.length), 
Arrays.copyOfRange(retrievedByteArray, 0, readBytesLength));
+    }
+
+    //========= boolean ========
+
+    /**
+     * Write a boolean, then read it as each of the legal type combinations 
to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteBooleanReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        boolean value = true;
+
+        streamMessageImpl.writeBoolean(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, value, Boolean.class);
+
+        /* TODO: enable when implementing
+        assertGetStreamEntryEquals(streamMessageImpl, String.valueOf(value), 
String.class);
+        */
+    }
+
+    /**
+     * Write a boolean, then read it as each of the illegal type combinations 
to verify it fails as expected
+     */
+    @Test
+    public void testSetBooleanGetIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new 
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        boolean value = true;
+
+        streamMessageImpl.writeBoolean(value);
+        streamMessageImpl.reset();
+
+        /* TODO: enable when implementing
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Integer.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Long.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, 
byte[].class);
+        */
+    }
+
+    //========= utility methods ========
+
+    private void assertGetStreamEntryEquals(StreamMessageImpl testMessage,
+                                            Object expectedValue,
+                                            Class<?> clazz) throws JMSException
+    {
+        if(clazz == byte[].class)
+        {
+            throw new IllegalArgumentException("byte[] values not suported");
+        }
+
+        Object actualValue = getStreamEntryUsingTypeMethod(testMessage, clazz, 
null);
+        assertEquals(expectedValue, actualValue);
+    }
+
+    private void 
assertGetStreamEntryThrowsMessageFormatException(StreamMessageImpl testMessage,
+                                                                  Class<?> 
clazz) throws JMSException
+    {
+        try
+        {
+            getStreamEntryUsingTypeMethod(testMessage, clazz, new byte[0]);
+
+            fail("expected exception to be thrown");
+        }
+        catch(MessageFormatException jmsMFE)
+        {
+            //expected
+        }
+    }
+
+    private void 
assertGetStreamEntryThrowsNumberFormatException(StreamMessageImpl testMessage,
+                                                                 Class<?> 
clazz) throws JMSException
+    {
+        assertGetStreamEntryThrowsNumberFormatException(testMessage, clazz, 
null);
+    }
+
+    private void 
assertGetStreamEntryThrowsNumberFormatException(StreamMessageImpl testMessage,
+                                                                 Class<?> 
clazz,
+                                                                 byte[] 
destination) throws JMSException
+    {
+        if(clazz == byte[].class && destination == null)
+        {
+            throw new IllegalArgumentException("Destinatinon byte[] must be 
supplied");
+        }
+
+        try
+        {
+            getStreamEntryUsingTypeMethod(testMessage, clazz, destination);
+
+            fail("expected exception to be thrown");
+        }
+        catch(NumberFormatException nfe)
+        {
+            //expected
+        }
+    }
+
+    private Object getStreamEntryUsingTypeMethod(StreamMessageImpl 
testMessage, Class<?> clazz, byte[] destination) throws JMSException
+    {
+        if(clazz == Boolean.class)
+        {
+            return testMessage.readBoolean();
+        }
+        else if(clazz == Byte.class)
+        {
+            return testMessage.readByte();
+        }
+        else if(clazz == Character.class)
+        {
+            return testMessage.readChar();
+        }
+        else if(clazz == Short.class)
+        {
+            return testMessage.readShort();
+        }
+        else if(clazz == Integer.class)
+        {
+            return testMessage.readInt();
+        }
+        else if(clazz == Long.class)
+        {
+            return testMessage.readLong();
+        }
+        else if(clazz == Float.class)
+        {
+            return testMessage.readFloat();
+        }
+        else if(clazz == Double.class)
+        {
+            return testMessage.readDouble();
+        }
+        else if(clazz == String.class)
+        {
+            return testMessage.readString();
+        }
+        else if(clazz == byte[].class)
+        {
+            return testMessage.readBytes(destination);
+        }
+        else
+        {
+            throw new RuntimeException("Unexpected entry type class");
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to