add brokerless integration test of rollback on a transacted session with a consumer


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/51a27a4a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/51a27a4a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/51a27a4a

Branch: refs/heads/master
Commit: 51a27a4ab993b7956fe3ca103e7a2f1a01491639
Parents: 86e51a8
Author: Robert Gemmell <[email protected]>
Authored: Mon Nov 17 17:12:21 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Mon Nov 17 17:12:21 2014 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 72 ++++++++++++++++++++
 1 file changed, 72 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/51a27a4a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 58f1fa2..ed97272 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -48,6 +48,7 @@ import 
org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
 import 
org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
@@ -396,4 +397,75 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout=5000)
+    public void 
testRollbackTransactedSessionWithConsumerReceivingAllMessages() throws 
Exception {
+        doRollbackTransactedSessionWithConsumerTestImpl(1, 1);
+    }
+
+    @Test(timeout=5000)
+    public void 
testRollbackTransactedSessionWithConsumerReceivingSomeMessages() throws 
Exception {
+        doRollbackTransactedSessionWithConsumerTestImpl(5, 2);
+    }
+
+    private void doRollbackTransactedSessionWithConsumerTestImpl(int 
transferCount, int consumeCount) throws Exception {
+        try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin(true);
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"), transferCount);
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, false, new 
Declared().setTxnId(txnId), true);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                // Then expect a TransactionalState disposition for each message once received by the consumer
+                TransactionalStateMatcher stateMatcher = new 
TransactionalStateMatcher();
+                stateMatcher.withTxnId(equalTo(txnId));
+                stateMatcher.withOutcome(new 
DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL));
+                testPeer.expectDisposition(false, stateMatcher);
+            }
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                Message receivedMessage = messageConsumer.receive(1000);
+
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+            }
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the rollback succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), 
true);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                // Then expect a 'Modified' disposition for each message received by the consumer
+                testPeer.expectDisposition(true, new 
DescriptorMatcher(Modified.DESCRIPTOR_CODE, Modified.DESCRIPTOR_SYMBOL));
+            }
+
+            session.rollback();
+
+            //TODO: what about messages not received by the consumer?
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to