Check whether the consumer link credit has already been exhausted and, if so,
no-op the 'stop' request. Update the test accordingly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1b75b7e0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1b75b7e0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1b75b7e0

Branch: refs/heads/master
Commit: 1b75b7e021a6e708d66986dee5a800b5f8797796
Parents: 12fc180
Author: Robert Gemmell <[email protected]>
Authored: Tue Dec 9 12:50:27 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Tue Dec 9 12:50:27 2014 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/jms/provider/amqp/AmqpConsumer.java | 12 ++++++++++--
 .../qpid/jms/integration/SessionIntegrationTest.java    |  7 +++----
 2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b75b7e0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 45372dc..ff2b2b5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -105,8 +105,16 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         // of drain if it was supported. We would first need to understand what happens
         // if we reduce credit below the number of messages already in-flight before
         // the peer sees the update.
-        getEndpoint().drain(0);
-        drainRequest = request;
+
+        Receiver receiver = getEndpoint();
+        if(receiver.getRemoteCredit() <= 0) {
+            // Sender already used all the credit on offer
+            request.onSuccess();
+        }
+        else{
+            drainRequest = request;
+            receiver.drain(0);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1b75b7e0/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 1d4d304..b527589 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -548,7 +548,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=5000)
-    public void testRollbackTransactedSessionWithPrefetchFullBeforeDrain() throws Exception {
+    public void testRollbackTransactedSessionWithPrefetchFullBeforeStoppingConsumer() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
             Connection connection = testFixture.establishConnecton(testPeer);
             int messageCount = 5;
@@ -587,9 +587,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             producer.send(session.createMessage());
 
-            // Expect the consumer to be 'stopped' prior to rollback by issuing a 'drain'. We will NOT send a flow
-            // response as we have manipulated that all the 'on the wire' credit was already used.
-            testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ZERO));
+            // The consumer will be 'stopped' prior to rollback, however we will NOT send a 'drain' Flow
+            // frame as we have manipulated that all the credit was already used, i.e. it already stopped.
 
             // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
             // and reply with accepted and settled disposition to indicate the rollback succeeded


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to