Updated Branches: refs/heads/trunk 8d4fef8af -> 1eca03135
leveldb replication Master was failing to give up being master after its process is suspended by using ctrl-z. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1eca0313 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1eca0313 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1eca0313 Branch: refs/heads/trunk Commit: 1eca0313562c7e1d38b9d6f0c23478438b7f9943 Parents: 8d4fef8 Author: Hiram Chirino <[email protected]> Authored: Tue Sep 3 10:38:30 2013 -0400 Committer: Hiram Chirino <[email protected]> Committed: Tue Sep 3 10:41:33 2013 -0400 ---------------------------------------------------------------------- .../apache/activemq/leveldb/LevelDBClient.scala | 13 +++++++--- .../leveldb/replicated/MasterElector.scala | 4 +++- .../leveldb/replicated/MasterLevelDBStore.scala | 14 +++++++++-- .../replicated/groups/ClusteredSingleton.scala | 25 +++++++++++++++++++- .../groups/internal/ChangeListenerSupport.scala | 6 ++--- .../groups/internal/ZooKeeperGroup.scala | 18 ++++++++++---- 6 files changed, 66 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala index fb574df..4bbfda5 100755 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala @@ -511,12 +511,20 @@ class LevelDBClient(store: LevelDBStore) { def might_fail[T](func : =>T):T = { def handleFailure(e:IOException) = { + var failure:Throwable = e; if( 
store.broker_service !=null ) { // This should start stopping the broker but it might block, // so do it on another thread... new Thread("LevelDB IOException handler.") { override def run() { - store.broker_service.handleIOException(e); + try { + store.broker_service.handleIOException(e) + } catch { + case e:RuntimeException => + failure = e + } finally { + store.stop() + } } }.start() // Lets wait until the broker service has started stopping. Once the @@ -526,8 +534,7 @@ class LevelDBClient(store: LevelDBStore) { Thread.sleep(100); } } - store.stop() - throw e; + throw failure; } try { func http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala index 88fcc9d..b5d8e10 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala @@ -85,7 +85,9 @@ class MasterElector(store: ElectingLevelDBStore) extends ClusteredSingleton[Leve object change_listener extends ChangeListener { def connected = changed - def disconnected = changed + def disconnected = { + changed + } def changed:Unit = elector.synchronized { // info(eid+" cluster state changed: "+members) http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala index ee0d4da..0318eff 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterLevelDBStore.scala @@ -357,7 +357,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { return } - if( isStopped ) { + if( isStoppedOrStopping ) { throw new IllegalStateException("Store replication stopped") } @@ -368,13 +368,23 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait { } while( !position_sync.await(1, TimeUnit.SECONDS) ) { - if( isStopped ) { + if( isStoppedOrStopping ) { throw new IllegalStateException("Store replication stopped") } warn("Store update waiting on %d replica(s) to catch up to log position %d. %s", minSlaveAcks, position, status) } } + + def isStoppedOrStopping: Boolean = { + if( isStopped || isStopping ) + return true + if( broker_service!=null && broker_service.isStopping ) + return true + false + } + + def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = { if( length > 0 ) { val value = new LogWrite http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala index 20b3d33..40eadf7 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala @@ 
-27,6 +27,7 @@ import java.util.LinkedHashMap import java.lang.{IllegalStateException, String} import reflect.BeanProperty import org.codehaus.jackson.annotate.JsonProperty +import org.apache.zookeeper.KeeperException.NoNodeException /** * @author <a href="http://hiramchirino.com">Hiram Chirino</a> @@ -108,16 +109,20 @@ class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends } def connected = { + onConnected changed ClusteredSingletonWatcher.this.fireConnected } def disconnected = { + onDisconnected changed ClusteredSingletonWatcher.this.fireDisconnected } } + protected def onConnected = {} + protected def onDisconnected = {} def start(group:Group) = this.synchronized { if(_group !=null ) @@ -223,8 +228,26 @@ class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends Clustered if(_group==null) throw new IllegalStateException("Not started.") + this._state = state - _group.update(_eid, encode(state, mapper)) + try { + _group.update(_eid, encode(state, mapper)) + } catch { + case e:NoNodeException => + this._state = null.asInstanceOf[T] + join(state) + } + } + + override protected def onDisconnected { + this._eid = null + } + + override protected def onConnected { + if( this.eid==null && this._state!=null ) { + this._state = null.asInstanceOf[T] + join(this._state) + } } def isMaster:Boolean = this.synchronized { http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala index 20c50ba..763059a 100644 --- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala @@ -51,7 +51,7 @@ trait ChangeListenerSupport { } def fireConnected() = { - val listener = this.synchronized { this.listeners } + val listeners = this.synchronized { this.listeners } check_elapsed_time { for (listener <- listeners) { listener.connected @@ -60,7 +60,7 @@ trait ChangeListenerSupport { } def fireDisconnected() = { - val listener = this.synchronized { this.listeners } + val listeners = this.synchronized { this.listeners } check_elapsed_time { for (listener <- listeners) { listener.disconnected @@ -69,7 +69,7 @@ trait ChangeListenerSupport { } def fireChanged() = { - val listener = this.synchronized { this.listeners } + val listeners = this.synchronized { this.listeners } val start = System.nanoTime() check_elapsed_time { for (listener <- listeners) { http://git-wip-us.apache.org/repos/asf/activemq/blob/1eca0313/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala index 9df9125..f416013 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala @@ -91,7 +91,10 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with Life def connected = zk.isConnected def onConnected() = fireConnected() - def onDisconnected() = fireDisconnected() + 
def onDisconnected() = { + this.members = new LinkedHashMap() + fireDisconnected() + } def join(data:Array[Byte]=null): String = this.synchronized { val id = zk.createWithParents(member_path_prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL).stripPrefix(member_path_prefix) @@ -102,9 +105,16 @@ class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with Life def update(path:String, data:Array[Byte]=null): Unit = this.synchronized { joins.get(path) match { case Some(ver) => - val stat = zk.setData(member_path_prefix+path, data, ver) - joins.put(path, stat.getVersion) - case None => throw new IllegalArgumentException("Has not joined locally: "+path) + try { + val stat = zk.setData(member_path_prefix + path, data, ver) + joins.put(path, stat.getVersion) + } + catch { + case e:NoNodeException => + joins.remove(path) + throw e; + } + case None => throw new NoNodeException("Has not joined locally: "+path) } }
