Author: chirino
Date: Fri Jan 20 18:52:46 2012
New Revision: 1234067
URL: http://svn.apache.org/viewvc?rev=1234067&view=rev
Log:
Fixes APLO-134 : LevelDB store should use a file lock so a broker instance gets
exclusive access to the store data.
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java?rev=1234067&r1=1234066&r2=1234067&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/dto/LevelDBStoreDTO.java
Fri Jan 20 18:52:46 2012
@@ -42,6 +42,9 @@ public class LevelDBStoreDTO extends Sto
@XmlAttribute(name="paranoid_checks")
public Boolean paranoid_checks;
+
+ @XmlAttribute(name="fail_if_locked")
+ public Boolean fail_if_locked;
@XmlAttribute(name="verify_checksums")
public Boolean verify_checksums;
Modified:
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234067&r1=1234066&r2=1234067&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Fri Jan 20 18:52:46 2012
@@ -282,12 +282,17 @@ class LevelDBClient(store: LevelDBStore)
}
}
+ lock_file = new LockFile(directory / "lock", true)
+
def time[T](func: =>T):Long = {
val start = System.nanoTime()
func
System.nanoTime() - start
}
-
+
+ // Lock before we open anything..
+ lock_store
+
val log_open_duration = time {
retry {
log.open
@@ -505,7 +510,23 @@ class LevelDBClient(store: LevelDBStore)
}
}
-
+ var lock_file:LockFile = _
+
+ def lock_store = {
+ import OptionSupport._
+ if (config.fail_if_locked.getOrElse(false)) {
+ lock_file.lock()
+ } else {
+ retry {
+ lock_file.lock()
+ }
+ }
+ }
+
+ def unlock_store = {
+ lock_file.unlock()
+ }
+
private def store_log_refs = {
index.put(log_refs_index_key,
JsonCodec.encode(collection.JavaConversions.mapAsJavaMap(log_refs.mapValues(_.get()))).toByteArray)
}
@@ -530,6 +551,7 @@ class LevelDBClient(store: LevelDBStore)
}
copy_dirty_index_to_snapshot
log = null
+ unlock_store
}
def using_index[T](func: =>T):T = {
@@ -639,7 +661,7 @@ class LevelDBClient(store: LevelDBStore)
if (!rc.isDefined) {
// We may need to give up if the store is being stopped.
- if ( !store.service_state.is_started ) {
+ if ( !store.service_state.is_starting_or_started ) {
throw error
}
Thread.sleep(1000)