Add a test where the drain attempt uses all the available credit and, as a
result, no flow response is sent.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/58616e84
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/58616e84
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/58616e84

Branch: refs/heads/master
Commit: 58616e84bc6faaeaa89a628aa3486f51285d16fc
Parents: b5e981c
Author: Robert Gemmell <[email protected]>
Authored: Tue Dec 9 15:50:19 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Tue Dec 9 15:50:19 2014 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 70 ++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  2 +-
 2 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58616e84/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index b527589..21e8285 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -614,6 +614,76 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout=5000)
+    public void 
testRollbackTransactedSessionWithPrefetchFullyUtilisedByDrainWhenStoppingConsumer()
 throws Exception {
+        try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            int messageCount = 5;
+            ((JmsConnection) 
connection).getPrefetchPolicy().setAll(messageCount);
+            connection.start();
+
+            testPeer.expectBegin(true);
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a consumer, expect it to flow credit, but don't send it 
any messages
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
+
+            session.createConsumer(queue);
+
+            // Create a producer to use in provoking creation of the AMQP 
transaction
+            testPeer.expectSenderAttach();
+            MessageProducer producer  = session.createProducer(queue);
+
+            // First expect an unsettled 'declare' transfer to the txn 
coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 
7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new 
TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(new Declare()));
+            testPeer.expectTransfer(declareMatcher, false, new 
Declared().setTxnId(txnId), true);
+
+            // Expect the message which provoked creating the transaction
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher( new 
MessageAnnotationsSectionMatcher(true));
+            testPeer.expectTransfer(messageMatcher); //TODO: check it is 
marked as being in the transaction
+
+            producer.send(session.createMessage());
+
+            // Expect the consumer to be 'stopped' prior to rollback by 
issuing a 'drain' Flow.
+            // Action the drain by filling the prefetch (which is equivalent 
to this having happened while
+            // the Flow was in flight to the peer), and then DONT send a flow 
frame back to the client
+            // as it can tell from the messages that all the credit has been 
used.
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, 
new AmqpValueDescribedType("content"),
+                                                       messageCount, true, 
false, equalTo(UnsignedInteger.valueOf(messageCount)), 1);
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator 
containing the txnId,
+            // and reply with accepted and settled disposition to indicate the 
rollback succeeded
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new 
TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(discharge));
+            testPeer.expectTransfer(dischargeMatcher, false, new Accepted(), 
true);
+
+            // Expect the messages that were not consumed to be released
+            for (int i = 1; i <= messageCount; i++) {
+                testPeer.expectDisposition(true, new ReleasedMatcher());
+            }
+
+            // Expect the consumer to be 'started' again as rollback completes
+            testPeer.expectLinkFlow(false, false, 
equalTo(UnsignedInteger.valueOf(messageCount)));
+
+            session.rollback();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout=5000)
     public void 
testDefaultOutcomeIsModifiedForConsumerSourceOnTransactedSession() throws 
Exception {
         try (TestAmqpPeer testPeer = new 
TestAmqpPeer(IntegrationTestFixture.PORT);) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58616e84/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index cca5ab5..efac92c 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -784,7 +784,6 @@ public class TestAmqpPeer implements AutoCloseable
             Binary dtag = new Binary(tagString.getBytes());
 
             final TransferFrame transferResponse = new TransferFrame()
-            .setHandle(UnsignedInteger.valueOf(_nextLinkHandle - 1)) // TODO: 
this needs to be the value used in the attach response
             .setDeliveryId(UnsignedInteger.valueOf(nextId))
             .setDeliveryTag(dtag)
             .setMessageFormat(UnsignedInteger.ZERO)
@@ -800,6 +799,7 @@ public class TestAmqpPeer implements AutoCloseable
                 @Override
                 public void setValues()
                 {
+                    
transferResponse.setHandle(calculateLinkHandle(flowMatcher));
                     
transferResponseSender.setChannel(flowMatcher.getActualChannel());
                 }
             });


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to