Author: chirino
Date: Fri Oct 22 15:26:42 2010
New Revision: 1026373
URL: http://svn.apache.org/viewvc?rev=1026373&view=rev
Log:
More robust store start/stop logic.
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala?rev=1026373&r1=1026372&r2=1026373&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBClient.scala
Fri Oct 22 15:26:42 2010
@@ -138,7 +138,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
- val schedule_version = new AtomicInteger()
def start(onComplete:Runnable) = {
lock {
@@ -207,15 +206,10 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
recover(onComplete)
-
- // Schedule periodic jobs.. they keep executing while schedule_version remains the same.
- scheduleCleanup(schedule_version.get())
- scheduleFlush(schedule_version.get())
}
}
def stop() = {
- schedule_version.incrementAndGet
journal.close
indexFileFactory.close
lockFile.unlock
@@ -912,17 +906,6 @@ class HawtDBClient(hawtDBStore: HawtDBSt
//
/////////////////////////////////////////////////////////////////////
- def scheduleFlush(version:Int): Unit = {
- def try_flush() = {
- if (version == schedule_version.get) {
- hawtDBStore.executor_pool {
- flush
- scheduleFlush(version)
- }
- }
- }
- dispatchQueue.dispatchAfter(config.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
- }
def flush() = {
val start = System.currentTimeMillis()
@@ -933,25 +916,15 @@ class HawtDBClient(hawtDBStore: HawtDBSt
}
}
- def scheduleCleanup(version:Int): Unit = {
- def try_cleanup() = {
- if (version == schedule_version.get) {
- hawtDBStore.executor_pool {
- withTx {tx =>
- cleanup(tx)
- }
- scheduleCleanup(version)
- }
- }
- }
- dispatchQueue.dispatchAfter(config.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+ def cleanup():Unit = withTx {tx =>
+ cleanup(tx)
}
/**
* @param tx
* @throws IOException
*/
- def cleanup(tx:Transaction) = {
+ def cleanup(tx:Transaction):Unit = {
val helper = new TxHelper(tx)
import JavaConversions._
import helper._
Modified:
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala?rev=1026373&r1=1026372&r2=1026373&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/store/hawtdb/HawtDBStore.scala
Fri Oct 22 15:26:42 2010
@@ -16,13 +16,13 @@
*/
package org.apache.activemq.apollo.store.hawtdb
-import java.util.concurrent.atomic.AtomicLong
import collection.mutable.ListBuffer
import java.util.HashMap
import collection.{Seq}
import org.fusesource.hawtdispatch.ScalaDispatch._
import java.io.File
import java.util.concurrent._
+import atomic.{AtomicInteger, AtomicLong}
import org.apache.activemq.apollo.dto._
import org.apache.activemq.apollo.store._
import org.apache.activemq.apollo.util._
@@ -65,9 +65,13 @@ class HawtDBStore extends DelayingStoreS
var next_msg_key = new AtomicLong(1)
var executor_pool:ExecutorService = _
+ val schedule_version = new AtomicInteger()
var config:HawtDBStoreDTO = defaultConfig
val client = new HawtDBClient(this)
+ val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
+ load_source.setEventHandler(^{drain_loads});
+
override def toString = "hawtdb store"
def flush_delay = config.flush_delay
@@ -111,14 +115,44 @@ class HawtDBStore extends DelayingStoreS
client.start(^{
next_msg_key.set( client.rootBuffer.getLastMessageKey.longValue +1 )
next_queue_key.set( client.rootBuffer.getLastQueueKey.longValue +1 )
+ val v = schedule_version.incrementAndGet
+ scheduleCleanup(v)
+ scheduleFlush(v)
+ load_source.resume
onCompleted.run
})
}
}
+ def scheduleFlush(version:Int): Unit = {
+ def try_flush() = {
+ if (version == schedule_version.get) {
+ executor_pool {
+ client.flush
+ scheduleFlush(version)
+ }
+ }
+ }
+ dispatchQueue.dispatchAfter(config.index_flush_interval, TimeUnit.MILLISECONDS, ^ {try_flush})
+ }
+
+ def scheduleCleanup(version:Int): Unit = {
+ def try_cleanup() = {
+ if (version == schedule_version.get) {
+ executor_pool {
+ client.cleanup()
+ scheduleCleanup(version)
+ }
+ }
+ }
+ dispatchQueue.dispatchAfter(config.cleanup_interval, TimeUnit.MILLISECONDS, ^ {try_cleanup})
+ }
+
protected def _stop(onCompleted: Runnable) = {
+ schedule_version.incrementAndGet
new Thread() {
override def run = {
+ load_source.suspend
executor_pool.shutdown
executor_pool.awaitTermination(86400, TimeUnit.SECONDS)
executor_pool = null
@@ -181,11 +215,6 @@ class HawtDBStore extends DelayingStoreS
}
}
- val load_source = createSource(new ListEventAggregator[(Long, (Option[MessageRecord])=>Unit)](), dispatchQueue)
- load_source.setEventHandler(^{drain_loads});
- load_source.resume
-
-
def loadMessage(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
message_load_latency_counter.start { end=>
load_source.merge((messageKey, { (result)=>
Modified:
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala?rev=1026373&r1=1026372&r2=1026373&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/Logging.scala
Fri Oct 22 15:26:42 2010
@@ -233,7 +233,7 @@ trait DispatchLogging extends Logging {
override protected def log_map(message:String) = {
val d = getCurrentQueue
- if( d!=null ) {
+ if( d!=null && d.getLabel!=null ) {
d.getLabel+" | "+message
} else {
message