Adding assertions to make sure that we only append to the log from the write 
thread.  While doing so, found a code path 
(DBManager.removeTransactionContainer) that was appending to the log from a 
different thread; it now runs via writeExecutor.sync.  This might have been 
affecting https://issues.apache.org/jira/browse/AMQ-4882


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2e5c9d59
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2e5c9d59
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2e5c9d59

Branch: refs/heads/activemq-5.9
Commit: 2e5c9d5929ffc7a1a832c831faf2467b410f98f3
Parents: 71b5523
Author: Hiram Chirino <[email protected]>
Authored: Tue Dec 3 12:04:40 2013 -0500
Committer: Hadrian Zbarcea <[email protected]>
Committed: Wed Mar 12 13:12:50 2014 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/leveldb/DBManager.scala   |  2 +-
 .../apache/activemq/leveldb/LevelDBClient.scala   | 18 +++++++++++++-----
 2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2e5c9d59/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 00260d9..6b575ee 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -798,7 +798,7 @@ class DBManager(val parent:LevelDBStore) {
   def createTransactionContainer(id:XATransactionId) =
     createCollection(buffer(parent.wireFormat.marshal(id)), 
TRANSACTION_COLLECTION_TYPE)
 
-  def removeTransactionContainer(key:Long) = { // writeExecutor.sync {
+  def removeTransactionContainer(key:Long) = writeExecutor.sync {
     client.removeCollection(key)
   }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/2e5c9d59/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
index c0cedce..fe29012 100755
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
@@ -59,6 +59,10 @@ import org.apache.activemq.broker.SuppressReplyException
  */
 object LevelDBClient extends Log {
 
+  class WriteThread(r:Runnable) extends Thread(r) {
+    setDaemon(true)
+  }
+
   final val STORE_SCHEMA_PREFIX = "activemq_leveldb_store:"
   final val STORE_SCHEMA_VERSION = 1
 
@@ -512,6 +516,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def storeTrace(ascii:String, force:Boolean=false) = {
+    assert_write_thread_executing
     val time = new SimpleDateFormat("dd/MMM/yyyy:HH:mm::ss Z").format(new Date)
     log.appender { appender =>
       appender.append(LOG_TRACE, new AsciiBuffer("%s: %s".format(time, ascii)))
@@ -566,6 +571,8 @@ class LevelDBClient(store: LevelDBStore) {
     replay_write_batch = null;
   }
 
+  def assert_write_thread_executing = assert(Thread.currentThread().getClass 
== classOf[WriteThread])
+
   def init() ={
 
     // Lets check store compatibility...
@@ -590,11 +597,7 @@ class LevelDBClient(store: LevelDBStore) {
     version_file.writeText(STORE_SCHEMA_PREFIX + STORE_SCHEMA_VERSION)
 
     writeExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
-      def newThread(r: Runnable) = {
-        val rc = new Thread(r, "LevelDB store io write")
-        rc.setDaemon(true)
-        rc
-      }
+      def newThread(r: Runnable) = new WriteThread(r)
     })
 
     val factoryNames = store.indexFactory
@@ -1125,6 +1128,8 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def addCollection(record: CollectionRecord.Buffer) = {
+    assert_write_thread_executing
+
     val key = encodeLongKey(COLLECTION_PREFIX, record.getKey)
     val value = record.toUnframedBuffer
     might_fail_using_index {
@@ -1153,6 +1158,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def removeCollection(collectionKey: Long) = {
+    assert_write_thread_executing
     val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
     val value = encodeVLong(collectionKey)
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@@ -1181,6 +1187,7 @@ class LevelDBClient(store: LevelDBStore) {
   }
 
   def collectionEmpty(collectionKey: Long) = {
+    assert_write_thread_executing
     val key = encodeLongKey(COLLECTION_PREFIX, collectionKey)
     val value = encodeVLong(collectionKey)
     val entryKeyPrefix = encodeLongKey(ENTRY_PREFIX, collectionKey)
@@ -1366,6 +1373,7 @@ class LevelDBClient(store: LevelDBStore) {
   val max_index_write_latency = TimeMetric()
 
   def store(uows: Array[DelayableUOW]) {
+    assert_write_thread_executing
     might_fail_using_index {
       log.appender { appender =>
         val syncNeeded = index.write(new WriteOptions, 
max_index_write_latency) { batch =>

Reply via email to