Updated Branches: refs/heads/trunk f0334862a -> 5fa462a08
Adding assertions to make sure that we only append to the log from the write thread. Found a code path that was appending to the log from a different thread. This might have been affecting https://issues.apache.org/jira/browse/AMQ-4882 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5fa462a0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5fa462a0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5fa462a0 Branch: refs/heads/trunk Commit: 5fa462a08acd40b130fb98ad359a838def690450 Parents: f033486 Author: Hiram Chirino <[email protected]> Authored: Tue Dec 3 12:04:40 2013 -0500 Committer: Hiram Chirino <[email protected]> Committed: Tue Dec 3 12:05:16 2013 -0500 ---------------------------------------------------------------------- .../org/apache/activemq/leveldb/DBManager.scala | 2 +- .../apache/activemq/leveldb/LevelDBClient.scala | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index 00260d9..6b575ee 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -798,7 +798,7 @@ class DBManager(val parent:LevelDBStore) { def createTransactionContainer(id:XATransactionId) = createCollection(buffer(parent.wireFormat.marshal(id)), TRANSACTION_COLLECTION_TYPE) - def removeTransactionContainer(key:Long) = { // writeExecutor.sync { + def removeTransactionContainer(key:Long) = writeExecutor.sync { client.removeCollection(key) } http://git-wip-us.apache.org/repos/asf/activemq/blob/5fa462a0/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index c0cedce..fe29012 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -59,6 +59,10 @@ import org.apache.activemq.broker.SuppressReplyException */ object LevelDBClient extends Log { + class WriteThread(r:Runnable) extends Thread(r) { + setDaemon(true) + } + final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:" final val STORE_SCHEMA_VERSION = 1 @@ -512,6 +516,7 @@ class LevelDBClient(store: LevelDBStore) { } def storeTrace(ascii:String, force:Boolean=false) = { + assert_write_thread_executing val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date) log.appender { appender => appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii))) @@ -566,6 +571,8 @@ class LevelDBClient(store: LevelDBStore) { replay_write_batch = null; } + def assert_write_thread_executing = assert(Thread.currentThread().getClass == classOf[WriteThread]) + def init() ={ // Lets check store compatibility... @@ -590,11 +597,7 @@ class LevelDBClient(store: LevelDBStore) { version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION) writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() { - def newThread(r: Runnable) = { - val rc = new Thread(r, "LevelDB store io write") - rc.setDaemon(true) - rc - } + def newThread(r: Runnable) = new WriteThread(r) }) val factoryNames = store.indexFactory @@ -1125,6 +1128,8 @@ class LevelDBClient(store: LevelDBStore) { } def addCollection(record: CollectionRecord.Buffer) = { + assert_write_thread_executing + val key = encodeLongKey(COLLECTION_PREFIX, record.getKey) val value = record.toUnframedBuffer might_fail_using_index { @@ -1153,6 +1158,7 @@ class LevelDBClient(store: LevelDBStore) { } def removeCollection(collectionKey: Long) = { + assert_write_thread_executing val key = encodeLongKey(COLLECTION_PREFIX, collectionKey) val value = encodeVLong(collectionKey) val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey) @@ -1181,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) { } def collectionEmpty(collectionKey: Long) = { + assert_write_thread_executing val key = encodeLongKey(COLLECTION_PREFIX, collectionKey) val value = encodeVLong(collectionKey) val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey) @@ -1366,6 +1373,7 @@ class LevelDBClient(store: LevelDBStore) { val max_index_write_latency = TimeMetric() def store(uows: Array[DelayableUOW]) { + assert_write_thread_executing might_fail_using_index { log.appender { appender => val syncNeeded = index.write(new WriteOptions, max_index_write_latency) { batch =>
