Repository: qpid-jms Updated Branches: refs/heads/master 4e0c88fab -> 99dde7ac2
update stop process to handle presence of queued messages just in case, tidy up a little for clarity Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/99dde7ac Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/99dde7ac Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/99dde7ac Branch: refs/heads/master Commit: 99dde7ac2ecf2295ce84ee39e172d805119a67b2 Parents: 4e0c88f Author: Robert Gemmell <[email protected]> Authored: Wed Dec 10 11:26:42 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Wed Dec 10 11:26:42 2014 +0000 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 50 +++++++++++--------- 1 file changed, 28 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/99dde7ac/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 64cacc3..19b4560 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -78,7 +78,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private final AtomicLong _incomingSequence = new AtomicLong(0); - private AsyncResult drainRequest; + private AsyncResult stopRequest; public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) { super(info); @@ -100,30 +100,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver * Stops the consumer, using all link credit and waiting for in-flight messages to arrive. */ public void stop(AsyncResult request) { - //TODO: We dont actually want the additional messages that could be sent while - // draining. We could explicitly reduce credit first, or possibly use 'echo' instead - // of drain if it was supported. We would first need to understand what happens - // if we reduce credit below the number of messages already in-flight before - // the peer sees the update. - Receiver receiver = getEndpoint(); - if(receiver.getRemoteCredit() <= 0) { - // Sender already used all the credit on offer - request.onSuccess(); - } - else{ - drainRequest = request; + if (receiver.getRemoteCredit() <= 0) { + if (receiver.getQueued() == 0) { + // We have no remote credit and all the deliveries have been processed. + request.onSuccess(); + } else { + // There are still deliveries to process, wait for them to be. + stopRequest = request; + } + } else { + //TODO: We dont actually want the additional messages that could be sent while + // draining. We could explicitly reduce credit first, or possibly use 'echo' instead + // of drain if it was supported. We would first need to understand what happens + // if we reduce credit below the number of messages already in-flight before + // the peer sees the update. + stopRequest = request; receiver.drain(0); } } @Override public void processFlowUpdates() throws IOException { - if (drainRequest != null) { + // Check if we tried to stop and have now run out of credit, and + // processed all locally queued messages + if (stopRequest != null) { Receiver receiver = getEndpoint(); - if (receiver.getDrain() && receiver.getRemoteCredit() <= 0) { - drainRequest.onSuccess(); - drainRequest = null; + if (receiver.getRemoteCredit() <= 0 && receiver.getQueued() == 0) { + stopRequest.onSuccess(); + stopRequest = null; } } @@ -360,12 +365,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver incoming = null; } } else { - //We have exhausted the currently available messages on this link. Check if we tried to drain. - if(drainRequest != null) { - if(getEndpoint().getDrain() && getEndpoint().getRemoteCredit() <= 0) + // We have exhausted the locally queued messages on this link. + // Check if we tried to stop and have now run out of credit. + if(stopRequest != null) { + if(getEndpoint().getRemoteCredit() <= 0) { - drainRequest.onSuccess(); - drainRequest = null; + stopRequest.onSuccess(); + stopRequest = null; } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
