Repository: kafka
Updated Branches:
  refs/heads/0.8.1 a2745382d -> b5971264f


auto rebalance last commit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5971264
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5971264
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5971264

Branch: refs/heads/0.8.1
Commit: b5971264f29c6646bc543b47c58786f6322f0bd0
Parents: a274538
Author: Sriram Subramanian <[email protected]>
Authored: Tue Feb 25 00:36:48 2014 -0800
Committer: Sriram Subramanian <[email protected]>
Committed: Tue Feb 25 00:36:48 2014 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 44 +++++++++-----------
 1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5971264/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 00a1f98..f12ffc2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -603,7 +603,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -612,7 +612,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions)
+      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
       deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
     }
   }
@@ -914,7 +914,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
+                                                   isTriggeredByAutoRebalance : Boolean) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -925,7 +926,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    if (!isTriggeredByAutoRebalance)
+      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -1090,6 +1092,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             topicsNotInPreferredReplica =
               topicAndPartitionsForBroker.filter {
                 case(topicPartition, replicas) => {
+                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                   controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                 }
               }
@@ -1102,26 +1105,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            inLock(controllerContext.controllerLock) {
-              // do this check only if the broker is live and there are no partitions being reassigned currently
-              // and preferred replica election is not in progress
-              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                  controllerContext.partitionsBeingReassigned.size == 0 &&
-                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
-                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
-                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
-                try {
-                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
-                  info("Created preferred replica election path with %s".format(jsonData))
-                } catch {
-                  case e2: ZkNodeExistsException =>
-                    val partitionsUndergoingPreferredReplicaElection =
-                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
-                    error("Preferred replica leader election currently in progress for " +
-                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
-                  case e3: Throwable =>
-                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
+            topicsNotInPreferredReplica.foreach {
+              case(topicPartition, replicas) => {
+                inLock(controllerContext.controllerLock) {
+                  // do this check only if the broker is live and there are no partitions being reassigned currently
+                  // and preferred replica election is not in progress
+                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                      controllerContext.partitionsBeingReassigned.size == 0 &&
+                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
+                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+                      !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
+                      controllerContext.allTopics.contains(topicPartition.topic)) {
+                    onPreferredReplicaElection(Set(topicPartition), false)
+                  }
                 }
               }
             }

Reply via email to