Repository: activemq
Updated Branches:
  refs/heads/master d9c74d731 -> 511b9b642


[AMQ-6562] - suppress warn of durable sub duplicate from the store on cache 
exhaustion - expected in the absence of ordered sequenceid and setBatch. Fix 
leak of duplicates pending processing on batch fill for the durable sub case 
and remove eager page in for prefetch=0


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/511b9b64
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/511b9b64
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/511b9b64

Branch: refs/heads/master
Commit: 511b9b642a198fe98409e1f1aac6176b9420d951
Parents: d9c74d7
Author: gtully <gary.tu...@gmail.com>
Authored: Fri Jan 13 11:04:48 2017 +0000
Committer: gtully <gary.tu...@gmail.com>
Committed: Fri Jan 13 11:06:05 2017 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |   2 +-
 .../region/cursors/AbstractStoreCursor.java     |   9 +-
 .../region/cursors/TopicStorePrefetch.java      |   9 +
 .../activemq/usecases/DurableSubCacheTest.java  | 197 +++++++++++++++++++
 4 files changed, 214 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 0a277fb..db133c1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -684,7 +684,7 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
                     setPendingBatchSize(pending, numberToDispatch);
                     int count = 0;
                     pending.reset();
-                    while (pending.hasNext() && !isFull() && count < 
numberToDispatch) {
+                    while (count < numberToDispatch && !isFull() && 
pending.hasNext()) {
                         MessageReference node = pending.next();
                         if (node == null) {
                             break;

http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 4d7ffea..0295b33 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -110,8 +110,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
             recovered = true;
         } else if (!cached) {
             // a duplicate from the store (!cached) - needs to be 
removed/acked - otherwise it will get re dispatched on restart
-            if (message.isRecievedByDFBridge()) {
-                // expected for messages pending acks with 
kahadb.concurrentStoreAndDispatchQueues=true
+            if (duplicateFromStoreExcepted(message)) {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("{} store replayed pending message due to 
concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), 
message.getMessageId().getFutureOrSequenceLong());
                 }
@@ -128,6 +127,12 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
         return recovered;
     }
 
+    protected boolean duplicateFromStoreExcepted(Message message) {
+        // expected for messages pending acks with 
kahadb.concurrentStoreAndDispatchQueues=true for
+        // which this existing unused flag has been repurposed
+        return message.isRecievedByDFBridge();
+    }
+
     public static boolean gotToTheStore(Message message) throws Exception {
         if (message.isRecievedByDFBridge()) {
             // concurrent store and dispatch - wait to see if the message gets 
to the store to see

http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index bfc745e..71bb4eb 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -95,6 +95,14 @@ class TopicStorePrefetch extends AbstractStoreCursor {
     }
 
     @Override
+    protected boolean duplicateFromStoreExcepted(Message message) {
+        // setBatch is not implemented - sequence order not reliable with 
concurrent transactions
+        // on cache exhaustion - first pageIn starts from last ack location 
which may replay what
+        // cursor has dispatched
+        return true;
+    }
+
+    @Override
     protected synchronized int getStoreSize() {
         try {
             return store.getMessageCount(clientId, subscriberName);
@@ -137,6 +145,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         this.storeHasMessages = false;
         this.store.recoverNextMessages(clientId, subscriberName,
                 maxBatchSize, this);
+        dealWithDuplicates();
         if (!this.storeHasMessages && (!this.batchList.isEmpty() || 
!hadSpace)) {
             this.storeHasMessages = true;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/511b9b64/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java
new file mode 100644
index 0000000..17d1931
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubCacheTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.cursors.AbstractStoreCursor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class DurableSubCacheTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DurableSubCacheTest.class);
+
+
+    private final ActiveMQTopic topic = new ActiveMQTopic("T1");
+    private BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+
+        broker = createAndStartBroker();
+        broker.waitUntilStarted();
+    }
+
+
+    private BrokerService createAndStartBroker()
+            throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.getSystemUsage().getMemoryUsage().setLimit(100 * 1024);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setCursorMemoryHighWaterMark(20);
+        policyMap.put(topic, policy);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.start();
+
+        return broker;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void testCacheExhaustion() throws Exception {
+        doTestCacheExhaustion(1000);
+    }
+
+    @Test
+    public void testCacheExhaustionPrefetch0() throws Exception {
+        doTestCacheExhaustion(0);
+    }
+
+    public void doTestCacheExhaustion(int prefetch) throws Exception {
+
+        createDurableSub(topic, "my_sub_1");
+
+        publishMesssages(topic, 20);
+
+        org.apache.log4j.Logger log4jLogger = 
org.apache.log4j.Logger.getLogger(AbstractStoreCursor.class.getCanonicalName());
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel() == Level.WARN) {
+                    LOG.info("Got warn event:" + event.getRenderedMessage());
+                    failed.set(true);
+                }
+            }
+        };
+        log4jLogger.addAppender(appender);
+
+        try {
+            consumeDurableSub(topic, "my_sub_1", 20, prefetch);
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
+
+        assertFalse("no warning from the cursor", failed.get());
+    }
+
+    private void publishMesssages(ActiveMQTopic topic, int messageCount) 
throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        connectionFactory.setWatchTopicAdvisories(false);
+        Connection con = connectionFactory.createConnection();
+        con.start();
+
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(topic);
+
+        try {
+            String textMessage = new String(new byte[1024]);
+            TextMessage msg = session.createTextMessage(textMessage);
+
+            for (int i = 0; i < messageCount; i++) {
+                producer.send(msg);
+            }
+        } finally {
+            con.close();
+        }
+
+    }
+
+
+    private void createDurableSub(ActiveMQTopic topic, String subID) throws 
Exception {
+
+
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        connectionFactory.setWatchTopicAdvisories(false);
+        Connection con = connectionFactory.createConnection();
+        con.setClientID("CONNECTION-" + subID);
+        con.start();
+
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        session.createDurableSubscriber(topic, subID, null, true);
+        session.close();
+        con.close();
+    }
+
+    private void consumeDurableSub(ActiveMQTopic topic, String subID, int 
messageCount) throws Exception {
+        consumeDurableSub(topic, subID, messageCount, 1000);
+    }
+
+    private void consumeDurableSub(ActiveMQTopic topic, String subID, int 
messageCount, int prefetch) throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        ActiveMQConnection con = (ActiveMQConnection) 
connectionFactory.createConnection();
+        con.setClientID("CONNECTION-" + subID);
+        con.getPrefetchPolicy().setAll(prefetch);
+        con.start();
+
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        TopicSubscriber topicSubscriber
+                = session.createDurableSubscriber(topic, subID, null, true);
+
+        try {
+
+            for (int i = 0; i < messageCount; i++) {
+                javax.jms.Message message = topicSubscriber.receive(4000l);
+                if (message == null) {
+                    fail("should have received a message");
+                }
+            }
+
+        } finally {
+            con.close();
+        }
+    }
+
+
+}
\ No newline at end of file

Reply via email to