Repository: qpid-jms Updated Branches: refs/heads/master e343dd054 -> f3b9cd01f
QPIDJMS-264 Adds a couple tests for order of dispatch in a TX Adds tests that validate if messages are dispatched to the consumer in order while consuming under a TX. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f3b9cd01 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f3b9cd01 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f3b9cd01 Branch: refs/heads/master Commit: f3b9cd01fc19efb4e50a8b3a187c88019becee9f Parents: e343dd0 Author: Timothy Bish <[email protected]> Authored: Wed Feb 15 18:38:07 2017 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Feb 15 18:38:12 2017 -0500 ---------------------------------------------------------------------- .../TransactionsIntegrationTest.java | 68 ++++++++++++++++++++ .../transactions/JmsTransactedConsumerTest.java | 35 +++++++++- 2 files changed, 102 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f3b9cd01/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java index bd8d997..867d2d9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java @@ -23,11 +23,14 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static 
org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; + import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -44,6 +47,8 @@ import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; +import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; import org.apache.qpid.jms.test.testpeer.describedtypes.Error; import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; @@ -58,17 +63,22 @@ import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; +import org.apache.qpid.jms.util.QpidJMSTestRunner; +import org.apache.qpid.jms.util.Repeat; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.Test; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Tests for behavior of Transacted Session operations. 
*/ +@RunWith(QpidJMSTestRunner.class) public class TransactionsIntegrationTest extends QpidJmsTestCase { private static final Logger LOG = LoggerFactory.getLogger(TransactionsIntegrationTest.class); @@ -1372,4 +1382,62 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout=30000) + @Repeat(repetitions = 1) + public void testConsumerMessageOrderOnTransactedSession() throws IOException, Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + final int messageCount = 10; + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + // Expect the consumer creation to attach an underlying receiver link + testPeer.expectReceiverAttach(); + // Expect initial credit to be sent, respond with some messages that are tagged with + // a sequence number we can use to determine if order is maintained. 
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, + messageCount, false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH)), 1, false, true); + + for (int i = 1; i <= messageCount; i++) { + // Then expect a *settled* TransactionalState disposition for each message once received by the consumer + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(new AcceptedMatcher()); + + testPeer.expectDisposition(true, stateMatcher); + } + + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < messageCount; ++i) { + Message message = consumer.receive(500); + assertNotNull(message); + assertEquals(i, message.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER)); + } + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded + testPeer.expectDischarge(txnId, true); + testPeer.expectEnd(); + + session.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f3b9cd01/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java index fff403b..cde7656 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java @@ 
-535,5 +535,38 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport { connection.close(); } -} + @Test(timeout = 90000) + public void testConsumerMessagesInOrder() throws Exception { + + for (int i = 0; i < 5; ++i) { + + connection = createAmqpConnection(); + connection.start(); + + final int MESSAGE_COUNT = 20; + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + + sendToAmqQueue(MESSAGE_COUNT); + + MessageConsumer consumer = session.createConsumer(queue); + + for (int j = 0; j < MESSAGE_COUNT; ++j) { + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals(j + 1, message.getIntProperty(MESSAGE_NUMBER)); + } + + session.close(); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + proxy.purge(); + + assertEquals(0, proxy.getQueueSize()); + + consumer.close(); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
