Repository: activemq Updated Branches: refs/heads/trunk c5cebd5ec -> b54606b12
https://issues.apache.org/jira/browse/AMQ-5300 - fix and test that verifies recovery in the absense of an index Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b54606b1 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b54606b1 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b54606b1 Branch: refs/heads/trunk Commit: b54606b12417c371af6589b96b72f7110de26b34 Parents: f1df9f8 Author: gtully <[email protected]> Authored: Wed Jan 7 17:10:10 2015 +0000 Committer: gtully <[email protected]> Committed: Wed Jan 7 17:11:09 2015 +0000 ---------------------------------------------------------------------- .../apache/activemq/leveldb/LevelDBClient.scala | 12 +- .../activemq/leveldb/test/IndexRebuildTest.java | 128 +++++++++++++++++++ .../leveldb/test/ReplicationTestSupport.java | 16 ++- 3 files changed, 151 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b54606b1/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 64bbcee..db6d653 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -559,7 +559,17 @@ class LevelDBClient(store: LevelDBStore) { might_fail { log.open() } - replay_from(lastIndexSnapshotPos, log.appender_limit) + + var startPosition = lastIndexSnapshotPos; + // if we cannot locate a log for a snapshot, replay from + // first entry of first available log + if (log.log_info(startPosition).isEmpty) { + if (!log.log_infos.isEmpty) { + startPosition = log.log_infos.firstKey(); + } + } + + replay_from(startPosition, log.appender_limit) replay_write_batch = null; } http://git-wip-us.apache.org/repos/asf/activemq/blob/b54606b1/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java new file mode 100644 index 0000000..a9be570 --- /dev/null +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/IndexRebuildTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.leveldb.test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.leveldb.LevelDBStore; +import org.apache.activemq.leveldb.LevelDBStoreView; +import org.apache.activemq.leveldb.util.FileSupport; +import org.apache.activemq.store.MessageStore; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.apache.activemq.leveldb.test.ReplicationTestSupport.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class IndexRebuildTest { + protected static final Logger LOG = LoggerFactory.getLogger(IndexRebuildTest.class); + final int max = 30; + final int toLeave = 5; + ArrayList<LevelDBStore> stores = new ArrayList<LevelDBStore>(); + + @Test(timeout = 1000 * 60 * 10) + public void testRebuildIndex() throws Exception { + + File masterDir = new File("target/activemq-data/leveldb-rebuild"); + FileSupport.toRichFile(masterDir).recursiveDelete(); + + final LevelDBStore store = new LevelDBStore(); + store.setDirectory(masterDir); + store.setLogDirectory(masterDir); + + store.setLogSize(1024 * 10); + store.start(); + stores.add(store); + + ArrayList<MessageId> inserts = new ArrayList<MessageId>(); + MessageStore ms = store.createQueueMessageStore(new ActiveMQQueue("TEST")); + for (int i = 0; i < max; i++) { + inserts.add(addMessage(ms, "m" + i).getMessageId()); + } + int logFileCount = countLogFiles(store); + assertTrue("more than one journal file", logFileCount > 1); + + for (MessageId id : inserts.subList(0, inserts.size() - toLeave)) { + removeMessage(ms, id); + } + + LevelDBStoreView view = new LevelDBStoreView(store); + view.compact(); + + int reducedLogFileCount = countLogFiles(store); + assertTrue("log files deleted", logFileCount > reducedLogFileCount); + + store.stop(); + + deleteTheIndex(store); + + assertEquals("log files remain", reducedLogFileCount, countLogFiles(store)); + + // restart, recover and verify message read + store.start(); + ms = store.createQueueMessageStore(new ActiveMQQueue("TEST")); + + assertEquals(toLeave + " messages remain", toLeave, getMessages(ms).size()); + } + + private void deleteTheIndex(LevelDBStore store) throws IOException { + for (String index : store.getLogDirectory().list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + LOG.info("dir:" + dir + ", name: " + name); + return (name != null && name.endsWith(".index")); + } + })) { + + File file = new File(store.getLogDirectory().getAbsoluteFile(), index); + LOG.info("Deleting index directory:" + file); + FileUtils.deleteDirectory(file); + } + + } + + private int countLogFiles(LevelDBStore store) { + return store.getLogDirectory().list(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + LOG.info("dir:" + dir + ", name: " + name); + return (name != null && name.endsWith(".log")); + } + }).length; + } + + @After + public void stop() throws Exception { + for (LevelDBStore store : stores) { + if (store.isStarted()) { + store.stop(); + } + FileUtils.deleteDirectory(store.directory()); + } + stores.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/b54606b1/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java index 181d11d..9ad57db 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicationTestSupport.java @@ -16,17 +16,17 @@ */ package org.apache.activemq.leveldb.test; +import java.io.IOException; +import java.util.ArrayList; +import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; -import javax.jms.JMSException; -import java.io.IOException; -import java.util.ArrayList; - /** */ public class ReplicationTestSupport { @@ -60,6 +60,14 @@ public class ReplicationTestSupport { return message; } + static public void removeMessage(MessageStore ms, MessageId messageId) throws JMSException, IOException { + MessageAck ack = new MessageAck(); + ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE); + ack.setFirstMessageId(messageId); + ack.setLastMessageId(messageId); + ms.removeMessage(new ConnectionContext(), ack); + } + static public ArrayList<String> getMessages(MessageStore ms) throws Exception { final ArrayList<String> rc = new ArrayList<String>(); ms.recover(new MessageRecoveryListener() {
