Repository: qpid-jms
Updated Branches:
  refs/heads/master 84045ed6d -> 2931fbb13


add basic StreamMessage integration test


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/68f8edc8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/68f8edc8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/68f8edc8

Branch: refs/heads/master
Commit: 68f8edc8f65c0bc90ef3ba70b65af626923241b5
Parents: 84045ed
Author: Robert Gemmell <[email protected]>
Authored: Fri Oct 3 09:41:23 2014 +0100
Committer: Robert Gemmell <[email protected]>
Committed: Fri Oct 3 09:41:23 2014 +0100

----------------------------------------------------------------------
 .../StreamMessageIntegrationTest.java           | 210 +++++++++++++++++++
 1 file changed, 210 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/68f8edc8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
new file mode 100644
index 0000000..a9e860a
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class StreamMessageIntegrationTest extends QpidJmsTestCase {
+    private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
+
+    /**
+     * Test that a message received from the test peer with an AmqpValue 
section containing
+     * a list which holds entries of the various supported entry types is 
returned as a
+     * {@link StreamMessage}, and verify the values can all be retrieved as 
expected.
+     */
+    @Test(timeout = 5000)
+    public void testReceiveBasicMapMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            //Prepare an AMQP message for the test peer to send, containing an
+            //AmqpValue section holding a list with entries for each supported 
type,
+            //and annotated as a JMS stream message.
+            boolean myBool = true;
+            byte myByte = Byte.MAX_VALUE;
+            char myChar = 'c';
+            double myDouble = 1234567890123456789.1234;
+            float myFloat = 1.1F;
+            int myInt = Integer.MAX_VALUE;
+            long myLong = Long.MAX_VALUE;
+            short myShort = Short.MAX_VALUE;
+            String myString = "myString";
+            byte[] myBytes = "myBytes".getBytes();
+
+            List<Object> list = new ArrayList<Object>();
+            list.add(myBool);
+            list.add(myByte);
+            list.add(new Binary(myBytes));//the underlying AMQP message uses 
Binary rather than byte[] directly.
+            list.add(myChar);
+            list.add(myDouble);
+            list.add(myFloat);
+            list.add(myInt);
+            list.add(myLong);
+            list.add(myShort);
+            list.add(myString);
+
+            MessageAnnotationsDescribedType msgAnnotations = new 
MessageAnnotationsDescribedType();
+            
msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, 
AmqpMessageSupport.JMS_STREAM_MESSAGE);
+
+            DescribedType amqpValueSectionContent = new 
AmqpValueDescribedType(list);
+
+            //receive the message from the test peer
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, 
null, null, amqpValueSectionContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            //verify the content is as expected
+            assertNotNull("Message was not received", receivedMessage);
+            assertTrue("Message was not a MapMessage", receivedMessage 
instanceof StreamMessage);
+            StreamMessage receivedStreamMessage  = (StreamMessage) 
receivedMessage;
+
+            assertEquals("Unexpected boolean value", myBool, 
receivedStreamMessage.readBoolean());
+            assertEquals("Unexpected byte value", myByte, 
receivedStreamMessage.readByte());
+            byte[] readBytes = (byte[]) 
receivedStreamMessage.readObject();//using readObject to get a new byte[]
+            assertArrayEquals("Read bytes were not as expected", myBytes, 
readBytes);
+            assertEquals("Unexpected char value", myChar, 
receivedStreamMessage.readChar());
+            assertEquals("Unexpected double value", myDouble, 
receivedStreamMessage.readDouble(), 0.0);
+            assertEquals("Unexpected float value", myFloat, 
receivedStreamMessage.readFloat(), 0.0);
+            assertEquals("Unexpected int value", myInt, 
receivedStreamMessage.readInt());
+            assertEquals("Unexpected long value", myLong, 
receivedStreamMessage.readLong());
+            assertEquals("Unexpected short value", myShort, 
receivedStreamMessage.readShort());
+            assertEquals("Unexpected UTF value", myString, 
receivedStreamMessage.readString());
+        }
+    }
+
+/*
+ * TODO: decide what to do about this
+ *
+ * The test below fails if a char is added, because the DataImpl-based decoder 
used by the test peer
+ * decodes the char to an Integer object and thus the EncodedAmqpValueMatcher 
fails the comparison
+ * of its contained map due to the differing types. This doesn't happen in the 
above test as the
+ * reversed roles mean it is DecoderImpl doing the decoding and it casts the 
output to a char.
+ *
+ * The below test has a hack to 'expect an int' to work round this currently.
+
+    /**
+     * Test that sending a stream message to the test peer results in receipt 
of a message with
+     * an AmqpValue section containing a list which holds entries of the 
various supported entry
+     * types with the expected values.
+     */
+    @Test(timeout = 5000)
+    public void testSendBasicMapMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin(true);
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            boolean myBool = true;
+            byte myByte = Byte.MAX_VALUE;
+            char myChar = 'c';
+            double myDouble = 1234567890123456789.1234;
+            float myFloat = 1.1F;
+            int myInt = Integer.MAX_VALUE;
+            long myLong = Long.MAX_VALUE;
+            short myShort = Short.MAX_VALUE;
+            String myString = "myString";
+            byte[] myBytes = "myBytes".getBytes();
+
+            //Prepare a MapMessage to send to the test peer to send
+            StreamMessage streamMessage = session.createStreamMessage();
+
+            streamMessage.writeBoolean(myBool);
+            streamMessage.writeByte(myByte);
+            streamMessage.writeBytes(myBytes);
+            streamMessage.writeChar(myChar);
+            streamMessage.writeDouble(myDouble);
+            streamMessage.writeFloat(myFloat);
+            streamMessage.writeInt(myInt);
+            streamMessage.writeLong(myLong);
+            streamMessage.writeShort(myShort);
+            streamMessage.writeString(myString);
+
+            //prepare a matcher for the test peer to use to receive and verify 
the message
+            List<Object> list = new ArrayList<Object>();
+            list.add(myBool);
+            list.add(myByte);
+            list.add(new Binary(myBytes));//the underlying AMQP message uses 
Binary rather than byte[] directly.
+            list.add((int) myChar);//TODO: see note above about chars
+            list.add(myDouble);
+            list.add(myFloat);
+            list.add(myInt);
+            list.add(myLong);
+            list.add(myShort);
+            list.add(myString);
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            
msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.JMS_MSG_TYPE),
 equalTo(AmqpMessageSupport.JMS_STREAM_MESSAGE));
+            MessagePropertiesSectionMatcher propertiesMatcher = new 
MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propertiesMatcher);
+            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(list));
+
+            //send the message
+            testPeer.expectTransfer(messageMatcher);
+            producer.send(streamMessage);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to