update handling of drain attempts to handle messages using all the credit and 
no flow frame arriving to indicate completion


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c5ec1a54
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c5ec1a54
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c5ec1a54

Branch: refs/heads/master
Commit: c5ec1a54924fb77916aca48224e3113e2ef4a566
Parents: 58616e8
Author: Robert Gemmell <[email protected]>
Authored: Tue Dec 9 15:52:05 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Tue Dec 9 15:52:05 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 31 +++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c5ec1a54/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index ff2b2b5..64cacc3 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -121,7 +121,7 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
     public void processFlowUpdates() throws IOException {
         if (drainRequest != null) {
             Receiver receiver = getEndpoint();
-            if (receiver.getDrain() && !receiver.draining()) {
+            if (receiver.getDrain() && receiver.getRemoteCredit() <= 0) {
                 drainRequest.onSuccess();
                 drainRequest = null;
             }
@@ -346,17 +346,28 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
         Delivery incoming = null;
         do {
             incoming = getEndpoint().current();
-            if (incoming != null && incoming.isReadable() && 
!incoming.isPartial()) {
-                LOG.trace("{} has incoming Message(s).", this);
-                try {
-                    processDelivery(incoming);
-                } catch (Exception e) {
-                    throw IOExceptionSupport.create(e);
+            if (incoming != null) {
+                if(incoming.isReadable() && !incoming.isPartial()) {
+                    LOG.trace("{} has incoming Message(s).", this);
+                    try {
+                        processDelivery(incoming);
+                    } catch (Exception e) {
+                        throw IOExceptionSupport.create(e);
+                    }
+                    getEndpoint().advance();
+                } else {
+                    LOG.trace("{} has a partial incoming Message(s), 
deferring.", this);
+                    incoming = null;
                 }
-                getEndpoint().advance();
             } else {
-                LOG.trace("{} has a partial incoming Message(s), deferring.", 
this);
-                incoming = null;
+                //We have exhausted the currently available messages on this 
link. Check if we tried to drain.
+                if(drainRequest != null) {
+                    if(getEndpoint().getDrain() && 
getEndpoint().getRemoteCredit() <= 0)
+                    {
+                        drainRequest.onSuccess();
+                        drainRequest = null;
+                    }
+                }
             }
         } while (incoming != null);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to