Updated Branches: refs/heads/0.8 06911384f -> a9ce73cfd
KAFKA-840 Controller tries to perform preferred replica election on failover before state machines have started up; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9ce73cf Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9ce73cf Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9ce73cf Branch: refs/heads/0.8 Commit: a9ce73cfd36bdb197e722ce8b828f27153f9e4ed Parents: 0691138 Author: Swapnil Ghike <sgh...@linkedin.com> Authored: Mon Apr 1 09:17:12 2013 -0700 Committer: Neha Narkhede <neha.narkh...@gmail.com> Committed: Mon Apr 1 09:17:20 2013 -0700 ---------------------------------------------------------------------- .../scala/kafka/controller/KafkaController.scala | 4 ++-- .../kafka/controller/PartitionStateMachine.scala | 12 ++++++++---- .../kafka/controller/ReplicaStateMachine.scala | 12 ++++++++---- 3 files changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ce73cf/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 47d4d7b..74614d8 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -242,6 +242,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg replicaStateMachine.startup() Utils.registerMBean(this, KafkaController.MBeanName) info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch)) + initializeAndMaybeTriggerPartitionReassignment() + initializeAndMaybeTriggerPreferredReplicaElection() } else info("Controller has been shut down, aborting startup/failover") @@ -483,8 +485,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds)) info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds)) info("Current list of topics in the cluster: %s".format(controllerContext.allTopics)) - initializeAndMaybeTriggerPartitionReassignment() - initializeAndMaybeTriggerPreferredReplicaElection() } private def initializeAndMaybeTriggerPartitionReassignment() { http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ce73cf/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index da47ac8..156bb10 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -44,7 +44,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private val zkClient = controllerContext.zkClient var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId) - private val isShuttingDown = new AtomicBoolean(false) + private val isRunning = new AtomicBoolean(false) private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext) this.logIdent = "[Partition state machine on Controller " + controllerId + "]: " private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) @@ -55,9 +55,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * the OnlinePartition state change for all new or offline partitions. */ def startup() { - isShuttingDown.set(false) // initialize partition state initializePartitionState() + isRunning.set(true) // try to move partitions to online state triggerOnlinePartitionStateChange() info("Started partition state machine with initial state -> " + partitionState.toString()) @@ -72,7 +72,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { * Invoked on controller shutdown. */ def shutdown() { - isShuttingDown.compareAndSet(false, true) + isRunning.compareAndSet(true, false) partitionState.clear() } @@ -125,6 +125,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, leaderSelector: PartitionLeaderSelector) { val topicAndPartition = TopicAndPartition(topic, partition) + if (!isRunning.get) + throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " + + "the partition state machine has not started") + .format(controllerId, controller.epoch, topicAndPartition, targetState)) val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) try { targetState match { @@ -352,7 +356,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { @throws(classOf[Exception]) def handleChildChange(parentPath : String, children : java.util.List[String]) { - if(!isShuttingDown.get()) { + if(isRunning.get) { controllerContext.controllerLock synchronized { try { debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ce73cf/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 3cf1da3..ef2356f 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -43,7 +43,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val zkClient = controllerContext.zkClient var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId) - private val isShuttingDown = new AtomicBoolean(false) + private val isRunning = new AtomicBoolean(false) this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: " private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) @@ -53,9 +53,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { * Then triggers the OnlineReplica state change for all replicas. */ def startup() { - isShuttingDown.set(false) // initialize replica state initializeReplicaState() + isRunning.set(true) // move all Online replicas to Online handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, controllerContext.liveBrokerIds.toSeq), OnlineReplica) @@ -71,7 +71,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { * Invoked on controller shutdown. */ def shutdown() { - isShuttingDown.compareAndSet(false, true) + isRunning.compareAndSet(true, false) replicaState.clear() } @@ -102,6 +102,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { */ def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) { val topicAndPartition = TopicAndPartition(topic, partition) + if (!isRunning.get) + throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " + + "to %s failed because replica state machine has not started") + .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState)) try { replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica) val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) @@ -235,7 +239,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { ControllerStats.leaderElectionTimer.time { info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(","))) - if(!isShuttingDown.get()) { + if(isRunning.get) { controllerContext.controllerLock synchronized { try { val curBrokerIds = currentBrokerList.map(_.toInt).toSet