https://issues.apache.org/jira/browse/AMQ-5089 - fix and test: respect client 
ack for topics and only decrement counters when the ack is received


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b136df17
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b136df17
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b136df17

Branch: refs/heads/trunk
Commit: b136df177ff879de495c8908a5ba6a1e5c2778c9
Parents: 2efb6df
Author: gtully <[email protected]>
Authored: Wed Mar 19 15:33:44 2014 +0000
Committer: gtully <[email protected]>
Committed: Wed Mar 19 15:33:44 2014 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |  2 +-
 .../broker/region/TopicSubscription.java        | 48 +++++++++++++------
 .../activemq/ActiveMQMessageConsumer.java       |  2 +-
 .../org/apache/activemq/ActiveMQSession.java    |  2 +-
 .../org/apache/activemq/command/MessageAck.java |  9 ++++
 .../org/apache/activemq/JMSConsumerTest.java    |  3 +-
 .../policy/AbortSlowAckConsumer0Test.java       |  1 -
 .../usecases/ConsumeTopicPrefetchTest.java      | 49 ++++++++++++++++----
 8 files changed, 89 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 81f90fb..ff4c0aa 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -297,7 +297,7 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
                         break;
                     }
                 }
-            }else if (ack.isDeliveredAck()) {
+            }else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
                 // Message was delivered but not acknowledged: update pre-fetch
                 // counters.
                 int index = 0;

http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 7e17cf3..d17fb2f 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -18,7 +18,7 @@ package org.apache.activemq.broker.region;
 
 import java.io.IOException;
 import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.JMSException;
@@ -65,7 +65,7 @@ public class TopicSubscription extends AbstractSubscription {
     private final Object matchedListMutex = new Object();
     private final AtomicLong enqueueCounter = new AtomicLong(0);
     private final AtomicLong dequeueCounter = new AtomicLong(0);
-    private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false);
+    private final AtomicInteger prefetchExtension = new AtomicInteger(0);
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
     protected int maxProducersToAudit = 1024;
@@ -288,16 +288,34 @@ public class TopicSubscription extends 
AbstractSubscription {
                 }
                 dequeueCounter.addAndGet(ack.getMessageCount());
             }
+            while (true) {
+                int currentExtension = prefetchExtension.get();
+                int newExtension = Math.max(0, currentExtension - 
ack.getMessageCount());
+                if (prefetchExtension.compareAndSet(currentExtension, 
newExtension)) {
+                    break;
+                }
+            }
             dispatchMatched();
             return;
         } else if (ack.isDeliveredAck()) {
             // Message was delivered but not acknowledged: update pre-fetch 
counters.
-            // also. get these for a consumer expired message.
-            if (destination != null && !ack.isInTransaction()) {
-                
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
+            prefetchExtension.addAndGet(ack.getMessageCount());
+            dispatchMatched();
+            return;
+        } else if (ack.isExpiredAck()) {
+            if (singleDestination && destination != null) {
                 
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
+                
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
+                
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
             }
             dequeueCounter.addAndGet(ack.getMessageCount());
+            while (true) {
+                int currentExtension = prefetchExtension.get();
+                int newExtension = Math.max(0, currentExtension - 
ack.getMessageCount());
+                if (prefetchExtension.compareAndSet(currentExtension, 
newExtension)) {
+                    break;
+                }
+            }
             dispatchMatched();
             return;
         } else if (ack.isRedeliveredAck()) {
@@ -313,15 +331,16 @@ public class TopicSubscription extends 
AbstractSubscription {
         // The slave should not deliver pull messages.
         if (getPrefetchSize() == 0 ) {
 
-            prefetchWindowOpen.set(true);
+            final long currentDispatchedCount = dispatchedCounter.get();
+            prefetchExtension.incrementAndGet();
             dispatchMatched();
 
             // If there was nothing dispatched.. we may need to setup a 
timeout.
-            if (prefetchWindowOpen.get()) {
+            if (currentDispatchedCount == dispatchedCounter.get()) {
 
                 // immediate timeout used by receiveNoWait()
                 if (pull.getTimeout() == -1) {
-                    prefetchWindowOpen.set(false);
+                    prefetchExtension.decrementAndGet();
                     // Send a NULL message to signal nothing pending.
                     dispatch(null);
                 }
@@ -331,7 +350,7 @@ public class TopicSubscription extends AbstractSubscription 
{
 
                         @Override
                         public void run() {
-                            pullTimeout();
+                            pullTimeout(currentDispatchedCount);
                         }
                     }, pull.getTimeout());
                 }
@@ -344,13 +363,15 @@ public class TopicSubscription extends 
AbstractSubscription {
      * Occurs when a pull times out. If nothing has been dispatched since the
      * timeout was setup, then send the NULL message.
      */
-    private final void pullTimeout() {
+    private final void pullTimeout(long currentDispatchedCount) {
         synchronized (matchedListMutex) {
-            if (prefetchWindowOpen.compareAndSet(true, false)) {
+            if (currentDispatchedCount == dispatchedCounter.get()) {
                 try {
                     dispatch(null);
                 } catch (Exception e) {
                     context.getConnection().serviceException(e);
+                } finally {
+                    prefetchExtension.decrementAndGet();
                 }
             }
         }
@@ -363,7 +384,7 @@ public class TopicSubscription extends AbstractSubscription 
{
 
     @Override
     public int getDispatchedQueueSize() {
-        return (int)(dispatchedCounter.get() - dequeueCounter.get());
+        return (int)(dispatchedCounter.get() - prefetchExtension.get() - 
dequeueCounter.get());
     }
 
     public int getMaximumPendingMessages() {
@@ -462,7 +483,7 @@ public class TopicSubscription extends AbstractSubscription 
{
     // 
-------------------------------------------------------------------------
     @Override
     public boolean isFull() {
-        return getDispatchedQueueSize() >= info.getPrefetchSize() && 
!prefetchWindowOpen.get();
+        return getDispatchedQueueSize() >= info.getPrefetchSize();
     }
 
     @Override
@@ -553,7 +574,6 @@ public class TopicSubscription extends AbstractSubscription 
{
                             continue; // just drop it.
                         }
                         dispatch(message);
-                        prefetchWindowOpen.set(false);
                     }
                 } finally {
                     matched.release();

http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 39f55bf..d862f70 100755
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -898,7 +898,7 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
             return;
         }
         if (messageExpired) {
-            acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
+            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
             stats.getExpiredMessageCount().increment();
         } else {
             stats.onMessage();

http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 1f9ef32..1dc197f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -865,7 +865,7 @@ public class ActiveMQSession implements Session, 
QueueSession, TopicSession, Sta
 
             MessageAck earlyAck = null;
             if (message.isExpired()) {
-                earlyAck = new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 
1);
+                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
             } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                 LOG.debug("{} got duplicate: {}", this, 
message.getMessageId());
                 earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java 
b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
index 1fc6c10..bb0a72f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
@@ -64,6 +64,11 @@ public class MessageAck extends BaseCommand {
      */
     public static final byte UNMATCHED_ACK_TYPE = 5;
 
+    /**
+     * the case where a consumer does not dispatch because message has expired 
inflight
+     */
+    public static final byte EXPIRED_ACK_TYPE = 6;
+
     protected byte ackType;
     protected ConsumerId consumerId;
     protected MessageId firstMessageId;
@@ -135,6 +140,10 @@ public class MessageAck extends BaseCommand {
         return ackType == UNMATCHED_ACK_TYPE;
     }
 
+    public boolean isExpiredAck() {
+        return ackType == EXPIRED_ACK_TYPE;
+    }
+
     /**
      * @openwire:property version=1 cache=true
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index a6bc997..c793dc8 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -885,7 +885,7 @@ public class JMSConsumerTest extends JmsTestSupport {
         connection.setStatsEnabled(true);
 
         Session sendSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            MessageProducer producer = sendSession.createProducer(destination);
+        MessageProducer producer = sendSession.createProducer(destination);
         producer.setTimeToLive(1000);
         final int count = 4;
         for (int i = 0; i < count; i++) {
@@ -919,6 +919,7 @@ public class JMSConsumerTest extends JmsTestSupport {
         assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, 
view.getInFlightCount());
         assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, 
view.getDispatchCount());
         assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, 
view.getDequeueCount());
+        assertEquals("Wrong expired count: " + view.getExpiredCount(), 4, 
view.getExpiredCount());
     }
 
     protected DestinationViewMBean createView(ActiveMQDestination destination) 
throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
index 36a12f1..fa14142 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java
@@ -144,7 +144,6 @@ public class AbortSlowAckConsumer0Test extends 
AbortSlowConsumer0Test {
 
     @Test
     public void testIdleConsumerCanBeAborted() throws Exception {
-        AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
         strategy.setIgnoreIdleConsumers(false);
         strategy.setMaxTimeSinceLastAck(2000); // Make it shorter
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/b136df17/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
index f7b9794..d95d2d6 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.LinkedList;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -64,11 +65,26 @@ public class ConsumeTopicPrefetchTest extends 
ProducerConsumerTestSupport {
         }
 
         validateConsumerPrefetch(this.getSubject(), prefetchSize);
-        
+
+        LinkedList<TextMessage> consumed = new LinkedList<TextMessage>();
         // lets consume them in two fetch batches
-        for (int i = 0; i < messageCount; i++) {
-            consumeMessge(i);
+        int batchSize = messageCount/2;
+        for (int i = 0; i < batchSize; i++) {
+            consumed.add(consumeMessge(i));
+        }
+
+        // delayed delivered ack a .5 prefetch
+        validateConsumerPrefetchGreaterOrEqual(this.getSubject(), (long) 
Math.min(messageCount, 1.5 * prefetchSize));
+
+        for (int i = 0; i < batchSize; i++) {
+            consumed.remove().acknowledge();
+        }
+
+        // second batch to consume the rest
+        for (int i = batchSize; i < messageCount; i++) {
+            consumeMessge(i).acknowledge();
         }
+        validateConsumerPrefetch(this.getSubject(), 0);
     }
 
     protected Connection createConnection() throws Exception {
@@ -95,9 +111,17 @@ public class ConsumeTopicPrefetchTest extends 
ProducerConsumerTestSupport {
         }
     }
 
-    protected void validateConsumerPrefetch(String destination, final long 
expectedCount) throws JMSException {
+    private void validateConsumerPrefetchGreaterOrEqual(String subject, long 
min) throws JMSException {
+        doValidateConsumerPrefetch(subject, min, true);
+    }
+
+    protected void validateConsumerPrefetch(String subject, final long 
expectedCount) throws JMSException {
+        doValidateConsumerPrefetch(subject, expectedCount, false);
+    }
+
+    protected void doValidateConsumerPrefetch(String destination, final long 
expectedCount, final boolean greaterOrEqual) throws JMSException {
         RegionBroker regionBroker = (RegionBroker) 
BrokerRegistry.getInstance().lookup("localhost").getRegionBroker();
-        for (org.apache.activemq.broker.region.Destination dest : 
regionBroker.getQueueRegion().getDestinationMap().values()) {
+        for (org.apache.activemq.broker.region.Destination dest : 
regionBroker.getTopicRegion().getDestinationMap().values()) {
             final org.apache.activemq.broker.region.Destination target = dest;
             if (dest.getName().equals(destination)) {
                 try {
@@ -105,7 +129,11 @@ public class ConsumeTopicPrefetchTest extends 
ProducerConsumerTestSupport {
                         public boolean isSatisified() throws Exception {
                             DestinationStatistics stats = 
target.getDestinationStatistics();
                             LOG.info("inflight for : " + target.getName() + ": 
" +  stats.getInflight().getCount());
-                            return stats.getInflight().getCount() == 
expectedCount;
+                            if (greaterOrEqual) {
+                                return stats.getInflight().getCount() >= 
expectedCount;
+                            } else {
+                                return stats.getInflight().getCount() == 
expectedCount;
+                            }
                         }
                     });
                 } catch (Exception e) {
@@ -113,8 +141,13 @@ public class ConsumeTopicPrefetchTest extends 
ProducerConsumerTestSupport {
                 }
                 DestinationStatistics stats = dest.getDestinationStatistics();
                 LOG.info("inflight for : " + dest.getName() + ": " + 
stats.getInflight().getCount());
-                assertEquals("inflight for: " + dest.getName() + ": " + 
stats.getInflight().getCount() + " matches", 
-                        expectedCount, stats.getInflight().getCount());      
+                if (greaterOrEqual) {
+                    assertTrue("inflight for: " + dest.getName() + ": " + 
stats.getInflight().getCount() + " > " + stats.getInflight().getCount(),
+                                             stats.getInflight().getCount() >= 
expectedCount);
+                } else {
+                    assertEquals("inflight for: " + dest.getName() + ": " + 
stats.getInflight().getCount() + " matches",
+                        expectedCount, stats.getInflight().getCount());
+                }
             }
         }
     }

Reply via email to