Fixing https://issues.apache.org/jira/browse/AMQ-4917 : LevelDB store can fail when using durable subs.
We were browsing durable sub entries which had been concurrently GCed causing leveldb store failures which then caused the broker to restart. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3faddcd2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3faddcd2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3faddcd2 Branch: refs/heads/activemq-5.9 Commit: 3faddcd2d19de634eac53e2584bebc856e24d6d3 Parents: 720b8ac Author: Hiram Chirino <[email protected]> Authored: Tue Dec 3 19:20:11 2013 -0500 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:13:33 2014 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/leveldb/DBManager.scala | 12 +- .../apache/activemq/leveldb/LevelDBClient.scala | 117 +++++++++---------- .../apache/activemq/leveldb/LevelDBStore.scala | 10 +- .../org/apache/activemq/leveldb/RecordLog.scala | 61 +++++----- 4 files changed, 104 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/3faddcd2/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 6b575ee..722d932 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -37,7 +37,9 @@ import org.apache.activemq.ActiveMQMessageAuditNoSync import org.fusesource.hawtdispatch case class EntryLocator(qid:Long, seq:Long) -case class DataLocator(store:LevelDBStore, pos:Long, len:Int) +case class DataLocator(store:LevelDBStore, pos:Long, len:Int) { + override def toString: String = "DataLocator(%x, %d)".format(pos, len) +} case class MessageRecord(store:LevelDBStore, id:MessageId, data:Buffer, syncNeeded:Boolean) { var locator:DataLocator = _ } @@ -860,8 +862,12 @@ class DBManager(val parent:LevelDBStore) { def getMessage(x: MessageId):Message = { val id = Option(pendingStores.get(x)).flatMap(_.headOption).map(_.id).getOrElse(x) val locator = id.getDataLocator() - val msg = client.getMessageWithRetry(locator) - msg.setMessageId(id) + val msg = client.getMessage(locator) + if( msg!=null ) { + msg.setMessageId(id) + } else { + LevelDBStore.warn("Could not load messages for: "+x+" at: "+locator) + } msg } http://git-wip-us.apache.org/repos/asf/activemq/blob/3faddcd2/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index fe29012..7872ff8 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -21,7 +21,6 @@ import java.{lang=>jl} import java.{util=>ju} import java.util.concurrent.locks.ReentrantReadWriteLock -import java.util.concurrent.atomic.AtomicBoolean import collection.immutable.TreeMap import collection.mutable.{HashMap, ListBuffer} import org.iq80.leveldb._ @@ -37,20 +36,13 @@ import org.apache.activemq.command.{MessageAck, Message} import org.apache.activemq.util.{IOExceptionSupport, ByteSequence} import java.text.SimpleDateFormat import java.util.{Date, Collections} -import org.apache.activemq.leveldb.util.TimeMetric -import org.apache.activemq.leveldb.RecordLog.LogInfo import org.fusesource.leveldbjni.internal.JniDB import org.apache.activemq.ActiveMQMessageAuditNoSync -import java.util.zip.CRC32 import org.apache.activemq.leveldb.util.TimeMetric import org.fusesource.hawtbuf.ByteArrayInputStream import org.apache.activemq.leveldb.RecordLog.LogInfo import scala.Some import scala.Serializable -import org.apache.activemq.leveldb.XaAckRecord -import org.apache.activemq.leveldb.MessageRecord -import org.apache.activemq.leveldb.EntryLocator -import org.apache.activemq.leveldb.DataLocator import org.fusesource.hawtbuf.ByteArrayOutputStream import org.apache.activemq.broker.SuppressReplyException @@ -772,19 +764,20 @@ class LevelDBClient(store: LevelDBStore) { val index_record = new EntryRecord.Bean() index_record.setValueLocation(record.getValueLocation) - index_record.setValueLength(record.getValueLength) - val index_value = encodeEntryRecord(index_record.freeze()).toByteArray + if( record.hasValueLength ) { + index_record.setValueLength(record.getValueLength) + } + val index_value = encodeEntryRecord(index_record.freeze()).toByteArray replay_write_batch.put(encodeEntryKey(ENTRY_PREFIX, record.getCollectionKey, record.getEntryKey), index_value) if( kind==LOG_ADD_ENTRY ) { - if ( record.hasValueLocation ) { - logRefIncrement(record.getValueLocation) - } + logRefIncrement(record.getValueLocation) collectionIncrementSize(record.getCollectionKey, record.getEntryKey.toByteArray) + trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey) + } else { + trace("Replay of LOG_UPDATE_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey) } - trace("Replay of LOG_ADD_ENTRY at %d, collection: %s, entry: %s", indexRecoveryPosition, record.getCollectionKey, record.getEntryKey) - case LOG_REMOVE_ENTRY => val record = decodeEntryRecord(data) @@ -834,10 +827,9 @@ class LevelDBClient(store: LevelDBStore) { private def logRefDecrement(pos: Long) { for( key <- logRefKey(pos) ) { - logRefs.get(key).foreach { counter => - if (counter.decrementAndGet() == 0) { - logRefs.remove(key) - } + logRefs.get(key) match { + case Some(counter) => counter.decrementAndGet() == 0 + case None => warn("invalid: logRefDecrement: "+pos) } } } @@ -1252,11 +1244,16 @@ class LevelDBClient(store: LevelDBStore) { collectionCursor(collectionKey, encodeLong(seq)) { (key, value) => val seq = decodeLong(key) var locator = DataLocator(store, value.getValueLocation, value.getValueLength) - val msg = getMessageWithRetry(locator) - msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) - msg.getMessageId().setDataLocator(locator) - msg.setRedeliveryCounter(decodeQueueEntryMeta(value)) - func(msg) + val msg = getMessage(locator) + if( msg !=null ) { + msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) + msg.getMessageId().setDataLocator(locator) + msg.setRedeliveryCounter(decodeQueueEntryMeta(value)) + func(msg) + } else { + warn("Could not load message seq: "+seq+" from "+locator) + true + } } } @@ -1278,10 +1275,15 @@ class LevelDBClient(store: LevelDBStore) { func(XaAckRecord(collectionKey, seq, ack, sub)) } else { var locator = DataLocator(store, value.getValueLocation, value.getValueLength) - val msg = getMessageWithRetry(locator) - msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) - msg.getMessageId().setDataLocator(locator) - func(msg) + val msg = getMessage(locator) + if( msg !=null ) { + msg.getMessageId().setEntryLocator(EntryLocator(collectionKey, seq)) + msg.getMessageId().setDataLocator(locator) + func(msg) + } else { + warn("Could not load XA message seq: "+seq+" from "+locator) + true + } } } } @@ -1295,22 +1297,6 @@ class LevelDBClient(store: LevelDBStore) { } } - def getMessageWithRetry(locator:AnyRef):Message = { - var retry = 0 - var rc = getMessage(locator); - while( rc == null ) { - if( retry > 10 ) - return null; - Thread.sleep(retry*10) - rc = getMessage(locator); - retry+=1 - } - if( retry > 0 ) { - info("Recovered from 'failed getMessage' on retry: "+retry) - } - rc - } - def getMessage(locator:AnyRef):Message = { assert(locator!=null) val buffer = locator match { @@ -1487,9 +1473,7 @@ class LevelDBClient(store: LevelDBStore) { batch.put(key, index_data) if( kind==LOG_ADD_ENTRY ) { - for (key <- logRefKey(dataLocator.pos, log_info)) { - logRefs.getOrElseUpdate(key, new LongCounter()).incrementAndGet() - } + logRefIncrement(dataLocator.pos) collectionIncrementSize(entry.queueKey, log_record.getEntryKey.toByteArray) } @@ -1537,11 +1521,11 @@ class LevelDBClient(store: LevelDBStore) { log_record.setCollectionKey(entry.subKey) log_record.setEntryKey(ACK_POSITION) log_record.setValueLocation(entry.ackPosition) - appender.append(LOG_ADD_ENTRY, encodeEntryRecord(log_record.freeze())) + appender.append(LOG_UPDATE_ENTRY, encodeEntryRecord(log_record.freeze())) val index_record = new EntryRecord.Bean() index_record.setValueLocation(entry.ackPosition) - batch.put(key, encodeEntryRecord(log_record.freeze()).toByteArray) + batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray) } if (uow.syncNeeded) { @@ -1625,17 +1609,6 @@ class LevelDBClient(store: LevelDBStore) { def gc(topicPositions:Seq[(Long, Long)]):Unit = { - detect_if_compact_needed - - // Lets compact the leveldb index if it looks like we need to. - if( index.compact_needed ) { - debug("Compacting the leveldb index at: %s", dirtyIndexFile) - val start = System.nanoTime() - index.compact - val duration = System.nanoTime() - start; - info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0)) - } - // Delete message refs for topics who's consumers have advanced.. if( !topicPositions.isEmpty ) { might_fail_using_index { @@ -1646,6 +1619,7 @@ class LevelDBClient(store: LevelDBStore) { ro.verifyChecksums(verifyChecksums) val start = encodeEntryKey(ENTRY_PREFIX, topic, 0) val end = encodeEntryKey(ENTRY_PREFIX, topic, first) + debug("Topic: %d GC to seq: %d", topic, first) index.cursorRange(start, end, ro) { case (key, value) => val entry = EntryRecord.FACTORY.parseUnframed(value) batch.delete(key) @@ -1657,8 +1631,30 @@ class LevelDBClient(store: LevelDBStore) { } } + detect_if_compact_needed + + // Lets compact the leveldb index if it looks like we need to. + if( index.compact_needed ) { + val start = System.nanoTime() + index.compact + val duration = System.nanoTime() - start; + info("Compacted the leveldb index at: %s in %.2f ms", dirtyIndexFile, (duration / 1000000.0)) + } + import collection.JavaConversions._ lastIndexSnapshotPos + + // drop the logs that are no longer referenced. + for( (x,y) <- logRefs.toSeq ) { + if( y.get() <= 0 ) { + if( y.get() < 0 ) { + warn("Found a negative log reference for log: "+x) + } + debug("Log no longer referenced: %x", x) + logRefs.remove(x) + } + } + val emptyJournals = log.log_infos.keySet.toSet -- logRefs.keySet // We don't want to delete any journals that the index has not snapshot'ed or @@ -1669,6 +1665,7 @@ class LevelDBClient(store: LevelDBStore) { emptyJournals.foreach { id => if ( id < deleteLimit ) { + debug("Deleting log at %x", id) log.delete(id) } } http://git-wip-us.apache.org/repos/asf/activemq/blob/3faddcd2/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 43238a9..7b90b0c 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -669,6 +669,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P lastSeq.set(db.getLastQueueEntrySeq(key)) + def cursorResetPosition = 0L + def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { check_running val seq = lastSeq.incrementAndGet() @@ -731,7 +733,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def removeAllMessages(context: ConnectionContext): Unit = { check_running db.collectionEmpty(key) - cursorPosition = 0 + cursorPosition = cursorResetPosition } def getMessageCount: Int = { @@ -744,11 +746,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def recover(listener: MessageRecoveryListener): Unit = { check_running - cursorPosition = db.cursorMessages(preparedAcks, key, listener, 0) + cursorPosition = db.cursorMessages(preparedAcks, key, listener, cursorResetPosition) } def resetBatching: Unit = { - cursorPosition = 0 + cursorPosition = cursorResetPosition } def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = { @@ -789,6 +791,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P val subscriptions = collection.mutable.HashMap[(String, String), DurableSubscription]() var firstSeq = 0L + override def cursorResetPosition = firstSeq + def subscription_with_key(key:Long) = subscriptions.find(_._2.subKey == key).map(_._2) override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay: Boolean): Future[AnyRef] = { http://git-wip-us.apache.org/repos/asf/activemq/blob/3faddcd2/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala index 28e1be1..08f18a7 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/RecordLog.scala @@ -517,42 +517,43 @@ case class RecordLog(directory: File, logSuffix:String) { log_infos.map(_._2.position).toArray } - private def get_reader[T](record_position:Long)(func: (LogReader)=>T) = { - - val lookup = log_mutex.synchronized { - val info = log_info(record_position) - info.map { info=> - if(info.position == current_appender.position) { - current_appender.retain() - (info, current_appender) - } else { - (info, null) - } + private def get_reader[T](record_position:Long)(func: (LogReader)=>T):Option[T] = { + + val (info, appender) = log_mutex.synchronized { + log_info(record_position) match { + case None => + warn("No reader available for position: %x, log_infos: %s", record_position, log_infos) + return None + case Some(info) => + if(info.position == current_appender.position) { + current_appender.retain() + (info, current_appender) + } else { + (info, null) + } } } - lookup.map { case (info, appender) => - val reader = if( appender!=null ) { - // read from the current appender. - appender - } else { - // Checkout a reader from the cache... - reader_cache.synchronized { - var reader = reader_cache.get(info.file) - if(reader==null) { - reader = LogReader(info.file, info.position) - reader_cache.put(info.file, reader) - } - reader.retain() - reader + val reader = if( appender!=null ) { + // read from the current appender. + appender + } else { + // Checkout a reader from the cache... + reader_cache.synchronized { + var reader = reader_cache.get(info.file) + if(reader==null) { + reader = LogReader(info.file, info.position) + reader_cache.put(info.file, reader) } + reader.retain() + reader } + } - try { - func(reader) - } finally { - reader.release - } + try { + Some(func(reader)) + } finally { + reader.release } }
