add brokerless integration test of rollback on a transacted session with a consumer
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/51a27a4a Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/51a27a4a Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/51a27a4a Branch: refs/heads/master Commit: 51a27a4ab993b7956fe3ca103e7a2f1a01491639 Parents: 86e51a8 Author: Robert Gemmell <[email protected]> Authored: Mon Nov 17 17:12:21 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Nov 17 17:12:21 2014 +0000 ---------------------------------------------------------------------- .../jms/integration/SessionIntegrationTest.java | 72 ++++++++++++++++++++ 1 file changed, 72 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/51a27a4a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 58f1fa2..ed97272 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -48,6 +48,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge; +import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; @@ -396,4 +397,75 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + @Test(timeout=5000) + public void testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws Exception { + doRollbackTransactedSessionWithConsumerTestImpl(1, 1); + } + + @Test(timeout=5000) + public void testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws Exception { + doRollbackTransactedSessionWithConsumerTestImpl(5, 2); + } + + private void doRollbackTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(true); + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); + declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); + testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true); + + for (int i = 1; i <= consumeCount; i++) { + // Then expect a TransactionalState disposition for each message once received by the consumer + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL)); + testPeer.expectDisposition(false, stateMatcher); + } + + MessageConsumer messageConsumer = session.createConsumer(queue); + + for (int i = 1; i <= consumeCount; i++) { + Message receivedMessage = messageConsumer.receive(1000); + + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + } + + // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, + // and reply with accepted and settled disposition to indicate the rollback succeeded + Discharge discharge = new Discharge(); + discharge.setFail(true); + discharge.setTxnId(txnId); + TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); + dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); + testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), true); + + for (int i = 1; i <= consumeCount; i++) { + // Then expect a 'Modified' disposition for each message received by the consumer + testPeer.expectDisposition(true, new DescriptorMatcher(Modified.DESCRIPTOR_CODE, Modified.DESCRIPTOR_SYMBOL)); + } + + session.rollback(); + + //TODO: what about messages not received by the consumer? + + testPeer.waitForAllHandlersToComplete(1000); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
