Fixes for leveldb replication: make sure we only apply index updates when we encounter a UOW_END_RECORD so that we don't end up with an inconsistent index if a partial UOW is replicated.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ca02709d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ca02709d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ca02709d Branch: refs/heads/activemq-5.9 Commit: ca02709d5642252eb71b79adf7b067cb3df9020f Parents: c7a65cc Author: Hiram Chirino <[email protected]> Authored: Fri Nov 1 14:21:30 2013 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 09:00:11 2014 -0400 ---------------------------------------------------------------------- .../apache/activemq/leveldb/LevelDBClient.scala | 42 +++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ca02709d/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index dbf6512..b130a22 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -31,7 +31,7 @@ import record.{CollectionKey, EntryKey, EntryRecord, CollectionRecord} import org.apache.activemq.leveldb.util._ import java.util.concurrent._ import org.fusesource.hawtbuf._ -import java.io.{IOException, ObjectInputStream, ObjectOutputStream, File} +import java.io._ import scala.Option._ import org.apache.activemq.command.{MessageAck, Message} import org.apache.activemq.util.{IOExceptionSupport, ByteSequence} @@ -41,6 +41,17 @@ import org.apache.activemq.leveldb.util.TimeMetric import org.apache.activemq.leveldb.RecordLog.LogInfo import org.fusesource.leveldbjni.internal.JniDB import org.apache.activemq.ActiveMQMessageAuditNoSync +import java.util.zip.CRC32 +import org.apache.activemq.leveldb.util.TimeMetric +import org.fusesource.hawtbuf.ByteArrayInputStream +import org.apache.activemq.leveldb.RecordLog.LogInfo +import scala.Some +import scala.Serializable +import org.apache.activemq.leveldb.XaAckRecord +import org.apache.activemq.leveldb.MessageRecord +import org.apache.activemq.leveldb.EntryLocator +import org.apache.activemq.leveldb.DataLocator +import org.fusesource.hawtbuf.ByteArrayOutputStream /** * @author <a href="http://hiramchirino.com">Hiram Chirino</a> @@ -551,6 +562,7 @@ class LevelDBClient(store: LevelDBStore) { log.open() } replay_from(lastIndexSnapshotPos, log.appender_limit) + replay_write_batch = null; } def init() ={ @@ -678,7 +690,13 @@ class LevelDBClient(store: LevelDBStore) { } } + var replay_write_batch: WriteBatch = null + def replay_from(from:Long, limit:Long, print_progress:Boolean=true) = { + debug("Replay of journal from: %d to %d.", from, limit) + if( replay_write_batch==null ) { + replay_write_batch = index.db.createWriteBatch() + } might_fail { try { // Update the index /w what was stored on the logs.. @@ -719,11 +737,13 @@ class LevelDBClient(store: LevelDBStore) { case LOG_DATA => val message = decodeMessage(data) store.db.producerSequenceIdTracker.isDuplicate(message.getMessageId) + trace("Replay of LOG_DATA at %d, message id: ", pos, message.getMessageId) case LOG_ADD_COLLECTION => val record= decodeCollectionRecord(data) - index.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data) + replay_write_batch.put(encodeLongKey(COLLECTION_PREFIX, record.getKey), data) collectionMeta.put(record.getKey, new CollectionMeta) + trace("Replay of LOG_ADD_COLLECTION at %d, collection: %s", pos, record.getKey) case LOG_REMOVE_COLLECTION => val record = decodeCollectionKeyRecord(data) @@ -741,6 +761,7 @@ class LevelDBClient(store: LevelDBStore) { } index.delete(data) collectionMeta.remove(record.getKey) + trace("Replay of LOG_REMOVE_COLLECTION at %d, collection: %s", pos, record.getKey) case LOG_ADD_ENTRY | LOG_UPDATE_ENTRY => val record = decodeEntryRecord(data) @@ -750,7 +771,7 @@ class LevelDBClient(store: LevelDBStore) { index_record.setValueLength(record.getValueLength) val index_value = encodeEntryRecord(index_record.freeze()).toByteArray - index.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value) + replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value) if( kind==LOG_ADD_ENTRY ) { if ( record.hasValueLocation ) { @@ -758,6 +779,7 @@ class LevelDBClient(store: LevelDBStore) { } collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray) } + trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey) case LOG_REMOVE_ENTRY => val record = decodeEntryRecord(data) @@ -767,10 +789,18 @@ class LevelDBClient(store: LevelDBStore) { logRefDecrement(record.getValueLocation) } - index.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey)) + replay_write_batch.delete(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey)) collectionDecrementSize( record.getCollectionKey) + trace("Replay of LOG_REMOVE_ENTRY collection: %s, entry: %s", pos, record.getCollectionKey, record.getEntryKey) - case _ => // Skip other records, they don't modify the index. + case LOG_TRACE => + trace("Replay of LOG_TRACE, message: %s", pos, data.ascii()) + case RecordLog.UOW_END_RECORD => + trace("Replay of UOW_END_RECORD") + index.db.write(replay_write_batch) + replay_write_batch=index.db.createWriteBatch() + case kind => // Skip other records, they don't modify the index. + trace("Skipping replay of %d record kind at %d", kind, pos) } pos = nextPos @@ -788,9 +818,11 @@ class LevelDBClient(store: LevelDBStore) { case e:Throwable => // replay failed.. good thing we are in a retry block... index.close + replay_write_batch = null throw e; } finally { recoveryLogs = null + debug("Replay of journal done") } } }
