Updated Branches:
  refs/heads/0.8 06911384f -> a9ce73cfd

KAFKA-840 Controller tries to perform preferred replica election on failover 
before state machines have started up; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9ce73cf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9ce73cf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9ce73cf

Branch: refs/heads/0.8
Commit: a9ce73cfd36bdb197e722ce8b828f27153f9e4ed
Parents: 0691138
Author: Swapnil Ghike <sgh...@linkedin.com>
Authored: Mon Apr 1 09:17:12 2013 -0700
Committer: Neha Narkhede <neha.narkh...@gmail.com>
Committed: Mon Apr 1 09:17:20 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/controller/KafkaController.scala   |    4 ++--
 .../kafka/controller/PartitionStateMachine.scala   |   12 ++++++++----
 .../kafka/controller/ReplicaStateMachine.scala     |   12 ++++++++----
 3 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ce73cf/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 47d4d7b..74614d8 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -242,6 +242,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       replicaStateMachine.startup()
       Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller with epoch 
%d".format(config.brokerId, epoch))
+      initializeAndMaybeTriggerPartitionReassignment()
+      initializeAndMaybeTriggerPreferredReplicaElection()
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -483,8 +485,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("Currently active brokers in the cluster: 
%s".format(controllerContext.liveBrokerIds))
     info("Currently shutting brokers in the cluster: 
%s".format(controllerContext.shuttingDownBrokerIds))
     info("Current list of topics in the cluster: 
%s".format(controllerContext.allTopics))
-    initializeAndMaybeTriggerPartitionReassignment()
-    initializeAndMaybeTriggerPreferredReplicaElection()
   }
 
   private def initializeAndMaybeTriggerPartitionReassignment() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ce73cf/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index da47ac8..156bb10 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -44,7 +44,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = 
mutable.Map.empty
   val brokerRequestBatch = new 
ControllerBrokerRequestBatch(controller.sendRequest, controllerId, 
controller.clientId)
-  private val isShuttingDown = new AtomicBoolean(false)
+  private val isRunning = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new 
NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + 
"]: "
   private val stateChangeLogger = 
Logger.getLogger(KafkaController.stateChangeLogger)
@@ -55,9 +55,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * the OnlinePartition state change for all new or offline partitions.
    */
   def startup() {
-    isShuttingDown.set(false)
     // initialize partition state
     initializePartitionState()
+    isRunning.set(true)
     // try to move partitions to online state
     triggerOnlinePartitionStateChange()
     info("Started partition state machine with initial state -> " + 
partitionState.toString())
@@ -72,7 +72,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    isShuttingDown.compareAndSet(false, true)
+    isRunning.compareAndSet(true, false)
     partitionState.clear()
   }
 
@@ -125,6 +125,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private def handleStateChange(topic: String, partition: Int, targetState: 
PartitionState,
                                 leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
+    if (!isRunning.get)
+      throw new StateChangeFailedException(("Controller %d epoch %d initiated 
state change for partition %s to %s failed because " +
+                                            "the partition state machine has 
not started")
+                                              .format(controllerId, 
controller.epoch, topicAndPartition, targetState))
     val currState = partitionState.getOrElseUpdate(topicAndPartition, 
NonExistentPartition)
     try {
       targetState match {
@@ -352,7 +356,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : 
java.util.List[String]) {
-      if(!isShuttingDown.get()) {
+      if(isRunning.get) {
         controllerContext.controllerLock synchronized {
           try {
             debug("Topic change listener fired for path %s with children 
%s".format(parentPath, children.mkString(",")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a9ce73cf/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 3cf1da3..ef2356f 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -43,7 +43,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = 
mutable.Map.empty
   val brokerRequestBatch = new 
ControllerBrokerRequestBatch(controller.sendRequest, controllerId, 
controller.clientId)
-  private val isShuttingDown = new AtomicBoolean(false)
+  private val isRunning = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + 
controller.config.brokerId + "]: "
   private val stateChangeLogger = 
Logger.getLogger(KafkaController.stateChangeLogger)
 
@@ -53,9 +53,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * Then triggers the OnlineReplica state change for all replicas.
    */
   def startup() {
-    isShuttingDown.set(false)
     // initialize replica state
     initializeReplicaState()
+    isRunning.set(true)
     // move all Online replicas to Online
     handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, 
controllerContext.allTopics.toSeq,
       controllerContext.liveBrokerIds.toSeq), OnlineReplica)
@@ -71,7 +71,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    isShuttingDown.compareAndSet(false, true)
+    isRunning.compareAndSet(true, false)
     replicaState.clear()
   }
 
@@ -102,6 +102,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    */
   def handleStateChange(topic: String, partition: Int, replicaId: Int, 
targetState: ReplicaState) {
     val topicAndPartition = TopicAndPartition(topic, partition)
+    if (!isRunning.get)
+      throw new StateChangeFailedException(("Controller %d epoch %d initiated 
state change of replica %d for partition %s " +
+                                            "to %s failed because replica 
state machine has not started")
+                                              .format(controllerId, 
controller.epoch, replicaId, topicAndPartition, targetState))
     try {
       replicaState.getOrElseUpdate((topic, partition, replicaId), 
NonExistentReplica)
       val replicaAssignment = 
controllerContext.partitionReplicaAssignment(topicAndPartition)
@@ -235,7 +239,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     def handleChildChange(parentPath : String, currentBrokerList : 
java.util.List[String]) {
       ControllerStats.leaderElectionTimer.time {
         info("Broker change listener fired for path %s with children 
%s".format(parentPath, currentBrokerList.mkString(",")))
-        if(!isShuttingDown.get()) {
+        if(isRunning.get) {
           controllerContext.controllerLock synchronized {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet

Reply via email to