Repository: qpid-jms
Updated Branches:
  refs/heads/master 7a4169f9f -> 7666819e2


NO-JIRA: add some support to test peer for multi-frame messages, add some 
receiving tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7666819e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7666819e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7666819e

Branch: refs/heads/master
Commit: 7666819e286b0133af6cf880d0d720dca88f3eb4
Parents: 7a4169f
Author: Robbie Gemmell <rob...@apache.org>
Authored: Wed Jun 20 18:04:19 2018 +0100
Committer: Robbie Gemmell <rob...@apache.org>
Committed: Wed Jun 20 18:04:19 2018 +0100

----------------------------------------------------------------------
 .../BytesMessageIntegrationTest.java            |   5 +
 ...ultiTransferFrameMessageIntegrationTest.java | 128 +++++++++++++++++++
 .../qpid/jms/test/testpeer/AmqpDataFramer.java  |  12 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 119 ++++++++++++++---
 .../jms/test/testpeer/TestAmqpPeerRunner.java   |   9 +-
 5 files changed, 253 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 81c1b60..3ba96c0 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -122,6 +122,11 @@ public class BytesMessageIntegrationTest extends 
QpidJmsTestCase {
         doReceiveBasicBytesMessageUsingDataSectionTestImpl("type/unknown", 
false);
     }
 
+    @Test(timeout = 20000)
+    public void 
testReceiveBasicBytesMessageUsingDataSectionWithContentTypeNotSetNoTypeAnnotation()
 throws Exception {
+        doReceiveBasicBytesMessageUsingDataSectionTestImpl(null, false);
+    }
+
     private void doReceiveBasicBytesMessageUsingDataSectionTestImpl(String 
contentType, boolean typeAnnotation) throws JMSException, InterruptedException, 
Exception, IOException {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java
new file mode 100644
index 0000000..c53e14f
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+public class MultiTransferFrameMessageIntegrationTest extends QpidJmsTestCase {
+    private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
+
+    @Test(timeout = 20000)
+    public void testReceiveMultiFrameBytesMessage() throws Exception {
+        doReceiveMultiFrameBytesMessageTestImpl(false);
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveMultiFrameBytesMessageWithEmptyFinalTransfer() 
throws Exception {
+        doReceiveMultiFrameBytesMessageTestImpl(true);
+    }
+
+    private void doReceiveMultiFrameBytesMessageTestImpl(boolean 
sendFinalTransferFrameWithoutPayload) throws JMSException, 
InterruptedException, Exception, IOException {
+        int payloadSizeInBytes = 20_123_321;
+        int msgPayloadPerFrame = 101_234;
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.prefetchPolicy.all=0");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            PropertiesDescribedType properties = new PropertiesDescribedType();
+            
properties.setContentType(Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+
+            MessageAnnotationsDescribedType msgAnnotations = new 
MessageAnnotationsDescribedType();
+            
msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, 
AmqpMessageSupport.JMS_BYTES_MESSAGE);
+
+            Random rand = new Random(System.currentTimeMillis());
+            int payloadStartPoint = rand.nextInt(6);
+
+            final byte[] expectedContent = 
createMessageBodyContent(payloadSizeInBytes, payloadStartPoint);
+            DescribedType dataContent = new DataDescribedType(new 
Binary(expectedContent));
+
+            testPeer.expectReceiverAttach();
+
+            testPeer.expectLinkFlowAndSendBackMessages(null, msgAnnotations, 
properties, null, dataContent, 1,
+                                                      true, false, 
Matchers.equalTo(UnsignedInteger.valueOf(1)), 1,
+                                                      false, false, 
msgPayloadPerFrame, sendFinalTransferFrameWithoutPayload);
+
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receiveNoWait();
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof BytesMessage);
+            BytesMessage bytesMessage = (BytesMessage) receivedMessage;
+            assertEquals("Unexpected message body length", 
expectedContent.length, bytesMessage.getBodyLength());
+
+            byte[] receivedContent = new byte[expectedContent.length];
+            int readBytes = bytesMessage.readBytes(receivedContent);
+
+            assertEquals("Unexpected content length read", 
receivedContent.length, readBytes);
+            assertTrue("Unexpected content", Arrays.equals(expectedContent, 
receivedContent));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    private static byte[] createMessageBodyContent(int sizeInBytes, int 
startPoint) {
+        byte[] payload = new byte[sizeInBytes];
+        for (int i = 0; i < sizeInBytes; i++) {
+            // An odd number of digit characters
+            int offset = (startPoint + i) % 7;
+            payload[i] = (byte) (48 + offset);
+        }
+
+        return payload;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
index 43b7965..d49a230 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java
@@ -52,8 +52,16 @@ public class AmqpDataFramer
 
         if(payload != null)
         {
-            //TODO grow buffer if needed rather than throw BOE
-            buffer.put(payload.asByteBuffer());
+            ByteBuffer framePayload = payload.asByteBuffer();
+
+            if(framePayload.remaining() > buffer.remaining()) {
+                ByteBuffer oldBuffer = buffer;
+                buffer = ByteBuffer.allocate(oldBuffer.position() + 
framePayload.remaining());
+                oldBuffer.flip();
+                buffer.put(oldBuffer);
+            }
+
+            buffer.put(framePayload);
         }
 
         int frameSize = buffer.position();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index ed28c82..a84406e 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1795,6 +1795,26 @@ public class TestAmqpPeer implements AutoCloseable
             final boolean sendSettled,
             boolean addMessageNumberProperty)
     {
+        expectLinkFlowAndSendBackMessages(headerDescribedType, 
messageAnnotationsDescribedType, propertiesDescribedType,
+                                                appPropertiesDescribedType, 
content, count, drain, sendDrainFlowResponse,
+                                                creditMatcher, nextIncomingId, 
sendSettled, addMessageNumberProperty, 0, false);
+    }
+
+    public void expectLinkFlowAndSendBackMessages(final HeaderDescribedType 
headerDescribedType,
+            final MessageAnnotationsDescribedType 
messageAnnotationsDescribedType,
+            final PropertiesDescribedType propertiesDescribedType,
+            ApplicationPropertiesDescribedType appPropertiesDescribedType,
+            final DescribedType content,
+            final int count,
+            final boolean drain,
+            final boolean sendDrainFlowResponse,
+            Matcher<UnsignedInteger> creditMatcher,
+            final Integer nextIncomingId,
+            final boolean sendSettled,
+            boolean addMessageNumberProperty,
+            int msgPayloadPerFrame,
+            boolean sendFinalTransferFrameWithoutPayload)
+    {
         if (nextIncomingId == null && count > 0)
         {
             throw new IllegalArgumentException("The remote NextIncomingId must 
be specified if transfers have been requested");
@@ -1843,29 +1863,56 @@ public class TestAmqpPeer implements AutoCloseable
                 
appPropertiesDescribedType.setApplicationProperty(MESSAGE_NUMBER, i);
             }
 
-            final TransferFrame transferResponse = new TransferFrame()
-            .setDeliveryId(UnsignedInteger.valueOf(nextId))
-            .setDeliveryTag(dtag)
-            .setMessageFormat(UnsignedInteger.ZERO)
-            .setSettled(sendSettled);
 
             Binary payload = prepareTransferPayload(headerDescribedType, 
messageAnnotationsDescribedType,
                     propertiesDescribedType, appPropertiesDescribedType, 
content);
 
-            // The response frame channel will be dynamically set based on the 
incoming frame. Using the -1 is an illegal placeholder.
-            final FrameSender transferResponseSender = new FrameSender(this, 
FrameType.AMQP, -1, transferResponse, payload);
-            transferResponseSender.setValueProvider(new ValueProvider()
-            {
-                @Override
-                public void setValues()
-                {
-                    
transferResponse.setHandle(flowMatcher.getReceivedHandle());
-                    
transferResponseSender.setChannel(flowMatcher.getActualChannel());
+            int length = payload.getLength();
+            int sent = 0;
+
+            while (sent < length) {
+                final TransferFrame transferFrame = new TransferFrame()
+                        .setDeliveryId(UnsignedInteger.valueOf(nextId))
+                        .setDeliveryTag(dtag)
+                        .setMessageFormat(UnsignedInteger.ZERO)
+                        .setSettled(sendSettled);
+
+                int remaining = length - sent;
+                Binary chunk;
+                if(msgPayloadPerFrame != 0 && msgPayloadPerFrame < length) {
+                    int chunkSize = Math.min(msgPayloadPerFrame, remaining);
+                    chunk = payload.subBinary(sent, chunkSize);
+                    sent += chunkSize;
+                } else {
+                    chunk = payload;
+                    sent = length;
+                }
+
+                if(sent < length || (sent == length && 
sendFinalTransferFrameWithoutPayload)) {
+                    // Indicate more frames if there is payload left, or we 
want to send a final transfer without payload
+                    transferFrame.setMore(true);
                 }
-            });
+
+                // The response frame channel will be dynamically set based on 
the incoming frame. Using the -1 is an illegal placeholder.
+                final FrameSender transferResponseSender = new 
FrameSender(this, FrameType.AMQP, -1, transferFrame, chunk);
+                transferResponseSender.setValueProvider(new ValueProvider()
+                {
+                    @Override
+                    public void setValues()
+                    {
+                        
transferFrame.setHandle(flowMatcher.getReceivedHandle());
+                        
transferResponseSender.setChannel(flowMatcher.getActualChannel());
+                    }
+                });
+
+                composite.add(transferResponseSender);
+            }
+
+            if(sendFinalTransferFrameWithoutPayload) {
+                sendEmptyFinalTransfer(composite, flowMatcher, nextId, dtag, 
sendSettled);
+            }
 
             addComposite = true;
-            composite.add(transferResponseSender);
         }
 
         if(drain && sendDrainFlowResponse)
@@ -1902,6 +1949,28 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(flowMatcher);
     }
 
+    private void sendEmptyFinalTransfer(CompositeAmqpPeerRunnable composite, 
final FlowMatcher flowMatcher, final int deliveryId, Binary dTag, final boolean 
settled) {
+        final TransferFrame transferFrame = new TransferFrame()
+                .setDeliveryId(UnsignedInteger.valueOf(deliveryId))
+                .setDeliveryTag(dTag)
+                .setMessageFormat(UnsignedInteger.ZERO)
+                .setSettled(settled);
+
+        // The response frame channel will be dynamically set based on the 
incoming frame. Using the -1 is an illegal placeholder.
+        final FrameSender finalEmptyTransferSender = new FrameSender(this, 
FrameType.AMQP, -1, transferFrame, null);
+        finalEmptyTransferSender.setValueProvider(new ValueProvider()
+        {
+            @Override
+            public void setValues()
+            {
+                transferFrame.setHandle(flowMatcher.getReceivedHandle());
+                
finalEmptyTransferSender.setChannel(flowMatcher.getActualChannel());
+            }
+        });
+
+        composite.add(finalEmptyTransferSender);
+    }
+
     public void 
expectLinkFlowThenPerformUnexpectedDeliveryCountAdvanceThenCreditTopupThenTransfers(final
 int prefetch, final int topUp, final int messageCount)
     {
         final FlowMatcher flowMatcher = new FlowMatcher()
@@ -2017,6 +2086,9 @@ public class TestAmqpPeer implements AutoCloseable
         return nid.add(UnsignedInteger.valueOf(sentCount));
     }
 
+    /**
+     * Encodes and returns transfer payload Binary, or null if no message 
sections were supplied.
+     */
     private Binary prepareTransferPayload(final HeaderDescribedType 
headerDescribedType,
                                           final 
MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                           final PropertiesDescribedType 
propertiesDescribedType,
@@ -2024,33 +2096,46 @@ public class TestAmqpPeer implements AutoCloseable
                                           final DescribedType content)
     {
         Data payloadData = Data.Factory.create();
+        boolean hasSection = false;
 
         if(headerDescribedType != null)
         {
+            hasSection = true;
             payloadData.putDescribedType(headerDescribedType);
         }
 
         if(messageAnnotationsDescribedType != null)
         {
+            hasSection = true;
             payloadData.putDescribedType(messageAnnotationsDescribedType);
         }
 
         if(propertiesDescribedType != null)
         {
+            hasSection = true;
             payloadData.putDescribedType(propertiesDescribedType);
         }
 
         if(appPropertiesDescribedType != null)
         {
+            hasSection = true;
             payloadData.putDescribedType(appPropertiesDescribedType);
         }
 
         if(content != null)
         {
+            hasSection = true;
             payloadData.putDescribedType(content);
         }
 
-        return payloadData.encode();
+        if (hasSection)
+        {
+            return payloadData.encode();
+        }
+        else
+        {
+            return null;
+        }
     }
 
     public void expectTransferButDoNotRespond(Matcher<Binary> 
expectedPayloadMatcher)

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 393e70a..a7dba31 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -40,6 +40,7 @@ class TestAmqpPeerRunner implements Runnable
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TestAmqpPeerRunner.class);
 
     private static final int PORT = 25672;
+    private static final int TRACE_FRAME_PAYLOAD_LENGTH = 
Integer.getInteger("testPeerSendingTraceFramePayloadLength", 1024);
 
     private final ServerSocket _serverSocket;
     private final boolean useFixedPort = 
Boolean.getBoolean("testPeerUsesFixedPort");
@@ -207,7 +208,13 @@ class TestAmqpPeerRunner implements Runnable
 
     public void sendBytes(byte[] bytes)
     {
-        LOGGER.debug("Sending: {} ({} bytes)", new Binary(bytes), 
bytes.length);
+        if(bytes.length > TRACE_FRAME_PAYLOAD_LENGTH) {
+            Binary print = new Binary(bytes, 0, TRACE_FRAME_PAYLOAD_LENGTH);
+            LOGGER.debug("Sending: {}...(truncated) ({} bytes)", print, 
bytes.length);
+        } else {
+            LOGGER.debug("Sending: {} ({} bytes)", new Binary(bytes), 
bytes.length);
+        }
+
         try
         {
             _networkOutputStream.write(bytes);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to