Replicated leveldb slave index snapshots were being labeled with higher journal positions than they really contained.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6ef202e0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6ef202e0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6ef202e0 Branch: refs/heads/activemq-5.9 Commit: 6ef202e020d321c2f58bf95e58aadc7b2eb4c83e Parents: a367eaa Author: Hiram Chirino <[email protected]> Authored: Fri Nov 8 10:25:50 2013 -0500 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 09:03:24 2014 -0400 ---------------------------------------------------------------------- .../apache/activemq/leveldb/LevelDBClient.scala | 49 +++++++++-------- .../leveldb/replicated/SlaveLevelDBStore.scala | 7 ++- .../test/ReplicatedLevelDBStoreTest.java | 57 ++++++++++++-------- 3 files changed, 68 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6ef202e0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index 1a0dd35..15f7bb0 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -671,6 +671,7 @@ class LevelDBClient(store: LevelDBStore) { for( (id, file)<- lastSnapshotIndex ) { try { copyIndex(file, dirtyIndexFile) + debug("Recovering from last index snapshot at: "+dirtyIndexFile) } catch { case e:Exception => warn(e, "Could not recover snapshot of the index: "+e) @@ -678,11 +679,9 @@ class LevelDBClient(store: LevelDBStore) { } } index = new RichDB(factory.open(dirtyIndexFile, indexOptions)); - if ( store.paranoidChecks ) 
{ - for(value <- index.get(DIRTY_INDEX_KEY) ) { - if( java.util.Arrays.equals(value, TRUE) ) { - warn("Recovering from a dirty index.") - } + for(value <- index.get(DIRTY_INDEX_KEY) ) { + if( java.util.Arrays.equals(value, TRUE) ) { + warn("Recovering from a dirty index.") } } index.put(DIRTY_INDEX_KEY, TRUE) @@ -691,6 +690,7 @@ class LevelDBClient(store: LevelDBStore) { } var replay_write_batch: WriteBatch = null + var indexRecoveryPosition = 0L def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = { debug("Replay of journal from: %d to %d.", from, limit) @@ -700,19 +700,19 @@ class LevelDBClient(store: LevelDBStore) { might_fail { try { // Update the index /w what was stored on the logs.. - var pos = from; + indexRecoveryPosition = from; var last_reported_at = System.currentTimeMillis(); var showing_progress = false var last_reported_pos = 0L try { - while (pos < limit) { + while (indexRecoveryPosition < limit) { if( print_progress ) { val now = System.currentTimeMillis(); if( now > last_reported_at+1000 ) { - val at = pos-from + val at = indexRecoveryPosition-from val total = limit-from - val rate = (pos-last_reported_pos)*1000.0 / (now - last_reported_at) + val rate = (indexRecoveryPosition-last_reported_pos)*1000.0 / (now - last_reported_at) val eta = (total-at)/rate val remaining = if(eta > 60*60) { "%.2f hrs".format(eta/(60*60)) @@ -726,24 +726,24 @@ class LevelDBClient(store: LevelDBStore) { at*100.0/total, at, total, rate/1024, remaining)) showing_progress = true; last_reported_at = now - last_reported_pos = pos + last_reported_pos = indexRecoveryPosition } } - log.read(pos).map { + log.read(indexRecoveryPosition).map { case (kind, data, nextPos) => kind match { case LOG_DATA => val message = decodeMessage(data) store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId) - trace("Replay of LOG_DATA at %d, message id: ", pos, message.getMessageId) + trace("Replay of LOG_DATA at %d, message id: ", indexRecoveryPosition, 
message.getMessageId) case LOG_ADD_COLLECTION => val record= decodeCollectionRecord(data) replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data) collectionMeta.put(record.getKey, new CollectionMeta) - trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey) + trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", indexRecoveryPosition, record.getKey) case LOG_REMOVE_COLLECTION => val record = decodeCollectionKeyRecord(data) @@ -761,7 +761,7 @@ class LevelDBClient(store: LevelDBStore) { } index.delete(data) collectionMeta.remove(record.getKey) - trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey) + trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", indexRecoveryPosition, record.getKey) case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY => val record = decodeEntryRecord(data) @@ -779,7 +779,8 @@ class LevelDBClient(store: LevelDBStore) { } collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray) } - trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey) + trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey) + case LOG_REMOVE_ENTRY => val record = decodeEntryRecord(data) @@ -791,19 +792,19 @@ class LevelDBClient(store: LevelDBStore) { replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey)) collectionDecrementSize( record.getCollectionKey) - trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey) + trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey) case LOG_TRACE => - trace("Replay of LOG_TRACE, message: %s", pos, data.ascii()) + trace("Replay of LOG_TRACE, message: %s", indexRecoveryPosition, data.ascii()) case RecordLog.UOW_END_RECORD => 
trace("Replay of UOW_END_RECORD") index.db.write(replay_write_batch) replay_write_batch=index.db.createWriteBatch() case kind => // Skip other records, they don't modify the index. - trace("Skipping replay of %d record kind at %d", kind, pos) + trace("Skipping replay of %d record kind at %d", kind, indexRecoveryPosition) } - pos = nextPos + indexRecoveryPosition = nextPos } } } @@ -989,10 +990,11 @@ class LevelDBClient(store: LevelDBStore) { index.put(DIRTY_INDEX_KEY, FALSE, new WriteOptions().sync(true)) index.close index = null + debug("Gracefuly closed the index") + copyDirtyIndexToSnapshot } if (log!=null && log.isOpen) { log.close - copyDirtyIndexToSnapshot stored_wal_append_position = log.appender_limit log = null } @@ -1043,15 +1045,18 @@ class LevelDBClient(store: LevelDBStore) { snapshotRwLock.writeLock().unlock() } + def nextIndexSnapshotPos:Long = wal_append_position + def copyDirtyIndexToSnapshot:Unit = { - if( log.appender_limit == lastIndexSnapshotPos ) { + if( nextIndexSnapshotPos == lastIndexSnapshotPos ) { // no need to snapshot again... return } - copyDirtyIndexToSnapshot(log.appender_limit) + copyDirtyIndexToSnapshot(nextIndexSnapshotPos) } def copyDirtyIndexToSnapshot(walPosition:Long):Unit = { + debug("Taking a snapshot of the current index: "+snapshotIndexFile(walPosition)) // Where we start copying files into. Delete this on // restart. 
val tmpDir = tempIndexFile http://git-wip-us.apache.org/repos/asf/activemq/blob/6ef202e0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala index 2fd7c1e..2d288bd 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/SlaveLevelDBStore.scala @@ -58,9 +58,14 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { // the slave is caught up. override def post_log_rotate: Unit = { if( caughtUp ) { - super.post_log_rotate + writeExecutor { + snapshotIndex(false) + } } } + + // The snapshots we create are based on what has been replayed. 
+ override def nextIndexSnapshotPos:Long = indexRecoveryPosition } override def doStart() = { http://git-wip-us.apache.org/repos/asf/activemq/blob/6ef202e0/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java index 119b08f..d9ba101 100644 --- a/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java +++ b/activemq-leveldb-store/src/test/java/org/apache/activemq/leveldb/test/ReplicatedLevelDBStoreTest.java @@ -118,21 +118,22 @@ public class ReplicatedLevelDBStoreTest { return f; } - @Test(timeout = 1000*60*60) + @Test(timeout = 1000*60*20) public void testReplication() throws Exception { LinkedList<File> directories = new LinkedList<File>(); directories.add(new File("target/activemq-data/leveldb-node1")); directories.add(new File("target/activemq-data/leveldb-node2")); directories.add(new File("target/activemq-data/leveldb-node3")); + resetDirectories(directories); - for (File f : directories) { - FileSupport.toRichFile(f).recursiveDelete(); - } + // For some reason this had to be 64k to trigger a bug where + // slave index snapshots were being done incorrectly. + String playload = createPlayload(64*1024); ArrayList<String> expected_list = new ArrayList<String>(); // We will rotate between 3 nodes the task of being the master. 
- for (int j = 0; j < 10; j++) { + for (int j = 0; j < 5; j++) { MasterLevelDBStore master = createMaster(directories.get(0)); CountDownFuture masterStart = asyncStart(master); @@ -141,8 +142,12 @@ public class ReplicatedLevelDBStoreTest { asyncStart(slave2); masterStart.await(); - LOG.info("Adding messages..."); MessageStore ms = master.createQueueMessageStore(new ActiveMQQueue("TEST")); + + LOG.info("Checking: "+master.getDirectory()); + assertEquals(expected_list, getMessages(ms)); + + LOG.info("Adding messages..."); final int TOTAL = 500; for (int i = 0; i < TOTAL; i++) { if (i % ((int) (TOTAL * 0.10)) == 0) { @@ -152,19 +157,23 @@ public class ReplicatedLevelDBStoreTest { if (i == 250) { slave1.start(); slave2.stop(); + LOG.info("Checking: "+master.getDirectory()); + assertEquals(expected_list, getMessages(ms)); } String msgid = "m:" + j + ":" + i; - addMessage(ms, msgid); + addMessage(ms, msgid, playload); expected_list.add(msgid); } - LOG.info("Checking master state"); + LOG.info("Checking: "+master.getDirectory()); assertEquals(expected_list, getMessages(ms)); - LOG.info("Stopping master: " + master.node_id()); + LOG.info("Stopping master: " + master.getDirectory()); master.stop(); - LOG.info("Stopping slave: " + slave1.node_id()); + + Thread.sleep(3*1000); + LOG.info("Stopping slave: " + slave1.getDirectory()); slave1.stop(); // Rotate the dir order so that slave1 becomes the master next. 
@@ -172,22 +181,26 @@ public class ReplicatedLevelDBStoreTest { } } + void resetDirectories(LinkedList<File> directories) { + for (File directory : directories) { + FileSupport.toRichFile(directory).recursiveDelete(); + directory.mkdirs(); + FileSupport.toRichFile(new File(directory, "nodeid.txt")).writeText(directory.getName(), "UTF-8"); + } + } + @Test(timeout = 1000*60*60) public void testSlowSlave() throws Exception { - File node1Dir = new File("target/activemq-data/leveldb-node1"); - File node2Dir = new File("target/activemq-data/leveldb-node2"); - File node3Dir = new File("target/activemq-data/leveldb-node3"); - - FileSupport.toRichFile(node1Dir).recursiveDelete(); - FileSupport.toRichFile(node2Dir).recursiveDelete(); - FileSupport.toRichFile(node3Dir).recursiveDelete(); - - node2Dir.mkdirs(); - node3Dir.mkdirs(); - FileSupport.toRichFile(new File(node2Dir, "nodeid.txt")).writeText("node2", "UTF-8"); - FileSupport.toRichFile(new File(node3Dir, "nodeid.txt")).writeText("node3", "UTF-8"); + LinkedList<File> directories = new LinkedList<File>(); + directories.add(new File("target/activemq-data/leveldb-node1")); + directories.add(new File("target/activemq-data/leveldb-node2")); + directories.add(new File("target/activemq-data/leveldb-node3")); + resetDirectories(directories); + File node1Dir = directories.get(0); + File node2Dir = directories.get(1); + File node3Dir = directories.get(2); ArrayList<String> expected_list = new ArrayList<String>();
