https://issues.apache.org/jira/browse/AMQ-4907 - sanity check on the index when checkForCorruptJournalFiles - test and check that validates the orderindex makes sense
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4b01fe35 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4b01fe35 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4b01fe35 Branch: refs/heads/activemq-5.9 Commit: 4b01fe3512d9b384d2243547f01f9c67d7106ceb Parents: 4aae032 Author: gtully <[email protected]> Authored: Thu Nov 28 21:13:02 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:11:20 2014 -0400 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 9 +++++++++ .../store/kahadb/KahaDBStoreRecoveryBrokerTest.java | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4b01fe35/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index b3c2d6f..8c8fe0f 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -303,6 +303,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Entry<String, StoredDestination> entry = iterator.next(); StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); storedDestinations.put(entry.getKey(), sd); + + if (checkForCorruptJournalFiles) { + // sanity check the index also + if (!entry.getValue().locationIndex.isEmpty(tx)) { + if (entry.getValue().orderIndex.nextMessageId <= 0) { + throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); + } + } + } } } }); http://git-wip-us.apache.org/repos/asf/activemq/blob/4b01fe35/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java index 2ce9533..3725572 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java @@ -46,7 +46,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { public static final String KAHADB_DIR_BASE = "target/activemq-data/kahadb"; public static String kahaDbDirectoryName; - enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt }; + enum CorruptionType { None, FailToLoad, LoadInvalid, LoadCorrupt, LoadOrderIndex0 }; public CorruptionType failTest = CorruptionType.None; @Override @@ -71,6 +71,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(new File(kahaDbDirectoryName)); kaha.deleteAllMessages(); + kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0); broker.setPersistenceAdapter(kaha); return broker; } @@ -100,6 +101,16 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { raf.seek(8*1024 + 57); raf.writeLong(Integer.MAX_VALUE-10); break; + case LoadOrderIndex0: + // loadable but invalid metadata + // location of order index default priority index size + // so looks like there are no ids in the order index + // picked up by setCheckForCorruptJournalFiles + raf.seek(12*1024 + 21); + raf.writeShort(0); + raf.writeChar(0); + raf.writeLong(-1); + break; default: } raf.close(); @@ -107,6 +118,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { // starting broker BrokerService broker = new BrokerService(); KahaDBStore kaha = new KahaDBStore(); + kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0); // uncomment if you want to test archiving //kaha.setArchiveCorruptedIndex(true); kaha.setDirectory(new File(kahaDbDirectoryName)); @@ -123,7 +135,7 @@ public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest { } public void initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() { - this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt} ); + this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt, CorruptionType.LoadOrderIndex0} ); } public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
