add test that only receives some of the available messages in the transaction


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f9e4a6ca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f9e4a6ca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f9e4a6ca

Branch: refs/heads/master
Commit: f9e4a6ca5664838596192c1563beea02924a6581
Parents: c839773
Author: Robert Gemmell <[email protected]>
Authored: Mon Nov 17 15:47:29 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Mon Nov 17 15:47:29 2014 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 35 ++++++++++++++------
 1 file changed, 25 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f9e4a6ca/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 498af7f..af03f37 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -326,7 +326,16 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=5000)
-    public void testCommitTransactedSessionWithConsumer() throws Exception {
+    public void testCommitTransactedSessionWithConsumerReceivingAllMessages() throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(1, 1);
+    }
+
+    @Test(timeout=5000)
+    public void testCommitTransactedSessionWithConsumerReceivingSomeMessages() throws Exception {
+        doCommitTransactedSessionWithConsumerTestImpl(5, 2);
+    }
+
+    private void doCommitTransactedSessionWithConsumerTestImpl(int transferCount, int consumeCount) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
             Connection connection = testFixture.establishConnecton(testPeer);
             connection.start();
@@ -339,7 +348,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             Queue queue = session.createQueue("myQueue");
 
             testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"));
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), transferCount);
 
             // First expect an unsettled 'declare' transfer to the txn coordinator, and
             // reply with a declared disposition state containing the txnId.
@@ -348,17 +357,23 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
             testPeer.expectTransfer(declareMatcher, false, new Declared().setTxnId(txnId), true);
 
-            // Then expect a TransactionalState disposition for the applications message once consumed
-            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
-            stateMatcher.withTxnId(equalTo(txnId));
-            stateMatcher.withOutcome(new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL));
-            testPeer.expectDisposition(false, stateMatcher);
+
+            for (int i = 1; i <= consumeCount; i++) {
+                // Then expect a TransactionalState disposition for each message once received by the consumer
+                TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+                stateMatcher.withTxnId(equalTo(txnId));
+                stateMatcher.withOutcome(new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL));
+                testPeer.expectDisposition(false, stateMatcher);
+            }
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
-            Message receivedMessage = messageConsumer.receive(1000);
 
-            assertNotNull(receivedMessage);
-            assertTrue(receivedMessage instanceof TextMessage);
+            for (int i = 1; i <= consumeCount; i++) {
+                Message receivedMessage = messageConsumer.receive(1000);
+
+                assertNotNull(receivedMessage);
+                assertTrue(receivedMessage instanceof TextMessage);
+            }
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the commit succeeded


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to