Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x 789419ab9 -> 8ef31b91e


https://issues.apache.org/jira/browse/AMQ-5994 
https://issues.apache.org/jira/browse/AMQ-4000 - proper fix for the duplicate 
subscription info replayed from the store on recovery (the failure seen in 
AMQ2149Test). Additional test from Christopher L. Shannon.

(cherry picked from commit dc06c8dc7540233aad6722afe554c2ea505754e2)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8ef31b91
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8ef31b91
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8ef31b91

Branch: refs/heads/activemq-5.12.x
Commit: 8ef31b91e6db5a10cebc79785f075649d8ba0a56
Parents: 789419a
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Oct 6 11:56:58 2015 +0100
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Oct 7 12:02:23 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  15 +-
 .../org/apache/activemq/bugs/AMQ2149Test.java   |  19 +-
 .../store/kahadb/SubscriptionRecoveryTest.java  | 363 +++++++++++++++++++
 3 files changed, 379 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8ef31b91/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index ac767a7..3379333 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1063,14 +1063,6 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
      */
     void process(JournalCommand<?> data, final Location location, final 
Location inDoubtlocation) throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 
0) {
-            if (data instanceof KahaSubscriptionCommand) {
-                KahaSubscriptionCommand kahaSubscriptionCommand = 
(KahaSubscriptionCommand)data;
-                if (kahaSubscriptionCommand.hasSubscriptionInfo()) {
-                    // needs to be processed via activate and will be replayed 
on reconnect
-                    LOG.debug("ignoring add sub command during recovery 
replay:" + data);
-                    return;
-                }
-            }
             process(data, location, (IndexAware) null);
         } else {
             // just recover producer audit
@@ -1497,6 +1489,13 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
 
         // If set then we are creating it.. otherwise we are destroying the sub
         if (command.hasSubscriptionInfo()) {
+            Location existing = sd.subLocations.get(tx, subscriptionKey);
+            if (existing != null && existing.compareTo(location) == 0) {
+                // replay on recovery, ignore
+                LOG.trace("ignoring journal replay of replay of sub from: " + 
location);
+                return;
+            }
+
             sd.subscriptions.put(tx, subscriptionKey, command);
             sd.subLocations.put(tx, subscriptionKey, location);
             long ackLocation=NOT_ACKED;

http://git-wip-us.apache.org/repos/asf/activemq/blob/8ef31b91/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
index bc80ea9..804cc68 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
@@ -19,7 +19,6 @@ package org.apache.activemq.bugs;
 
 import java.io.File;
 import java.lang.IllegalStateException;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -156,7 +155,7 @@ public class AMQ2149Test {
 
         private final MessageConsumer messageConsumer;
 
-        private volatile long nextExpectedSeqNum = 0;
+        private volatile long nextExpectedSeqNum = 1;
                 
         private final boolean transactional;
 
@@ -201,10 +200,11 @@ public class AMQ2149Test {
                     }
                 }
                 if (resumeOnNextOrPreviousIsOk) {
-                    // after an indoubt commit we need to accept what we get 
(within reason)
+                    // after an indoubt commit we need to accept what we get
+                    // either a batch replay or next batch
                     if (seqNum != nextExpectedSeqNum) {
-                        if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH 
-1)) {
-                            nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+                        if (seqNum == nextExpectedSeqNum - TRANSACITON_BATCH) {
+                            nextExpectedSeqNum -= TRANSACITON_BATCH;
                             LOG.info("In doubt commit failed, getting replay 
at:" +  nextExpectedSeqNum);
                         }
                     }
@@ -223,21 +223,21 @@ public class AMQ2149Test {
                 ++nextExpectedSeqNum;
                 lastId = message.getJMSMessageID();
             } catch (TransactionRolledBackException 
expectedSometimesOnFailoverRecovery) {
+                ++nextExpectedSeqNum;
                 LOG.info("got rollback: " + 
expectedSometimesOnFailoverRecovery);
                 if 
(expectedSometimesOnFailoverRecovery.getMessage().contains("completion in 
doubt")) {
                     // in doubt - either commit command or reply missing
                     // don't know if we will get a replay
                     resumeOnNextOrPreviousIsOk = true;
-                    nextExpectedSeqNum++;
                     LOG.info("in doubt transaction completion: ok to get next 
or previous batch. next:" + nextExpectedSeqNum);
                 } else {
                     resumeOnNextOrPreviousIsOk = false;
                     // batch will be replayed
-                    nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+                    nextExpectedSeqNum -= TRANSACITON_BATCH;
                 }
 
             } catch (Throwable e) {
-                LOG.error(dest + " onMessage error", e);
+                LOG.error(dest + " onMessage error:" + e);
                 exceptions.add(e);
             }
         }
@@ -274,8 +274,7 @@ public class AMQ2149Test {
                     final Message message = session
                             .createTextMessage(longString);
                     message.setLongProperty(SEQ_NUM_PROPERTY,
-                            nextSequenceNumber);
-                    ++nextSequenceNumber;
+                            ++nextSequenceNumber);
                     messageProducer.send(message);
                     
                     if ((nextSequenceNumber % 500) == 0) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8ef31b91/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/SubscriptionRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/SubscriptionRecoveryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/SubscriptionRecoveryTest.java
new file mode 100644
index 0000000..8bdb28f
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/SubscriptionRecoveryTest.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionRecoveryTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubscriptionRecoveryTest.class);
+
+    private BrokerService service;
+    private String connectionUri;
+    private ActiveMQConnectionFactory cf;
+
+    private final int MSG_COUNT = 256;
+
+    @Before
+    public void setUp() throws IOException, Exception {
+        createBroker(true, false);
+    }
+
+    public void createBroker(boolean deleteAllMessages, boolean recover) 
throws Exception {
+        service = new BrokerService();
+        service.setBrokerName("InactiveSubTest");
+        service.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        service.setPersistent(true);
+
+        KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter();
+        File dataFile=new File("KahaDB");
+        pa.setDirectory(dataFile);
+        pa.setJournalMaxFileLength(10*1024);
+        pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5));
+        pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5));
+        //Delete the index files on recovery
+        if (recover) {
+            for (File index : FileUtils.listFiles(dataFile, new 
WildcardFileFilter("*.data"), TrueFileFilter.INSTANCE)) {
+                LOG.info("deleting: " + index);
+                FileUtils.deleteQuietly(index);
+            }
+        }
+
+        service.setPersistenceAdapter(pa);
+        service.start();
+        service.waitUntilStarted();
+
+        connectionUri = "vm://InactiveSubTest?create=false";
+        cf = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    private void restartBroker() throws Exception {
+        stopBroker();
+        createBroker(false, false);
+    }
+
+    private void recoverBroker() throws Exception {
+        stopBroker();
+        createBroker(false, true);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (service != null) {
+            service.stop();
+            service.waitUntilStopped();
+            service = null;
+        }
+    }
+
+    @Test
+    public void testDurableSubPrefetchRecovered() throws Exception {
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // Send to a Queue to create some journal files
+        sendMessages(queue);
+
+        LOG.info("There are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+
+        createInactiveDurableSub(topic);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = 
service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Now send some more to the queue to create even more files.
+        sendMessages(queue);
+
+        LOG.info("There are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        LOG.info("Restarting the broker.");
+        restartBroker();
+        LOG.info("Restarted the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = 
service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Clear out all queue data
+        service.getAdminView().removeQueue(queue.getQueueName());
+
+        assertTrue("Less than two journal files expected, was " + 
getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 2;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+
+        LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
+        // Send some messages to the inactive destination
+        sendMessages(topic);
+
+        LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
+        assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));
+
+        LOG.info("Recovering the broker.");
+        recoverBroker();
+        LOG.info("Recovering the broker.");
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = 
service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+    }
+
+    @Test
+    public void testDurableAcksNotDropped() throws Exception {
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // Create durable sub in first data file.
+        createInactiveDurableSub(topic);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = 
service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Send to a Topic
+        sendMessages(topic, 1);
+
+        // Send to a Queue to create some journal files
+        sendMessages(queue);
+
+        LOG.info("Before consume there are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+
+        // Consume all the Messages leaving acks behind.
+        consumeDurableMessages(topic, 1);
+
+        LOG.info("After consume there are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+
+        // Now send some more to the queue to create even more files.
+        sendMessages(queue);
+
+        LOG.info("More Queued. There are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        LOG.info("Restarting the broker.");
+        restartBroker();
+        LOG.info("Restarted the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = 
service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Clear out all queue data
+        service.getAdminView().removeQueue(queue.getQueueName());
+
+        assertTrue("Less than three journal file expected, was " + 
getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 3;
+            }
+        }, TimeUnit.MINUTES.toMillis(3)));
+
+        // See if we receive any message they should all be acked.
+        tryConsumeExpectNone(topic);
+
+        LOG.info("There are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+
+        LOG.info("Recovering the broker.");
+        recoverBroker();
+        LOG.info("Recovering the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", 
getNumberOfJournalFiles());
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new 
Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = 
service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // See if we receive any message they should all be acked.
+        tryConsumeExpectNone(topic);
+
+        assertTrue("Less than three journal file expected, was " + 
getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() == 1;
+            }
+        }, TimeUnit.MINUTES.toMillis(1)));
+    }
+
+    private int getNumberOfJournalFiles() throws IOException {
+        Collection<DataFile> files =
+            ((KahaDBPersistenceAdapter) 
service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+
+        return reality;
+    }
+
+    private void createInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
"Inactive");
+        consumer.close();
+        connection.close();
+    }
+
+    private void consumeDurableMessages(Topic topic, int count) throws 
Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
"Inactive");
+        connection.start();
+        for (int i = 0; i < count; ++i) {
+           if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) {
+               fail("should have received a message");
+           }
+        }
+        consumer.close();
+        connection.close();
+    }
+
+    private void tryConsumeExpectNone(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
"Inactive");
+        connection.start();
+        if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) {
+            fail("Should be no messages for this durable.");
+        }
+        consumer.close();
+        connection.close();
+    }
+
+    private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
"Inactive");
+
+        int count = 0;
+
+        while (consumer.receive(10000) != null) {
+            count++;
+        }
+
+        consumer.close();
+        connection.close();
+
+        return count;
+    }
+
+    private void sendMessages(Destination destination) throws Exception {
+        sendMessages(destination, MSG_COUNT);
+    }
+
+    private void sendMessages(Destination destination, int count) throws 
Exception {
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < count; ++i) {
+            TextMessage message = session.createTextMessage("Message #" + i + 
" for destination: " + destination);
+            producer.send(message);
+        }
+        connection.close();
+    }
+
+}
\ No newline at end of file

Reply via email to