Repository: qpid-jms Updated Branches: refs/heads/master 7a4169f9f -> 7666819e2
NO-JIRA: add some support to test peer for multi-frame messages, add some receiving tests Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7666819e Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7666819e Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7666819e Branch: refs/heads/master Commit: 7666819e286b0133af6cf880d0d720dca88f3eb4 Parents: 7a4169f Author: Robbie Gemmell <rob...@apache.org> Authored: Wed Jun 20 18:04:19 2018 +0100 Committer: Robbie Gemmell <rob...@apache.org> Committed: Wed Jun 20 18:04:19 2018 +0100 ---------------------------------------------------------------------- .../BytesMessageIntegrationTest.java | 5 + ...ultiTransferFrameMessageIntegrationTest.java | 128 +++++++++++++++++++ .../qpid/jms/test/testpeer/AmqpDataFramer.java | 12 +- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 119 ++++++++++++++--- .../jms/test/testpeer/TestAmqpPeerRunner.java | 9 +- 5 files changed, 253 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java index 81c1b60..3ba96c0 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java @@ -122,6 +122,11 @@ public class BytesMessageIntegrationTest extends QpidJmsTestCase { doReceiveBasicBytesMessageUsingDataSectionTestImpl("type/unknown", false); } + @Test(timeout = 20000) + public void testReceiveBasicBytesMessageUsingDataSectionWithContentTypeNotSetNoTypeAnnotation() throws Exception { + doReceiveBasicBytesMessageUsingDataSectionTestImpl(null, false); + } + private void doReceiveBasicBytesMessageUsingDataSectionTestImpl(String contentType, boolean typeAnnotation) throws JMSException, InterruptedException, Exception, IOException { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java new file mode 100644 index 0000000..c53e14f --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MultiTransferFrameMessageIntegrationTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms.integration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.hamcrest.Matchers; +import org.junit.Test; + +public class MultiTransferFrameMessageIntegrationTest extends QpidJmsTestCase { + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + + @Test(timeout = 20000) + public void testReceiveMultiFrameBytesMessage() throws Exception { + doReceiveMultiFrameBytesMessageTestImpl(false); + } + + @Test(timeout = 20000) + public void testReceiveMultiFrameBytesMessageWithEmptyFinalTransfer() throws Exception { + doReceiveMultiFrameBytesMessageTestImpl(true); + } + + private void doReceiveMultiFrameBytesMessageTestImpl(boolean sendFinalTransferFrameWithoutPayload) throws JMSException, InterruptedException, Exception, IOException { + int payloadSizeInBytes = 20_123_321; + int msgPayloadPerFrame = 101_234; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + PropertiesDescribedType properties = new PropertiesDescribedType(); + properties.setContentType(Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE)); + + MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType(); + msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_BYTES_MESSAGE); + + Random rand = new Random(System.currentTimeMillis()); + int payloadStartPoint = rand.nextInt(6); + + final byte[] expectedContent = createMessageBodyContent(payloadSizeInBytes, payloadStartPoint); + DescribedType dataContent = new DataDescribedType(new Binary(expectedContent)); + + testPeer.expectReceiverAttach(); + + testPeer.expectLinkFlowAndSendBackMessages(null, msgAnnotations, properties, null, dataContent, 1, + true, false, Matchers.equalTo(UnsignedInteger.valueOf(1)), 1, + false, false, msgPayloadPerFrame, sendFinalTransferFrameWithoutPayload); + + testPeer.expectDispositionThatIsAcceptedAndSettled(); + + MessageConsumer messageConsumer = session.createConsumer(queue); + Message receivedMessage = messageConsumer.receiveNoWait(); + testPeer.waitForAllHandlersToComplete(3000); + + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof BytesMessage); + BytesMessage bytesMessage = (BytesMessage) receivedMessage; + assertEquals("Unexpected message body length", expectedContent.length, bytesMessage.getBodyLength()); + + byte[] receivedContent = new byte[expectedContent.length]; + int readBytes = bytesMessage.readBytes(receivedContent); + + assertEquals("Unexpected content length read", receivedContent.length, readBytes); + assertTrue("Unexpected content", Arrays.equals(expectedContent, receivedContent)); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + + private static byte[] createMessageBodyContent(int sizeInBytes, int startPoint) { + byte[] payload = new byte[sizeInBytes]; + for (int i = 0; i < sizeInBytes; i++) { + // An odd number of digit characters + int offset = (startPoint + i) % 7; + payload[i] = (byte) (48 + offset); + } + + return payload; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java index 43b7965..d49a230 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java @@ -52,8 +52,16 @@ public class AmqpDataFramer if(payload != null) { - //TODO grow buffer if needed rather than throw BOE - buffer.put(payload.asByteBuffer()); + ByteBuffer framePayload = payload.asByteBuffer(); + + if(framePayload.remaining() > buffer.remaining()) { + ByteBuffer oldBuffer = buffer; + buffer = ByteBuffer.allocate(oldBuffer.position() + framePayload.remaining()); + oldBuffer.flip(); + buffer.put(oldBuffer); + } + + buffer.put(framePayload); } int frameSize = buffer.position(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index ed28c82..a84406e 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -1795,6 +1795,26 @@ public class TestAmqpPeer implements AutoCloseable final boolean sendSettled, boolean addMessageNumberProperty) { + expectLinkFlowAndSendBackMessages(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, + appPropertiesDescribedType, content, count, drain, sendDrainFlowResponse, + creditMatcher, nextIncomingId, sendSettled, addMessageNumberProperty, 0, false); + } + + public void expectLinkFlowAndSendBackMessages(final HeaderDescribedType headerDescribedType, + final MessageAnnotationsDescribedType messageAnnotationsDescribedType, + final PropertiesDescribedType propertiesDescribedType, + ApplicationPropertiesDescribedType appPropertiesDescribedType, + final DescribedType content, + final int count, + final boolean drain, + final boolean sendDrainFlowResponse, + Matcher<UnsignedInteger> creditMatcher, + final Integer nextIncomingId, + final boolean sendSettled, + boolean addMessageNumberProperty, + int msgPayloadPerFrame, + boolean sendFinalTransferFrameWithoutPayload) + { if (nextIncomingId == null && count > 0) { throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested"); @@ -1843,29 +1863,56 @@ public class TestAmqpPeer implements AutoCloseable appPropertiesDescribedType.setApplicationProperty(MESSAGE_NUMBER, i); } - final TransferFrame transferResponse = new TransferFrame() - .setDeliveryId(UnsignedInteger.valueOf(nextId)) - .setDeliveryTag(dtag) - .setMessageFormat(UnsignedInteger.ZERO) - .setSettled(sendSettled); Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content); - // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. - final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload); - transferResponseSender.setValueProvider(new ValueProvider() - { - @Override - public void setValues() - { - transferResponse.setHandle(flowMatcher.getReceivedHandle()); - transferResponseSender.setChannel(flowMatcher.getActualChannel()); + int length = payload.getLength(); + int sent = 0; + + while (sent < length) { + final TransferFrame transferFrame = new TransferFrame() + .setDeliveryId(UnsignedInteger.valueOf(nextId)) + .setDeliveryTag(dtag) + .setMessageFormat(UnsignedInteger.ZERO) + .setSettled(sendSettled); + + int remaining = length - sent; + Binary chunk; + if(msgPayloadPerFrame != 0 && msgPayloadPerFrame < length) { + int chunkSize = Math.min(msgPayloadPerFrame, remaining); + chunk = payload.subBinary(sent, chunkSize); + sent += chunkSize; + } else { + chunk = payload; + sent = length; + } + + if(sent < length || (sent == length && sendFinalTransferFrameWithoutPayload)) { + // Indicate more frames if there is payload left, or we want to send a final transfer without payload + transferFrame.setMore(true); } - }); + + // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferFrame, chunk); + transferResponseSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + transferFrame.setHandle(flowMatcher.getReceivedHandle()); + transferResponseSender.setChannel(flowMatcher.getActualChannel()); + } + }); + + composite.add(transferResponseSender); + } + + if(sendFinalTransferFrameWithoutPayload) { + sendEmptyFinalTransfer(composite, flowMatcher, nextId, dtag, sendSettled); + } addComposite = true; - composite.add(transferResponseSender); } if(drain && sendDrainFlowResponse) @@ -1902,6 +1949,28 @@ public class TestAmqpPeer implements AutoCloseable addHandler(flowMatcher); } + private void sendEmptyFinalTransfer(CompositeAmqpPeerRunnable composite, final FlowMatcher flowMatcher, final int deliveryId, Binary dTag, final boolean settled) { + final TransferFrame transferFrame = new TransferFrame() + .setDeliveryId(UnsignedInteger.valueOf(deliveryId)) + .setDeliveryTag(dTag) + .setMessageFormat(UnsignedInteger.ZERO) + .setSettled(settled); + + // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender finalEmptyTransferSender = new FrameSender(this, FrameType.AMQP, -1, transferFrame, null); + finalEmptyTransferSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + transferFrame.setHandle(flowMatcher.getReceivedHandle()); + finalEmptyTransferSender.setChannel(flowMatcher.getActualChannel()); + } + }); + + composite.add(finalEmptyTransferSender); + } + public void expectLinkFlowThenPerformUnexpectedDeliveryCountAdvanceThenCreditTopupThenTransfers(final int prefetch, final int topUp, final int messageCount) { final FlowMatcher flowMatcher = new FlowMatcher() @@ -2017,6 +2086,9 @@ public class TestAmqpPeer implements AutoCloseable return nid.add(UnsignedInteger.valueOf(sentCount)); } + /** + * Encodes and returns transfer payload Binary, or null if no message sections were supplied. + */ private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType, final MessageAnnotationsDescribedType messageAnnotationsDescribedType, final PropertiesDescribedType propertiesDescribedType, @@ -2024,33 +2096,46 @@ public class TestAmqpPeer implements AutoCloseable final DescribedType content) { Data payloadData = Data.Factory.create(); + boolean hasSection = false; if(headerDescribedType != null) { + hasSection = true; payloadData.putDescribedType(headerDescribedType); } if(messageAnnotationsDescribedType != null) { + hasSection = true; payloadData.putDescribedType(messageAnnotationsDescribedType); } if(propertiesDescribedType != null) { + hasSection = true; payloadData.putDescribedType(propertiesDescribedType); } if(appPropertiesDescribedType != null) { + hasSection = true; payloadData.putDescribedType(appPropertiesDescribedType); } if(content != null) { + hasSection = true; payloadData.putDescribedType(content); } - return payloadData.encode(); + if (hasSection) + { + return payloadData.encode(); + } + else + { + return null; + } } public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher) http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7666819e/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java index 393e70a..a7dba31 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java @@ -40,6 +40,7 @@ class TestAmqpPeerRunner implements Runnable private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeerRunner.class); private static final int PORT = 25672; + private static final int TRACE_FRAME_PAYLOAD_LENGTH = Integer.getInteger("testPeerSendingTraceFramePayloadLength", 1024); private final ServerSocket _serverSocket; private final boolean useFixedPort = Boolean.getBoolean("testPeerUsesFixedPort"); @@ -207,7 +208,13 @@ class TestAmqpPeerRunner implements Runnable public void sendBytes(byte[] bytes) { - LOGGER.debug("Sending: {} ({} bytes)", new Binary(bytes), bytes.length); + if(bytes.length > TRACE_FRAME_PAYLOAD_LENGTH) { + Binary print = new Binary(bytes, 0, TRACE_FRAME_PAYLOAD_LENGTH); + LOGGER.debug("Sending: {}...(truncated) ({} bytes)", print, bytes.length); + } else { + LOGGER.debug("Sending: {} ({} bytes)", new Binary(bytes), bytes.length); + } + try { _networkOutputStream.write(bytes); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org