Repository: qpid-jms Updated Branches: refs/heads/master e7692d9d0 -> 84c95aa1e
QPIDJMS-239: add test using the test peer Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/84c95aa1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/84c95aa1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/84c95aa1 Branch: refs/heads/master Commit: 84c95aa1e1f5f9b43e32b490dcc1091b343cde3e Parents: e7692d9 Author: Robert Gemmell <[email protected]> Authored: Thu Jan 5 15:43:48 2017 +0000 Committer: Robert Gemmell <[email protected]> Committed: Thu Jan 5 16:48:09 2017 +0000 ---------------------------------------------------------------------- .../integration/ConsumerIntegrationTest.java | 96 ++++++++++++++++++++ .../qpid/jms/test/testpeer/AmqpDataFramer.java | 15 ++- .../jms/test/testpeer/TestAmqpPeerRunner.java | 2 +- 3 files changed, 108 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index 6c667ec..59eec0a 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -20,12 +20,14 @@ package org.apache.qpid.jms.integration; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import 
java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -61,12 +63,17 @@ import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.jms.util.QpidJMSTestRunner; +import org.apache.qpid.jms.util.Repeat; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.hamcrest.Matchers; import org.junit.Test; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(QpidJMSTestRunner.class) public class ConsumerIntegrationTest extends QpidJmsTestCase { private static final Logger LOG = LoggerFactory.getLogger(ConsumerIntegrationTest.class); @@ -1066,4 +1073,93 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(2000); } } + + @Repeat(repetitions = 1) + @Test(timeout=20000) + public void testRecoverOrderingWithAsyncConsumer() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null); + + final int recoverCount = 5; + final int messageCount = 8; + final int testPayloadLength = 255; // Don't go over 255, Proton <= 0.16.0 issue affecting the test[ only]. 
+ String payload = new String(new byte[testPayloadLength], StandardCharsets.UTF_8); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue destination = session.createQueue(getTestName()); + connection.start(); + + testPeer.expectReceiverAttach(); + + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(payload), + messageCount, false, false, Matchers.greaterThan(UnsignedInteger.valueOf(messageCount)), 1, true); + + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + boolean complete; + private int messageSeen = 0; + private int expectedIndex = 0; + + @Override + public void onMessage(Message message) { + if (complete) { + LOG.debug("Ignoring message as test already completed (either pass or fail)"); + return; + } + + try { + int actualIndex = message.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER); + LOG.debug("Got message {}", actualIndex); + assertEquals("Received Message Out Of Order", expectedIndex, actualIndex); + + // don't ack the message until we receive it X times + if (messageSeen < recoverCount) { + LOG.debug("Ignoring message " + actualIndex + " and calling recover"); + session.recover(); + messageSeen++; + } else { + messageSeen = 0; + expectedIndex++; + + // Have the peer expect the accepted disposition (1-based, hence pre-incremented). 
+ testPeer.expectDisposition(true, new AcceptedMatcher(), expectedIndex, expectedIndex); + + LOG.debug("Acknowledging message {}", actualIndex); + message.acknowledge(); + + //testPeer.waitForAllHandlersToComplete(2000); + + if (expectedIndex == messageCount) { + complete = true; + latch.countDown(); + } + } + } catch (Throwable t) { + complete = true; + asyncError.set(t); + latch.countDown(); + } + } + }); + + boolean await = latch.await(15, TimeUnit.SECONDS); + assertTrue("Messages not received within given timeout." + latch.getCount(), await); + + Throwable ex = asyncError.get(); + assertNull("Unexpected exception", ex); + + testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java index a2ab389..43b7965 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AmqpDataFramer.java @@ -30,23 +30,29 @@ import org.apache.qpid.proton.codec.Data; */ public class AmqpDataFramer { - private static final int CAPACITY = 2024; + private static final int INITIAL_CAPACITY = 2048; private static final byte FRAME_PREAMBLE_SIZE_IN_FOUR_BYTE_WORDS = 2; + private static final int FRAME_HEADER_SIZE = 8; public static byte[] encodeFrame(FrameType type, int channel, DescribedType describedType, Binary payload) { - ByteBuffer buffer = ByteBuffer.allocate(CAPACITY); //TODO: set a proper size + ByteBuffer buffer = ByteBuffer.allocate(INITIAL_CAPACITY); - buffer.position(8); // leave hole for 
frame header + buffer.position(FRAME_HEADER_SIZE); // leave hole for frame header if (describedType != null) { Data frameBody = Data.Factory.create(); frameBody.putDescribedType(describedType); - frameBody.encode(buffer); + + long encodedLength = frameBody.encode(buffer); + if(encodedLength > buffer.capacity() - FRAME_HEADER_SIZE) { + throw new IllegalStateException("Performative encoding exceeded buffer size"); + } } if(payload != null) { + //TODO grow buffer if needed rather than throw BOE buffer.put(payload.asByteBuffer()); } @@ -63,4 +69,5 @@ public class AmqpDataFramer buffer.get(target, 0, frameSize); return target; } + } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/84c95aa1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java index 38005b0..0f01256 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java @@ -206,7 +206,7 @@ class TestAmqpPeerRunner implements Runnable public void sendBytes(byte[] bytes) { - LOGGER.debug("Sending: {}", new Binary(bytes)); + LOGGER.debug("Sending: {} ({} bytes)", new Binary(bytes), bytes.length); try { _networkOutputStream.write(bytes); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
