Adding assertions to make sure that we only append to the log from the write thread. Found a code path that was appending to the log from a different thread. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2e5c9d59 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2e5c9d59 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2e5c9d59 Branch: refs/heads/activemq-5.9 Commit: 2e5c9d5929ffc7a1a832c831faf2467b410f98f3 Parents: 71b5523 Author: Hiram Chirino <[email protected]> Authored: Tue Dec 3 12:04:40 2013 -0500 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:12:50 2014 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/leveldb/DBManager.scala | 2 +- .../apache/activemq/leveldb/LevelDBClient.scala | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2e5c9d59/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 00260d9..6b575ee 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -798,7 +798,7 @@ class DBManager(val parent:LevelDBStore) { def createTransactionContainer(id:XATransactionId) = createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE) - def removeTransactionContainer(key:Long) = { // writeExecutor.sync { + def removeTransactionContainer(key:Long) = writeExecutor.sync { client.removeCollection(key) } http://git-wip-us.apache.org/repos/asf/activemq/blob/2e5c9d59/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index c0cedce..fe29012 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -59,6 +59,10 @@ import org.apache.activemq.broker.SuppressReplyException */ object LevelDBClient extends Log { + class WriteThread(r:Runnable) extends Thread(r) { + setDaemon(true) + } + final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:" final val STORE_SCHEMA_VERSION = 1 @@ -512,6 +516,7 @@ class LevelDBClient(store: LevelDBStore) { } def storeTrace(ascii:String, force:Boolean=false) = { + assert_write_thread_executing val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date) log.appender { appender => appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii))) @@ -566,6 +571,8 @@ class LevelDBClient(store: LevelDBStore) { replay_write_batch = null; } + def assert_write_thread_executing = assert(Thread.currentThread().getClass == classOf[WriteThread]) + def init() ={ // Lets check store compatibility... @@ -590,11 +597,7 @@ class LevelDBClient(store: LevelDBStore) { version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION) writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() { - def newThread(r: Runnable) = { - val rc = new Thread(r, "LevelDB store io write") - rc.setDaemon(true) - rc - } + def newThread(r: Runnable) = new WriteThread(r) }) val factoryNames = store.indexFactory @@ -1125,6 +1128,8 @@ class LevelDBClient(store: LevelDBStore) { } def addCollection(record: CollectionRecord.Buffer) = { + assert_write_thread_executing + val key = encodeLongKey(COLLECTION_PREFIX, record.getKey) val value = record.toUnframedBuffer might_fail_using_index { @@ -1153,6 +1158,7 @@ class LevelDBClient(store: LevelDBStore) { } def removeCollection(collectionKey: Long) = { + assert_write_thread_executing val key = encodeLongKey(COLLECTION_PREFIX, collectionKey) val value = encodeVLong(collectionKey) val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey) @@ -1181,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) { } def collectionEmpty(collectionKey: Long) = { + assert_write_thread_executing val key = encodeLongKey(COLLECTION_PREFIX, collectionKey) val value = encodeVLong(collectionKey) val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey) @@ -1366,6 +1373,7 @@ class LevelDBClient(store: LevelDBStore) { val max_index_write_latency = TimeMetric() def store(uows: Array[DelayableUOW]) { + assert_write_thread_executing might_fail_using_index { log.appender { appender => val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>
