Repository: activemq
Updated Branches:
  refs/heads/master 5d9f1cd3d -> ffee8b442


https://issues.apache.org/jira/browse/AMQ-6422 - match the Proton sender's view 
of credit to the subscription's prefetchExtension, tracking the delta between 
credit granted and messages dispatched so that additional flow requests are 
accounted for. The Proton sender layer is distinct from the transport layer; 
the two mirror each other.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ffee8b44
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ffee8b44
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ffee8b44

Branch: refs/heads/master
Commit: ffee8b442f57b38d57a59745b9062e8d963c65ba
Parents: 5d9f1cd
Author: gtully <gary.tu...@gmail.com>
Authored: Wed Sep 21 10:33:20 2016 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Wed Sep 21 10:33:20 2016 +0100

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     | 91 ++++++++++----------
 .../amqp/JMSClientTransactionTest.java          |  6 --
 .../amqp/interop/AmqpSendReceiveTest.java       |  8 +-
 .../activemq/broker/region/AbstractRegion.java  |  2 +-
 .../broker/region/AbstractSubscription.java     | 12 +++
 .../broker/region/PrefetchSubscription.java     | 10 +--
 .../broker/region/QueueSubscription.java        |  2 +-
 .../broker/region/TopicSubscription.java        | 21 ++++-
 .../TopicSubscriptionZeroPrefetchTest.java      | 19 ++++
 9 files changed, 103 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 0b85858..75f2371 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -20,8 +20,9 @@ import static 
org.apache.activemq.transport.amqp.AmqpSupport.toLong;
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.AbstractSubscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerControl;
@@ -81,7 +82,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final String MESSAGE_FORMAT_KEY = 
outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
 
     private final ConsumerInfo consumerInfo;
-    private Subscription subscription;
+    private AbstractSubscription subscription;
+    private AtomicInteger prefetchExtension;
+    private int currentCreditRequest;
+    private int logicalDeliveryCount; // echoes prefetch extension but from 
protons perspective
     private final boolean presettle;
 
     private boolean draining;
@@ -111,7 +115,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public void open() {
         if (!isClosed()) {
             session.registerSender(getConsumerId(), this);
-            subscription = 
session.getConnection().lookupPrefetchSubscription(consumerInfo);
+            subscription = 
(AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo);
+            prefetchExtension = subscription.getPrefetchExtension();
         }
 
         super.open();
@@ -168,24 +173,15 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public void flow() throws Exception {
         Link endpoint = getEndpoint();
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, 
queued={}, unsettled={}",
+            LOG.trace("Flow: draining={}, drain={} credit={}, 
currentCredit={}, senderDeliveryCount={} - Sub={}",
                     draining, endpoint.getDrain(),
-                    endpoint.getCredit(), endpoint.getRemoteCredit(), 
endpoint.getQueued(), endpoint.getUnsettled());
+                    endpoint.getCredit(), currentCreditRequest, 
logicalDeliveryCount, subscription);
         }
 
+        final int endpointCredit = endpoint.getCredit();
         if (endpoint.getDrain() && !draining) {
 
-            // Revert to a pull consumer.
-            ConsumerControl control = new ConsumerControl();
-            control.setConsumerId(getConsumerId());
-            control.setDestination(getDestination());
-            control.setPrefetch(0);
-
-            LOG.trace("Flow: Pull case -> consumer control with prefetch (0) 
to control output");
-
-            sendToActiveMQ(control);
-
-            if (endpoint.getCredit() > 0) {
+            if (endpointCredit > 0) {
                 draining = true;
 
                 // Now request dispatch of the drain amount, we request 
immediate
@@ -196,9 +192,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 pullRequest.setDestination(getDestination());
                 pullRequest.setTimeout(-1);
                 pullRequest.setAlwaysSignalDone(true);
-                pullRequest.setQuantity(endpoint.getCredit());
+                pullRequest.setQuantity(endpointCredit);
 
-                LOG.trace("Pull case -> consumer pull request quantity = {}", 
endpoint.getCredit());
+                LOG.trace("Pull case -> consumer pull request quantity = {}", 
endpointCredit);
 
                 sendToActiveMQ(pullRequest);
             } else {
@@ -207,25 +203,36 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 pumpOutbound();
                 getEndpoint().drained();
                 session.pumpProtonToSocket();
+                currentCreditRequest = 0;
+                logicalDeliveryCount = 0;
             }
-        } else {
-            ConsumerControl control = new ConsumerControl();
-            control.setConsumerId(getConsumerId());
-            control.setDestination(getDestination());
-
-            int remoteCredit = endpoint.getRemoteCredit();
-            if (remoteCredit > 0 && subscription != null) {
-                // ensure prefetch exceeds credit + inflight
-                if (remoteCredit + endpoint.getUnsettled() + 
endpoint.getQueued() > subscription.getPrefetchSize()) {
-                    LOG.trace("Adding dispatched size to credit for sub: " + 
subscription);
-                    remoteCredit += subscription.getDispatchedQueueSize();
-                }
-            }
-            control.setPrefetch(remoteCredit);
+        } else if (endpointCredit >= 0) {
+
+            if (endpointCredit == 0 && currentCreditRequest != 0) {
+
+                prefetchExtension.set(0);
+                currentCreditRequest = 0;
+                logicalDeliveryCount = 0;
+                LOG.trace("Flow: credit 0 for sub:" + subscription);
 
-            LOG.trace("Flow: update -> consumer control with prefetch {}", 
control.getPrefetch());
+            } else {
 
-            sendToActiveMQ(control);
+                int deltaToAdd = endpointCredit;
+                int logicalCredit = currentCreditRequest - 
logicalDeliveryCount;
+                if (logicalCredit > 0) {
+                    deltaToAdd -= logicalCredit;
+                } else {
+                    // reset delivery counter - dispatch from broker 
concurrent with credit=0 flow can go negative
+                    logicalDeliveryCount = 0;
+                }
+                if (deltaToAdd > 0) {
+                    currentCreditRequest = 
prefetchExtension.addAndGet(deltaToAdd);
+                    subscription.wakeupDestinationsForDispatch();
+                    // force dispatch of matched/pending for topics (pending 
messages accumulate in the sub and are dispatched on update of prefetch)
+                    subscription.setPrefetchSize(0);
+                    LOG.trace("Flow: credit addition of {} for sub {}", 
deltaToAdd, subscription);
+                }
+            }
         }
     }
 
@@ -285,6 +292,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         }
 
         pumpOutbound();
+        logicalDeliveryCount++;
     }
 
     @Override
@@ -440,6 +448,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                     // It's the end of browse signal in response to a 
MessagePull
                     getEndpoint().drained();
                     draining = false;
+                    currentCreditRequest = 0;
+                    logicalDeliveryCount = 0;
                 } else {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, 
credit={}, remoteCredit={}, queued={}",
@@ -451,6 +461,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                         LOG.trace("Sender:[{}] browse complete.", 
getEndpoint().getName());
                         getEndpoint().drained();
                         draining = false;
+                        currentCreditRequest = 0;
+                        logicalDeliveryCount = 0;
                     }
 
                     jms.setRedeliveryCounter(md.getRedeliveryCounter());
@@ -481,17 +493,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             tagCache.returnTag(tag);
         }
 
-        int newCredit = Math.max(0, getEndpoint().getCredit() - 1);
-        LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery 
settled.",
-                  getEndpoint().getName(), newCredit);
-
-        ConsumerControl control = new ConsumerControl();
-        control.setConsumerId(getConsumerId());
-        control.setDestination(getDestination());
-        control.setPrefetch(newCredit);
-
-        sendToActiveMQ(control);
-
         if (ackType == -1) {
             // we are going to settle, but redeliver.. we we won't yet ack to 
ActiveMQ
             delivery.settle();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 1251410..f481ba9 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -193,8 +193,6 @@ public class JMSClientTransactionTest extends 
JMSClientTestSupport {
         assertEquals(MSG_COUNT, 
getProxyToQueue(getDestinationName()).getQueueSize());
         SubscriptionViewMBean subscription = 
getProxyToQueueSubscriber(getDestinationName());
         assertNotNull(subscription);
-        LOG.info("Subscription[{}]: prefetch size after rollback = {}", 
subscription.getSubscriptionId(), subscription.getPrefetchSize());
-        assertTrue(subscription.getPrefetchSize() > 0);
 
         for (int i = 1; i <= MSG_COUNT; i++) {
             LOG.info("Trying to receive message: {}", i);
@@ -259,8 +257,6 @@ public class JMSClientTransactionTest extends 
JMSClientTestSupport {
         assertEquals(MSG_COUNT, 
getProxyToQueue(getDestinationName()).getQueueSize());
         SubscriptionViewMBean subscription = 
getProxyToQueueSubscriber(getDestinationName());
         assertNotNull(subscription);
-        LOG.info("Subscription[{}]: prefetch size after rollback = {}", 
subscription.getSubscriptionId(), subscription.getPrefetchSize());
-        assertTrue(subscription.getPrefetchSize() > 0);
 
         assertTrue("Should read all " + MSG_COUNT + " messages.", 
Wait.waitFor(new Wait.Condition() {
 
@@ -273,7 +269,6 @@ public class JMSClientTransactionTest extends 
JMSClientTestSupport {
         LOG.info("COMMIT of first received batch here:");
         session.commit();
 
-        assertTrue(subscription.getPrefetchSize() > 0);
         for (int i = 1; i <= MSG_COUNT; i++) {
             LOG.info("Sending message: {} to commit", msgIndex++);
             TextMessage message = session.createTextMessage("Commit Message: " 
+ msgIndex);
@@ -286,7 +281,6 @@ public class JMSClientTransactionTest extends 
JMSClientTestSupport {
 
         LOG.info("WAITING -> for next three messages to arrive:");
 
-        assertTrue(subscription.getPrefetchSize() > 0);
         assertTrue("Should read all " + MSG_COUNT + " messages.", 
Wait.waitFor(new Wait.Condition() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 3132e6e..34436f2 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -294,7 +294,7 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
         receiver2.flow(splitCredit);
         for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Receiver #2 should have read a message", message);
+            assertNotNull("Receiver #2 should have read message[" + i + "]", 
message);
             LOG.info("Receiver #2 read message: {}", message.getMessageId());
             message.accept();
         }
@@ -671,7 +671,7 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
         LOG.info("*** Attempting to read remaining messages with both 
receivers");
         int splitCredit = (MSG_COUNT - 4) / 2;
 
-        LOG.info("**** Receiver #1 granting creadit[{}] for its block of 
messages", splitCredit);
+        LOG.info("**** Receiver #1 granting credit[{}] for its block of 
messages", splitCredit);
         receiver1.flow(splitCredit);
         for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
@@ -680,11 +680,11 @@ public class AmqpSendReceiveTest extends 
AmqpClientTestSupport {
             message.accept();
         }
 
-        LOG.info("**** Receiver #2 granting creadit[{}] for its block of 
messages", splitCredit);
+        LOG.info("**** Receiver #2 granting credit[{}] for its block of 
messages", splitCredit);
         receiver2.flow(splitCredit);
         for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Receiver #2 should have read a message", message);
+            assertNotNull("Receiver #2 should have read a message[" + i + "]", 
message);
             LOG.info("Receiver #2 read message: {}", message.getMessageId());
             message.accept();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index be77b6e..13251c8 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -688,7 +688,7 @@ public abstract class AbstractRegion implements Region {
                     entry.configurePrefetch(sub);
                 }
             }
-            LOG.debug("setting prefetch: {}, on subscription: {}; resulting 
value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), 
sub.getConsumerInfo().getCurrentPrefetchSize()});
+            LOG.debug("setting prefetch: {}, on subscription: {}; resulting 
value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), 
sub.getConsumerInfo().getPrefetchSize()});
             try {
                 lookup(consumerExchange.getConnectionContext(), 
control.getDestination(),false).wakeup();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 1d84269..3cb2f1f 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -49,6 +50,7 @@ public abstract class AbstractSubscription implements 
Subscription {
     protected ConsumerInfo info;
     protected final DestinationFilter destinationFilter;
     protected final CopyOnWriteArrayList<Destination> destinations = new 
CopyOnWriteArrayList<Destination>();
+    protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
 
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
@@ -309,4 +311,14 @@ public abstract class AbstractSubscription implements 
Subscription {
     public SubscriptionStatistics getSubscriptionStatistics() {
         return subscriptionStatistics;
     }
+
+    public void wakeupDestinationsForDispatch() {
+        for (Destination dest : destinations) {
+            dest.wakeup();
+        }
+    }
+
+    public AtomicInteger getPrefetchExtension() {
+        return this.prefetchExtension;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 5254440..0a277fb 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -23,7 +23,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
 
@@ -57,7 +56,6 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
 
     protected PendingMessageCursor pending;
     protected final List<MessageReference> dispatched = new 
ArrayList<MessageReference>();
-    protected final AtomicInteger prefetchExtension = new AtomicInteger();
     protected boolean usePrefetchExtension = true;
     private int maxProducersToAudit=32;
     private int maxAuditDepth=2048;
@@ -431,9 +429,7 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
             dispatchPending();
 
             if (pending.isEmpty()) {
-                for (Destination dest : destinations) {
-                    dest.wakeup();
-                }
+                wakeupDestinationsForDispatch();
             }
         } else {
             LOG.debug("Acknowledgment out of sync (Normally occurs when 
failover connection reconnects): {}", ack);
@@ -904,10 +900,6 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
         this.usePrefetchExtension = usePrefetchExtension;
     }
 
-    protected int getPrefetchExtension() {
-        return this.prefetchExtension.get();
-    }
-
     @Override
     public void setPrefetchSize(int prefetchSize) {
         this.info.setPrefetchSize(prefetchSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 358f946..6e865ec 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -69,7 +69,7 @@ public class QueueSubscription extends PrefetchSubscription 
implements LockOwner
     @Override
     public synchronized String toString() {
         return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", 
destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", 
delivered="
-               + this.prefetchExtension + ", pending=" + getPendingQueueSize();
+               + this.prefetchExtension + ", pending=" + getPendingQueueSize() 
+ ", prefetch=" + getPrefetchSize() + ", prefetchExtension=" + 
prefetchExtension.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index eff2393..6ab264d 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -64,7 +64,6 @@ public class TopicSubscription extends AbstractSubscription {
     private MessageEvictionStrategy messageEvictionStrategy = new 
OldestMessageEvictionStrategy();
     private int discarded;
     private final Object matchedListMutex = new Object();
-    private final AtomicInteger prefetchExtension = new AtomicInteger(0);
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
     protected int maxProducersToAudit = 1024;
@@ -410,6 +409,16 @@ public class TopicSubscription extends 
AbstractSubscription {
         }
     }
 
+    private void decrementPrefetchExtension() {
+        while (true) {
+            int currentExtension = prefetchExtension.get();
+            int newExtension = Math.max(0, currentExtension - 1);
+            if (prefetchExtension.compareAndSet(currentExtension, 
newExtension)) {
+                break;
+            }
+        }
+    }
+
     @Override
     public int countBeforeFull() {
         return getPrefetchSize() == 0 ? prefetchExtension.get() : 
info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
@@ -529,6 +538,9 @@ public class TopicSubscription extends AbstractSubscription 
{
     // 
-------------------------------------------------------------------------
     @Override
     public boolean isFull() {
+        if (info.getPrefetchSize() == 0) {
+            return prefetchExtension.get() == 0;
+        }
         return getDispatchedQueueSize() >= info.getPrefetchSize();
     }
 
@@ -655,6 +667,11 @@ public class TopicSubscription extends 
AbstractSubscription {
                     }
                 }
             }
+
+            if (getPrefetchSize() == 0) {
+                decrementPrefetchExtension();
+            }
+
         }
         if (info.isDispatchAsync()) {
             if (node != null) {
@@ -712,7 +729,7 @@ public class TopicSubscription extends AbstractSubscription 
{
     @Override
     public String toString() {
         return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", 
destinations=" + destinations.size() + ", dispatched=" + 
getDispatchedQueueSize() + ", delivered="
-                + getDequeueCounter() + ", matched=" + matched() + ", 
discarded=" + discarded();
+                + getDequeueCounter() + ", matched=" + matched() + ", 
discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ffee8b44/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
index b9f0d50..38fa921 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
@@ -22,6 +22,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -75,6 +76,24 @@ public class TopicSubscriptionZeroPrefetchTest {
         Assert.assertNotNull("should have received a message the published 
message", consumedMessage);
     }
 
+    @Test(timeout=60000)
+    public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + 
"?consumer.retroactive=true&consumer.prefetchSize=0");
+        Session consumerClientAckSession = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        consumer = 
consumerClientAckSession.createConsumer(consumerDestination);
+
+        final int count = 10;
+        for (int i=0;i<count;i++) {
+            Message txtMessage = session.createTextMessage("M:"+ i);
+            producer.send(txtMessage);
+        }
+
+        for (int i=0;i<count;i++) {
+            Message consumedMessage = consumer.receive(2000);
+            Assert.assertNotNull("should have received message[" + i +"]", 
consumedMessage);
+        }
+    }
+
     /*
      * test durable topic subscription with prefetch zero
      */

Reply via email to