Author: robbie
Date: Wed Jul 23 16:00:35 2014
New Revision: 1612856
URL: http://svn.apache.org/r1612856
Log:
QPIDJMS-22: initial work on implementing StreamMessage
Added:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
Wed Jul 23 16:00:35 2014
@@ -24,12 +24,14 @@ import org.apache.qpid.proton.message.Me
public class AmqpGenericMessage extends AmqpMessage
{
+ //message to be sent
public AmqpGenericMessage()
{
super();
setMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE,
ClientProperties.GENERIC_MESSAGE_TYPE);
}
+ //message just received
public AmqpGenericMessage(Delivery delivery, Message message,
AmqpConnection amqpConnection)
{
super(message, delivery, amqpConnection);
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
Wed Jul 23 16:00:35 2014
@@ -18,20 +18,130 @@
*/
package org.apache.qpid.jms.engine;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.jms.impl.ClientProperties;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;
public class AmqpListMessage extends AmqpMessage
{
+ private List<Object> _list;
+ private int _position = 0;
+
+ //message to be sent
public AmqpListMessage()
{
super();
+ _list = createListBody(getMessage());
+ setMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE,
ClientProperties.STREAM_MESSAGE_TYPE);
}
+ //message just received
public AmqpListMessage(Message message, Delivery delivery, AmqpConnection
amqpConnection)
{
super(message, delivery, amqpConnection);
+ _list = processBodyList(message);
+ }
+
+ @SuppressWarnings("unchecked")
+ private List<Object> processBodyList(Message message)
+ {
+ Section body = getMessage().getBody();
+
+ if(body == null)
+ {
+ return createListBody(message);
+ }
+ else if(body instanceof AmqpValue)
+ {
+ Object value = ((AmqpValue) body).getValue();
+
+ if(value == null)
+ {
+ return createListBody(message);
+ }
+ else if(value instanceof List<?>)
+ {
+ return (List<Object>) value;
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected amqp-value body
content type: " + value.getClass().getSimpleName());
+ }
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected message body type: " +
body.getClass().getSimpleName());
+ }
+ }
+
+ private List<Object> createListBody(Message message)
+ {
+ List<Object> list = new ArrayList<Object>();
+ message.setBody(new AmqpValue(list));
+
+ return list;
+ }
+
+ /**
+ * Adds the provided Object to the list.
+ *
+ * If the value provided is a byte[] its entry in the list is as an AMQP
Binary, and the
+ * underlying array is NOT copied and MUST NOT be subsequently altered.
+ */
+ public void add(final Object o)
+ {
+ Object val = o;
+ if(val instanceof byte[])
+ {
+ val = new Binary((byte[]) o);
+ }
+
+ _list.add(val);
+ }
+
+ /**
+ *
+ * Returns the next value in the list, or throws {@link
IndexOutOfBoundsException} if there are no unread entries remaining.
+ *
+ * If the value being returned is a byte[] representing AMQP binary, the
array returned IS a copy.
+ *
+ * @throws IndexOutOfBoundsException
+ */
+ public Object get() throws IndexOutOfBoundsException
+ {
+ Object object = _list.get(_position++);
+ if(object instanceof Binary)
+ {
+ //We will return a byte[]. It is possibly only part of the
underlying array, copy that bit.
+ Binary bin = ((Binary) object);
+
+ return Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(),
bin.getLength());
+ }
+
+ return object;
+ }
+
+ public void clear()
+ {
+ _list.clear();
+ resetPosition();
+ }
+
+ public void resetPosition()
+ {
+ _position = 0;
+ }
+
+ public void decrementPosition()
+ {
+ _position--;
}
- //TODO: methods to access/set content
}
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
Wed Jul 23 16:00:35 2014
@@ -34,6 +34,7 @@ public class AmqpMapMessage extends Amqp
{
private volatile Map<String,Object> _messageBodyMap;
+ //message to be sent
public AmqpMapMessage()
{
super();
@@ -41,6 +42,7 @@ public class AmqpMapMessage extends Amqp
setMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE,
ClientProperties.MAP_MESSAGE_TYPE);
}
+ //message just received
@SuppressWarnings("unchecked")
public AmqpMapMessage(Message message, Delivery delivery, AmqpConnection
amqpConnection)
{
@@ -92,7 +94,8 @@ public class AmqpMapMessage extends Amqp
*
* If a previous mapping for the key exists, the old value is replaced by
the specified value.
*
- * To be clear, if the value provided is a byte[] then it is NOT copied
and MUST NOT be subsequently altered.
+ * If the value provided is a byte[] its entry in the map is as an AMQP
Binary, and the
+ * underlying array is NOT copied and MUST NOT be subsequently altered.
*
* @param key the key for the mapping
* @param value the value for the mapping
@@ -111,7 +114,7 @@ public class AmqpMapMessage extends Amqp
/**
* Returns the value to which the specified key is mapped, or null if this
map contains no mapping for the key.
*
- * If the value being returned is a byte[], the array returned IS a copy.
+ * If the value being returned is a byte[] representing AMQP binary, the
array returned IS a copy.
*
* @param key the key for the mapping
* @return the value if one exists for this key, or null if there was none.
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
Wed Jul 23 16:00:35 2014
@@ -32,6 +32,7 @@ public class AmqpObjectMessage extends A
private AmqpObjectMessageDelegate _delegate;
private boolean _useAmqpTypeEncoding = false;
+ //message to be sent
public AmqpObjectMessage()
{
super();
@@ -40,6 +41,7 @@ public class AmqpObjectMessage extends A
initDelegate(false);
}
+ //message just received
public AmqpObjectMessage(Message message, Delivery delivery,
AmqpConnection amqpConnection, boolean useAmqpTypes)
{
super(message, delivery, amqpConnection);
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
Wed Jul 23 16:00:35 2014
@@ -46,12 +46,14 @@ public class AmqpTextMessage extends Amq
private CharsetDecoder _decoder = Charset.forName(UTF_8).newDecoder();
+ //message to be sent
public AmqpTextMessage()
{
super();
setText(null);
}
+ //message just received
public AmqpTextMessage(Message message, Delivery delivery, AmqpConnection
amqpConnection)
{
super(message, delivery, amqpConnection);
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java?rev=1612856&r1=1612855&r2=1612856&view=diff
==============================================================================
---
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
(original)
+++
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
Wed Jul 23 16:00:35 2014
@@ -18,14 +18,26 @@
*/
package org.apache.qpid.jms.impl;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageNotReadableException;
import javax.jms.StreamMessage;
import org.apache.qpid.jms.engine.AmqpListMessage;
public class StreamMessageImpl extends MessageImpl<AmqpListMessage> implements
StreamMessage
{
+ private static final Set<Class<?>> SUPPORTED_TYPES = new
HashSet<Class<?>>(Arrays.asList(
+ Boolean.class, Byte.class, Short.class, Character.class,
Integer.class, Long.class, Float.class, Double.class, String.class,
byte[].class));
+
+ private static final int NO_BYTES_IN_FLIGHT = -1;
+ private int _remainingBytes = NO_BYTES_IN_FLIGHT;
+
//message to be sent
public StreamMessageImpl(SessionImpl sessionImpl, ConnectionImpl
connectionImpl) throws JMSException
{
@@ -45,13 +57,60 @@ public class StreamMessageImpl extends M
throw new UnsupportedOperationException("Not Implemented");
}
+ private void checkObjectType(Object value) throws
QpidJmsMessageFormatException
+ {
+ if(value != null && !SUPPORTED_TYPES.contains(value.getClass()))
+ {
+ throw new QpidJmsMessageFormatException("Invalid object value
type: " + value.getClass());
+ }
+ }
+
+ void checkBodyReadable() throws MessageNotReadableException
+ {
+ if(isBodyWritable())
+ {
+ throw new MessageNotReadableException("Message body is currently
in write-only mode");
+ }
+ }
+
+ /**
+  * Reads the next entry from the underlying list message.
+  *
+  * @param checkExistingReadBytesUsage if true, refuse to read while a bytes entry is still
+  *        being retrieved in pieces using readBytes(byte[] dest)
+  * @return the next entry in the stream, which may be null
+  * @throws MessageNotReadableException if the message body is currently writable
+  * @throws QpidJmsMessageFormatException if the check was requested and a partially read
+  *         bytes entry is outstanding
+  * @throws MessageEOFException if there are no unread entries remaining
+  */
+ private Object readObjectInternal(boolean checkExistingReadBytesUsage) throws MessageNotReadableException, MessageEOFException, QpidJmsMessageFormatException
+ {
+     //Reads are not permitted while the body is still in write-only mode.
+     checkBodyReadable();
+
+     if(checkExistingReadBytesUsage && _remainingBytes != NO_BYTES_IN_FLIGHT)
+     {
+         throw new QpidJmsMessageFormatException("Partially read bytes entry still being retrieved using readBytes(byte[] dest)");
+     }
+
+     try
+     {
+         return getUnderlyingAmqpMessage(false).get();
+     }
+     catch(IndexOutOfBoundsException ioobe)
+     {
+         //Translate the list's end-of-data signal into the JMS equivalent.
+         throw new MessageEOFException("No more data in message stream");
+     }
+ }
+
//======= JMS Methods =======
@Override
public boolean readBoolean() throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+     //Reads the next stream entry as a boolean. Boolean entries are returned directly,
+     //String and null entries are converted using Boolean.valueOf(String).
+     Object o = readObject();
+     if(o instanceof Boolean)
+     {
+         return (Boolean) o;
+     }
+     else if(o instanceof String || o == null)
+     {
+         //Boolean.valueOf(String) yields false for null, as for any string other than "true".
+         return Boolean.valueOf((String) o);
+     }
+     else
+     {
+         //A failed conversion must not consume the entry: step back so that it can be
+         //re-read as a different type, matching the error path of readBytes(byte[] dest).
+         getUnderlyingAmqpMessage(false).decrementPosition();
+         throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to boolean.");
+     }
}
@Override
@@ -111,24 +170,81 @@ public class StreamMessageImpl extends M
}
@Override
- public int readBytes(byte[] value) throws JMSException
+ public int readBytes(byte[] dest) throws JMSException
{
//TODO
- throw new UnsupportedOperationException("Not Implemented");
+ Object o = readObjectInternal(false);
+
+ if(o == null)
+ {
+ return -1;
+ }
+
+ if(o instanceof byte[])
+ {
+ byte[] src = (byte[]) o;
+
+ if(src.length == 0)
+ {
+ return 0;
+ }
+
+ if(_remainingBytes == 0)
+ {
+ //We previously read all the bytes, but must have filled the
dest array.
+ //Clear the remaining marker and signal completion via return
value.
+ _remainingBytes = NO_BYTES_IN_FLIGHT;
+ return -1;
+ }
+
+ if(_remainingBytes == NO_BYTES_IN_FLIGHT)
+ {
+ //The field is non-null and non-empty, and this is the first
read attempt.
+ //Set the remaining marker to the full size
+ _remainingBytes = src.length;
+ }
+
+ int previouslyRead = src.length - _remainingBytes;
+ int lengthToCopy = Math.min(dest.length, _remainingBytes);
+
+ if(lengthToCopy > 0)
+ {
+ System.arraycopy(src, previouslyRead, dest, 0, lengthToCopy);
+ }
+
+ _remainingBytes -= lengthToCopy;
+
+ if(_remainingBytes == 0 && lengthToCopy < dest.length)
+ {
+ //All bytes have been read and dest array was not filled on
this call, so the return
+ //will enable the caller to determine completion. Clear the
remaining marker.
+ _remainingBytes = NO_BYTES_IN_FLIGHT;
+ }
+ else
+ {
+ //More work to do to complete reading this field, move the
position back.
+ getUnderlyingAmqpMessage(false).decrementPosition();
+ }
+
+ return lengthToCopy;
+ }
+ else
+ {
+ getUnderlyingAmqpMessage(false).decrementPosition();
+ throw new QpidJmsMessageFormatException("Stream entry of type " +
o.getClass().getName() + " cannot be converted to bytes.");
+ }
}
@Override
public Object readObject() throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ return readObjectInternal(true);
}
@Override
public void writeBoolean(boolean value) throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ writeObject(value);
}
@Override
@@ -190,36 +306,48 @@ public class StreamMessageImpl extends M
@Override
public void writeBytes(byte[] value) throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ writeBytes(value, 0, value.length);
}
@Override
- public void writeBytes(byte[] value, int offset, int length)
- throws JMSException
+ public void writeBytes(byte[] value, int offset, int length) throws
JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ checkBodyWritable();
+
+ byte[] dest = new byte[length];
+ System.arraycopy(value, offset, dest, 0, length);
+
+ getUnderlyingAmqpMessage(false).add(dest);
}
@Override
public void writeObject(Object value) throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+ if(value instanceof byte[])
+ {
+ writeBytes((byte[]) value);
+ return;
+ }
+
+ checkBodyWritable();
+ checkObjectType(value);
+
+ getUnderlyingAmqpMessage(false).add(value);
}
@Override
public void reset() throws JMSException
{
- //TODO
- throw new UnsupportedOperationException("Not Implemented");
+     //Rewind to the first entry and make the body read-only.
+     getUnderlyingAmqpMessage(false).resetPosition();
+     setBodyWritable(false);
+     //Abandon any bytes entry that was part-way through being read via readBytes(byte[] dest).
+     _remainingBytes = NO_BYTES_IN_FLIGHT;
}
@Override
public void clearBody() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+     //Drop all entries (which also rewinds the read position) and make the body writable again.
+     getUnderlyingAmqpMessage(false).clear();
+     setBodyWritable(true);
+     //Abandon any bytes entry that was part-way through being read via readBytes(byte[] dest).
+     _remainingBytes = NO_BYTES_IN_FLIGHT;
}
}
Added:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java?rev=1612856&view=auto
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
(added)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpListMessageTest.java
Wed Jul 23 16:00:35 2014
@@ -0,0 +1,192 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.engine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.impl.ClientProperties;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AmqpListMessageTest extends QpidJmsTestCase
+{
+ private AmqpConnection _mockAmqpConnection;
+ private Delivery _mockDelivery;
+
+ @Before
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+ _mockDelivery = Mockito.mock(Delivery.class);
+ }
+
+ @Test
+ public void testNewMessageToSendContainsMessageTypeAnnotation() throws
Exception
+ {
+ AmqpMessage amqpListMessage = new AmqpListMessage();
+ assertTrue("expected message type annotation to be present",
amqpListMessage.messageAnnotationExists(ClientProperties.X_OPT_JMS_MSG_TYPE));
+ assertEquals("unexpected value for message type annotation value",
ClientProperties.STREAM_MESSAGE_TYPE,
amqpListMessage.getMessageAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE));
+ }
+
+ @Test
+ public void testGetWithNewMessageToSendThrowsIOOBE() throws Exception
+ {
+ AmqpListMessage amqpListMessage = new AmqpListMessage();
+
+ checkForIOOBE(amqpListMessage);
+ }
+
+ @Test
+ public void testGetUsingReceivedMessageReturnsExpectedValue() throws
Exception
+ {
+ Message message = Proton.message();
+ List<Object> list = new ArrayList<Object>();
+ list.add(Boolean.FALSE);
+ message.setBody(new AmqpValue(list));
+
+ AmqpListMessage amqpListMessage = new AmqpListMessage(message,
_mockDelivery, _mockAmqpConnection);
+
+ assertEquals("Unexpected value retrived", Boolean.FALSE,
amqpListMessage.get());
+ }
+
+ @Test
+ public void testResetPosition() throws Exception
+ {
+ AmqpListMessage amqpListMessage = new AmqpListMessage();
+
+ //add something
+ amqpListMessage.add(Boolean.TRUE);
+
+ //check we can access it, before IOOBE
+ checkSingleAccessWorksBeforeIOOBE(amqpListMessage, Boolean.TRUE);
+
+ //reset position
+ amqpListMessage.resetPosition();
+
+ //check it once more, should work again before another IOOBE
+ checkSingleAccessWorksBeforeIOOBE(amqpListMessage, Boolean.TRUE);
+ }
+
+ private void checkSingleAccessWorksBeforeIOOBE(AmqpListMessage
amqpListMessage, Object expected)
+ {
+ //check the value is as expected
+ assertEquals("unexpected value", expected, amqpListMessage.get());
+
+ //verify getting again results in IOOBE as there is nothing left
+ checkForIOOBE(amqpListMessage);
+ }
+
+ private void checkForIOOBE(AmqpListMessage amqpListMessage)
+ {
+ try
+ {
+ amqpListMessage.get();
+ fail("Expected exception to be thrown");
+ }
+ catch(IndexOutOfBoundsException ioobe)
+ {
+ //expected
+ }
+ }
+
+ @Test
+ public void testClear() throws Exception
+ {
+ AmqpListMessage amqpListMessage = new AmqpListMessage();
+
+ //add some stuff
+ amqpListMessage.add(Boolean.TRUE);
+ amqpListMessage.add(Boolean.FALSE);
+
+ //retrieve only some of it, leaving some unread
+ assertEquals("unexpected value", Boolean.TRUE, amqpListMessage.get());
+
+ //clear
+ amqpListMessage.clear();
+
+ //add something else
+ amqpListMessage.add(Character.valueOf('c'));
+
+ //check we can get it alone before another IOOBE (i.e position was
reset, other contents cleared)
+ checkSingleAccessWorksBeforeIOOBE(amqpListMessage,
Character.valueOf('c'));
+ }
+
+ @Test
+ public void
testCreateAmqpListMessageWithUnexpectedBodySectionTypeThrowsISE() throws
Exception
+ {
+ Message message = Proton.message();
+ message.setBody(new AmqpSequence(new ArrayList<Object>()));
+
+ try
+ {
+ new AmqpListMessage(message, _mockDelivery, _mockAmqpConnection);
+ fail("expected exception to be thrown");
+ }
+ catch(IllegalStateException ise)
+ {
+ //expected
+ }
+ }
+
+ @Test
+ public void
testCreateAmqpListMessageWithAmqpValueBodySectionContainingUnexpectedValueThrowsISE()
throws Exception
+ {
+ Message message = Proton.message();
+ message.setBody(new AmqpValue("not-a-list"));
+
+ try
+ {
+ new AmqpListMessage(message, _mockDelivery, _mockAmqpConnection);
+ fail("expected exception to be thrown");
+ }
+ catch(IllegalStateException ise)
+ {
+ //expected
+ }
+ }
+
+ @Test
+ public void testCreateAmqpListMessageWithEmptyAmqpValueBodySection()
throws Exception
+ {
+ Message message = Proton.message();
+ message.setBody(new AmqpValue(null));
+
+ AmqpListMessage amqpListMessage = new AmqpListMessage(message,
_mockDelivery, _mockAmqpConnection);
+
+ //Should be able to use the message, e.g clearing it and adding to it.
+ amqpListMessage.clear();
+ amqpListMessage.add("myString");
+ }
+}
\ No newline at end of file
Added:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
URL:
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java?rev=1612856&view=auto
==============================================================================
---
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
(added)
+++
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
Wed Jul 23 16:00:35 2014
@@ -0,0 +1,445 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class StreamMessageImplTest extends QpidJmsTestCase
+{
+ private ConnectionImpl _mockConnectionImpl;
+ private SessionImpl _mockSessionImpl;
+
+ @Before
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _mockConnectionImpl = Mockito.mock(ConnectionImpl.class);
+ _mockSessionImpl = Mockito.mock(SessionImpl.class);
+ Mockito.when(_mockSessionImpl.getDestinationHelper()).thenReturn(new
DestinationHelper());
+ }
+
+ // ======= general =========
+
+ @Test
+ public void testReadWithEmptyStreamThrowsMEOFE() throws Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+ //make it readable
+ streamMessageImpl.reset();
+
+ try
+ {
+ streamMessageImpl.readBoolean();
+ fail("Expected exception to be thrown as message has no content");
+ }
+ catch(MessageEOFException meofe)
+ {
+ //expected
+ }
+ }
+
+ @Test
+ public void testClearBodyOnNewMessageRemovesExistingValues() throws
Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+ streamMessageImpl.writeBoolean(true);
+
+ streamMessageImpl.clearBody();
+
+ streamMessageImpl.writeBoolean(false);
+ streamMessageImpl.reset();
+
+ //check we get only the value added after the clear
+ assertFalse("expected value added after the clear",
streamMessageImpl.readBoolean());
+
+ try
+ {
+ streamMessageImpl.readBoolean();
+ fail("Expected exception to be thrown");
+ }
+ catch(MessageEOFException meofe)
+ {
+ //expected
+ }
+ }
+
+ // ======= object =========
+
+ @Test
+ public void testWriteObjectWithIllegalTypeThrowsMFE() throws Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+ try
+ {
+ streamMessageImpl.writeObject(new Exception());
+ fail("Expected exception to be thrown");
+ }
+ catch(MessageFormatException mfe)
+ {
+ //expected
+ }
+ }
+
+ // ======= bytes =========
+
+ /**
+ * Write bytes, then retrieve them as all of the legal type combinations
+ */
+ @Test
+ public void testWriteBytesReadLegal() throws Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+ byte[] value = new byte[]{(byte)0, (byte)255, (byte)78};
+
+ streamMessageImpl.writeBytes(value);
+ streamMessageImpl.reset();
+
+ byte[] dest = new byte[value.length];
+
+ int readBytesLength = streamMessageImpl.readBytes(dest);
+ assertEquals("Number of bytes read did not match expectation",
value.length, readBytesLength);
+ assertArrayEquals("value not as expected", value, dest);
+ }
+
+ /**
+ * Write bytes, then retrieve them as all of the illegal type combinations
to verify it fails as expected
+ */
+ @Test
+ public void testWriteBytesReadIllegal() throws Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+ byte[] value = new byte[]{(byte)0, (byte)255, (byte)78};
+
+ streamMessageImpl.writeBytes(value);
+ streamMessageImpl.reset();
+
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Boolean.class);
+ /* TODO: enable when implementing
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Byte.class);
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Short.class);
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Character.class);
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Integer.class);
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Long.class);
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Float.class);
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
Double.class);
+ assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl,
String.class);
+ */
+ }
+
+ @Test
+ public void testReadObjectForBytesReturnsNewArray() throws Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+ byte[] bytes = new byte[]{(byte)11, (byte)44, (byte)99};
+ streamMessageImpl.writeBytes(bytes);
+
+ streamMessageImpl.reset();
+
+ byte[] retrievedBytes = (byte[]) streamMessageImpl.readObject();
+
+ assertNotSame("Expected different array objects", bytes,
retrievedBytes);
+ assertArrayEquals("Expected arrays to be equal", bytes,
retrievedBytes);
+ }
+
+ @Test
+ public void
testReadBytesFullWithUndersizedDestinationArrayUsingMultipleReads() throws
Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+ byte[] bytes = new byte[]{(byte)3, (byte)78, (byte)253, (byte) 26,
(byte) 8};
+ assertEquals("bytes should be odd length", 1, bytes.length % 2);
+ int undersizedLength = 2;
+ int remaining = 1;
+
+ streamMessageImpl.writeBytes(bytes);
+ streamMessageImpl.reset();
+
+ byte[] undersizedDestination = new byte[undersizedLength];
+ byte[] fullRetrievedBytes = new byte[bytes.length];
+
+ assertEquals("Number of bytes read did not match destination array
length", undersizedLength, streamMessageImpl.readBytes(undersizedDestination));
+ int read = undersizedLength;
+ System.arraycopy(undersizedDestination, 0, fullRetrievedBytes, 0,
undersizedLength);
+ assertEquals("Number of bytes read did not match destination array
length", undersizedLength, streamMessageImpl.readBytes(undersizedDestination));
+ System.arraycopy(undersizedDestination, 0, fullRetrievedBytes, read,
undersizedLength);
+ read += undersizedLength;
+ assertEquals("Number of bytes read did not match expectation",
remaining, streamMessageImpl.readBytes(undersizedDestination));
+ System.arraycopy(undersizedDestination, 0, fullRetrievedBytes, read,
remaining);
+ read += remaining;
+ assertArrayEquals("Expected array to equal retrieved bytes", bytes,
fullRetrievedBytes);
+ }
+
+ @Test
+ public void testReadBytesFullWithPreciselySizedDestinationArray() throws
Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+ byte[] bytes = new byte[]{(byte)11, (byte)44, (byte)99};
+ streamMessageImpl.writeBytes(bytes);
+
+ streamMessageImpl.reset();
+
+ byte[] retrievedByteArray = new byte[bytes.length];
+ int readBytesLength = streamMessageImpl.readBytes(retrievedByteArray);
+
+ assertEquals("Number of bytes read did not match original array
length", bytes.length, readBytesLength);
+ assertArrayEquals("Expected array to equal retrieved bytes", bytes,
retrievedByteArray);
+ assertEquals("Expected completion return value", -1,
streamMessageImpl.readBytes(retrievedByteArray));
+ }
+
+ @Test
+ public void testReadBytesFullWithOversizedDestinationArray() throws
Exception
+ {
+ StreamMessageImpl streamMessageImpl = new
StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+ byte[] bytes = new byte[]{(byte)4, (byte)115, (byte)255};
+ streamMessageImpl.writeBytes(bytes);
+
+ streamMessageImpl.reset();
+
+ byte[] oversizedDestination = new byte[bytes.length + 1];
+ int readBytesLength =
streamMessageImpl.readBytes(oversizedDestination);
+
+ assertEquals("Number of bytes read did not match original array
length", bytes.length, readBytesLength);
+ assertArrayEquals("Expected array subset to equal retrieved bytes",
bytes, Arrays.copyOfRange(oversizedDestination, 0, readBytesLength));
+ }
+
+ /**
+  * Checks that the restrictions quoted below are enforced, and that
+  * enforcing them does not stop the in-progress readBytes usage from
+  * being completed afterwards.
+  *
+  * {@link StreamMessage#readBytes(byte[])} indicates:
+  *
+  * "Once the first readBytes call on a byte[] field value has been made,
+  * the full value of the field must be read before it is valid to read
+  * the next field. An attempt to read the next field before that has been
+  * done will throw a MessageFormatException."
+  *
+  * {@link StreamMessage#readObject()} indicates:
+  *
+  * "An attempt to call readObject to read a byte field value into a new
+  * byte[] object before the full value of the byte field has been read
+  * will throw a MessageFormatException."
+  */
+ @Test
+ public void testReadObjectAfterPartialReadBytesThrowsMFE() throws Exception
+ {
+     byte[] written = new byte[]{(byte)11, (byte)44, (byte)99};
+     StreamMessageImpl message =
+         new StreamMessageImpl(_mockSessionImpl, _mockConnectionImpl);
+     message.writeBytes(written);
+     message.reset();
+
+     // Begin reading the field through a buffer shorter than the field.
+     int partialLength = 2;
+     byte[] buffer = new byte[partialLength];
+     int readLength = message.readBytes(buffer);
+     assertEquals(partialLength, readLength);
+     byte[] expectedHead = Arrays.copyOf(written, partialLength);
+     assertArrayEquals("Expected array subset to equal retrieved bytes",
+         expectedHead, buffer);
+
+     // readObject must not hand back the full/remaining bytes as a new
+     // array while the field has only been partially read.
+     boolean rejected = false;
+     try
+     {
+         message.readObject();
+     }
+     catch(MessageFormatException expected)
+     {
+         rejected = true;
+     }
+     if(!rejected)
+     {
+         fail("expected exception to be thrown");
+     }
+
+     // Finish the field via readBytes (reusing the same buffer) to ensure
+     // the read can still be completed.
+     readLength = message.readBytes(buffer);
+     assertEquals(written.length - partialLength, readLength);
+     byte[] expectedTail =
+         Arrays.copyOfRange(written, partialLength, written.length);
+     byte[] actualTail = Arrays.copyOfRange(buffer, 0, readLength);
+     assertArrayEquals("Expected array subset to equal retrieved bytes",
+         expectedTail, actualTail);
+ }
+
+ //========= boolean ========
+
+ /**
+  * Write a boolean entry, then read it back as each of the legal types to
+  * verify it is parsed correctly. Only the Boolean read is active so far;
+  * the String read is parked in the TODO block below.
+  */
+ @Test
+ public void testWriteBooleanReadLegal() throws Exception
+ {
+     final boolean written = true;
+     StreamMessageImpl message =
+         new StreamMessageImpl(_mockSessionImpl, _mockConnectionImpl);
+     message.writeBoolean(written);
+     message.reset();
+
+     assertGetStreamEntryEquals(message, written, Boolean.class);
+
+     /* TODO: enable when implementing
+     NOTE(review): presumably needs a reset() first, since the entry has
+     already been read above - confirm when enabling.
+     assertGetStreamEntryEquals(message, String.valueOf(written),
+         String.class);
+     */
+ }
+
+ /**
+  * Write a boolean entry, then attempt to read it as each of the illegal
+  * types to verify that every attempt fails as expected. All of the read
+  * attempts are currently parked in the TODO block below, so at present
+  * this test only exercises the write and reset.
+  */
+ @Test
+ public void testSetBooleanGetIllegal() throws Exception
+ {
+     StreamMessageImpl message =
+         new StreamMessageImpl(_mockSessionImpl, _mockConnectionImpl);
+     message.writeBoolean(true);
+     message.reset();
+
+     /* TODO: enable when implementing
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         Byte.class);
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         Short.class);
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         Character.class);
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         Integer.class);
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         Long.class);
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         Float.class);
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         Double.class);
+     assertGetStreamEntryThrowsMessageFormatException(message,
+         byte[].class);
+     */
+ }
+
+ //========= utility methods ========
+
+ /**
+  * Reads a stream entry from the message using the read method that
+  * corresponds to the given class, and asserts that the value returned
+  * equals the expected value.
+  *
+  * @param testMessage the message to read the entry from
+  * @param expectedValue the value the read is expected to return
+  * @param clazz the type selecting which read method is used
+  * @throws IllegalArgumentException if clazz is byte[].class
+  * @throws JMSException propagated from the underlying read method
+  */
+ private void assertGetStreamEntryEquals(StreamMessageImpl testMessage,
+                                         Object expectedValue,
+                                         Class<?> clazz) throws JMSException
+ {
+     // readBytes fills a caller-supplied array and returns a length, so
+     // a byte[] entry cannot be compared against an expected value here.
+     if(clazz == byte[].class)
+     {
+         throw new IllegalArgumentException("byte[] values not supported");
+     }
+
+     Object actualValue =
+         getStreamEntryUsingTypeMethod(testMessage, clazz, null);
+     assertEquals(expectedValue, actualValue);
+ }
+
+ /**
+  * Asserts that reading a stream entry as the given type is rejected with
+  * a MessageFormatException. An empty byte[] is always passed along as the
+  * destination for the read.
+  *
+  * @param testMessage the message to read the entry from
+  * @param clazz the type selecting which read method is attempted
+  * @throws JMSException any JMS failure other than the expected
+  *                      MessageFormatException
+  */
+ private void assertGetStreamEntryThrowsMessageFormatException(
+     StreamMessageImpl testMessage, Class<?> clazz) throws JMSException
+ {
+     byte[] emptyDestination = new byte[0];
+     try
+     {
+         getStreamEntryUsingTypeMethod(testMessage, clazz,
+             emptyDestination);
+     }
+     catch(MessageFormatException expected)
+     {
+         // the read was rejected, which is the required outcome
+         return;
+     }
+
+     // reaching this point means the read completed without complaint
+     fail("expected exception to be thrown");
+ }
+
+ /**
+  * Asserts that reading a stream entry as the given type throws a
+  * NumberFormatException, without supplying a destination array.
+  *
+  * @param testMessage the message to read the entry from
+  * @param clazz the type selecting which read method is attempted; must
+  *              not be byte[].class because no destination is supplied
+  * @throws IllegalArgumentException if clazz is byte[].class
+  * @throws JMSException propagated from the underlying read method
+  */
+ private void assertGetStreamEntryThrowsNumberFormatException(
+     StreamMessageImpl testMessage, Class<?> clazz) throws JMSException
+ {
+     assertGetStreamEntryThrowsNumberFormatException(testMessage, clazz,
+         null);
+ }
+
+ /**
+  * Asserts that reading a stream entry as the given type throws a
+  * NumberFormatException.
+  *
+  * @param testMessage the message to read the entry from
+  * @param clazz the type selecting which read method is attempted
+  * @param destination the array handed to readBytes when clazz is
+  *                    byte[].class; may be null for any other type
+  * @throws IllegalArgumentException if clazz is byte[].class and no
+  *                                  destination was supplied
+  * @throws JMSException propagated from the underlying read method
+  */
+ private void assertGetStreamEntryThrowsNumberFormatException(
+     StreamMessageImpl testMessage, Class<?> clazz, byte[] destination)
+     throws JMSException
+ {
+     // A byte[] read needs somewhere to put the bytes; reject the misuse
+     // up front rather than letting the read fail for the wrong reason.
+     if(clazz == byte[].class && destination == null)
+     {
+         throw new IllegalArgumentException(
+             "Destination byte[] must be supplied");
+     }
+
+     try
+     {
+         getStreamEntryUsingTypeMethod(testMessage, clazz, destination);
+
+         fail("expected exception to be thrown");
+     }
+     catch(NumberFormatException nfe)
+     {
+         //expected
+     }
+ }
+
+ /**
+  * Reads a stream entry from the message by calling the read method that
+  * corresponds to the requested class, returning the result as an Object.
+  *
+  * @param testMessage the message to read the entry from
+  * @param clazz the type selecting which read method is called
+  * @param destination the array passed to readBytes; only used when clazz
+  *                    is byte[].class
+  * @return the value returned by the selected read method; for
+  *         byte[].class this is the value returned by readBytes
+  * @throws JMSException propagated from the selected read method
+  * @throws RuntimeException if clazz is not one of the handled types
+  */
+ private Object getStreamEntryUsingTypeMethod(StreamMessageImpl testMessage,
+                                              Class<?> clazz,
+                                              byte[] destination)
+     throws JMSException
+ {
+     // The checks are mutually exclusive, so each one returns directly.
+     if(clazz == Boolean.class)
+     {
+         return testMessage.readBoolean();
+     }
+     if(clazz == Character.class)
+     {
+         return testMessage.readChar();
+     }
+     if(clazz == String.class)
+     {
+         return testMessage.readString();
+     }
+
+     // integral types
+     if(clazz == Byte.class)
+     {
+         return testMessage.readByte();
+     }
+     if(clazz == Short.class)
+     {
+         return testMessage.readShort();
+     }
+     if(clazz == Integer.class)
+     {
+         return testMessage.readInt();
+     }
+     if(clazz == Long.class)
+     {
+         return testMessage.readLong();
+     }
+
+     // floating point types
+     if(clazz == Float.class)
+     {
+         return testMessage.readFloat();
+     }
+     if(clazz == Double.class)
+     {
+         return testMessage.readDouble();
+     }
+
+     // byte arrays are read into the caller-supplied destination
+     if(clazz == byte[].class)
+     {
+         return testMessage.readBytes(destination);
+     }
+
+     throw new RuntimeException("Unexpected entry type class");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]