KAFKA-840 Post commit
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3ead78b9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3ead78b9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3ead78b9 Branch: refs/heads/trunk Commit: 3ead78b966a8464efa6d53fbb81fcb0efc75ba63 Parents: 5a50f7e Author: Neha Narkhede <[email protected]> Authored: Wed Apr 3 23:40:05 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Wed Apr 3 23:40:05 2013 -0700 ---------------------------------------------------------------------- .../scala/kafka/controller/PartitionStateMachine.scala | 13 +++++++------ .../scala/kafka/controller/ReplicaStateMachine.scala | 13 +++++++------ 2 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3ead78b9/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 156bb10..c017727 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -44,7 +44,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val zkClient = controllerContext.zkClient var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId) - private val isRunning = new AtomicBoolean(false) + private val hasStarted = new AtomicBoolean(false) + private val hasShutdown = new AtomicBoolean(false) private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext) this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) @@ -57,7 +58,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { def startup() { // initialize partition state initializePartitionState() - isRunning.set(true) + hasStarted.set(true) // try to move partitions to online state triggerOnlinePartitionStateChange() info("Started partition state machine with initial state -> " + partitionState.toString()) @@ -72,7 +73,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * Invoked on controller shutdown. */ def shutdown() { - isRunning.compareAndSet(true, false) + hasShutdown.compareAndSet(false, true) partitionState.clear() } @@ -125,7 +126,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, leaderSelector: PartitionLeaderSelector) { val topicAndPartition = TopicAndPartition(topic, partition) - if (!isRunning.get) + if (!hasStarted.get) throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " + "the partition state machine has not started") .format(controllerId, controller.epoch, topicAndPartition, targetState)) @@ -356,8 +357,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { @throws(classOf[Exception]) def handleChildChange(parentPath : String, children : java.util.List[String]) { - if(isRunning.get) { - controllerContext.controllerLock synchronized { + controllerContext.controllerLock synchronized { + if (!hasShutdown.get) { try { debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) val currentChildren = JavaConversions.asBuffer(children).toSet http://git-wip-us.apache.org/repos/asf/kafka/blob/3ead78b9/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index ef2356f..bea1644 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -43,7 +43,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val zkClient = controllerContext.zkClient var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId) - private val isRunning = new AtomicBoolean(false) + private val hasStarted = new AtomicBoolean(false) + private val hasShutdown = new AtomicBoolean(false) this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) @@ -55,7 +56,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { def startup() { // initialize replica state initializeReplicaState() - isRunning.set(true) + hasStarted.set(true) // move all Online replicas to Online handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, controllerContext.liveBrokerIds.toSeq), OnlineReplica) @@ -71,7 +72,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { * Invoked on controller shutdown. */ def shutdown() { - isRunning.compareAndSet(true, false) + hasShutdown.compareAndSet(false, true) replicaState.clear() } @@ -102,7 +103,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { */ def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) { val topicAndPartition = TopicAndPartition(topic, partition) - if (!isRunning.get) + if (!hasStarted.get) throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " + "to %s failed because replica state machine has not started") .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState)) @@ -239,8 +240,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { ControllerStats.leaderElectionTimer.time { info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(","))) - if(isRunning.get) { - controllerContext.controllerLock synchronized { + controllerContext.controllerLock synchronized { + if (!hasShutdown.get) { try { val curBrokerIds = currentBrokerList.map(_.toInt).toSet val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
