Address code review feedbacks

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55d77c67
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55d77c67
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55d77c67

Branch: refs/heads/trunk
Commit: 55d77c67c236488df2b9f6bb59d99eeb645a0553
Parents: 7f32a1c
Author: Sriram Subramanian <[email protected]>
Authored: Mon Dec 9 22:51:41 2013 -0800
Committer: Sriram Subramanian <[email protected]>
Committed: Mon Dec 9 22:51:41 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 87 +++++++++++++-------
 1 file changed, 56 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/55d77c67/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6cd58a4..522e6c7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -143,6 +143,22 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
     }
   )
 
+  newGauge(
+    "PreferredReplicaImbalanceCount",
+    new Gauge[Int] {
+      def value(): Int = {
+        controllerContext.controllerLock synchronized {
+          if (!isActive())
+            0
+          else
+            controllerContext.partitionReplicaAssignment.count {
+              case (topicPartition, replicas) => 
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader 
!= replicas.head
+            }
+        }
+      }
+    }
+  )
+
   def epoch = controllerContext.epoch
 
   def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, 
config.hostName, config.port)
@@ -465,7 +481,7 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], updateZk: 
Boolean = true) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], 
isTriggeredByAutoRebalance: Boolean = true) {
     info("Starting preferred replica leader election for partitions 
%s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= 
partitions
@@ -473,7 +489,7 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader 
election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions, updateZk)
+      removePartitionsFromPreferredReplicaElection(partitions, 
isTriggeredByAutoRebalance)
     }
   }
 
@@ -742,7 +758,8 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: 
Set[TopicAndPartition], updateZK : Boolean) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: 
Set[TopicAndPartition],
+                                                   isTriggeredByAutoRebalance 
: Boolean) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = 
controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -753,7 +770,7 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
         warn("Partition %s failed to complete preferred replica leader 
election. Leader is %d".format(partition, currentLeader))
       }
     }
-    if (updateZK)
+    if (isTriggeredByAutoRebalance)
       ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= 
partitionsToBeRemoved
   }
@@ -913,42 +930,50 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
 
   private def checkAndTriggerPartitionRebalance(): Unit = {
     if (isActive()) {
-      info("checking need to trigger partition rebalance")
+      trace("checking need to trigger partition rebalance")
       // get all the active brokers
-      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, 
Seq[Int]]] = null;
+      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, 
Seq[Int]]] = null
       controllerContext.controllerLock synchronized {
-        preferredReplicasForTopicsByBrokers = 
controllerContext.partitionReplicaAssignment.groupBy(_._2.head)
+        preferredReplicasForTopicsByBrokers = 
controllerContext.partitionReplicaAssignment.groupBy {
+          case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+        }
       }
       debug("preferred replicas by broker " + 
preferredReplicasForTopicsByBrokers)
       // for each broker, check if a preferred replica election needs to be 
triggered
-      preferredReplicasForTopicsByBrokers.foreach( brokerInfo => {
-        var imbalanceRatio: Double = 0
-        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = 
null
-        controllerContext.controllerLock synchronized {
-          val brokerIds = controllerContext.liveBrokerIds
-          if (brokerIds.contains(brokerInfo._1) &&
-              controllerContext.partitionsBeingReassigned.size == 0) {
-            // do this check only if the broker is live and there are no 
partitions being reassigned currently
-            topicsNotInPreferredReplica =
-              brokerInfo._2.filter(item => 
controllerContext.partitionLeadershipInfo(item._1).leaderAndIsr.leader != 
brokerInfo._1);
-            debug("topics not in preferred replica " + 
topicsNotInPreferredReplica)
-            val totalTopicPartitionsForBroker = brokerInfo._2.size
-            val totalTopicPartitionsNotLedByBroker = 
topicsNotInPreferredReplica.size
-            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / 
totalTopicPartitionsForBroker
-            info("leader imbalance ratio for broker %d is 
%f".format(brokerInfo._1, imbalanceRatio))
+      preferredReplicasForTopicsByBrokers.foreach {
+        case(leaderBroker, topicAndPartitionsForBroker) => {
+          var imbalanceRatio: Double = 0
+          var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = 
null
+          controllerContext.controllerLock synchronized {
+            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                controllerContext.partitionsBeingReassigned.size == 0) {
+              // do this check only if the broker is live and there are no 
partitions being reassigned currently
+              topicsNotInPreferredReplica =
+                topicAndPartitionsForBroker.filter {
+                  case(topicPartition, replicas) => {
+                    
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader 
!= leaderBroker
+                  }
+                }
+              debug("topics not in preferred replica " + 
topicsNotInPreferredReplica)
+              val totalTopicPartitionsForBroker = 
topicAndPartitionsForBroker.size
+              val totalTopicPartitionsNotLedByBroker = 
topicsNotInPreferredReplica.size
+              imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / 
totalTopicPartitionsForBroker
+              trace("leader imbalance ratio for broker %d is 
%f".format(leaderBroker, imbalanceRatio))
+            }
           }
-        }
-        // check ratio and if greater than desired ratio, trigger a rebalance 
for the topics
-        // that need to be on this broker
-        if (imbalanceRatio > 
(config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-          topicsNotInPreferredReplica.foreach(topicPartition => {
-            controllerContext.controllerLock synchronized {
-              onPreferredReplicaElection(Set(topicPartition._1), false)
+          // check ratio and if greater than desired ratio, trigger a 
rebalance for the topic partitions
+          // that need to be on this broker
+          if (imbalanceRatio > 
(config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+            topicsNotInPreferredReplica.foreach {
+              case(topicPartition, replicas) => {
+                controllerContext.controllerLock synchronized {
+                  onPreferredReplicaElection(Set(topicPartition), false)
+                }
+              }
             }
-          })
+          }
         }
       }
-      )
     }
   }
 }

Reply via email to