Address code review feedback
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/55d77c67 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/55d77c67 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/55d77c67 Branch: refs/heads/trunk Commit: 55d77c67c236488df2b9f6bb59d99eeb645a0553 Parents: 7f32a1c Author: Sriram Subramanian <[email protected]> Authored: Mon Dec 9 22:51:41 2013 -0800 Committer: Sriram Subramanian <[email protected]> Committed: Mon Dec 9 22:51:41 2013 -0800 ---------------------------------------------------------------------- .../kafka/controller/KafkaController.scala | 87 +++++++++++++------- 1 file changed, 56 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/55d77c67/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 6cd58a4..522e6c7 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -143,6 +143,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } ) + newGauge( + "PreferredReplicaImbalanceCount", + new Gauge[Int] { + def value(): Int = { + controllerContext.controllerLock synchronized { + if (!isActive()) + 0 + else + controllerContext.partitionReplicaAssignment.count { + case (topicPartition, replicas) => controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head + } + } + } + } + ) + def epoch = controllerContext.epoch def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port) @@ -465,7 +481,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - def 
onPreferredReplicaElection(partitions: Set[TopicAndPartition], updateZk: Boolean = true) { + def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) { info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(","))) try { controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions @@ -473,7 +489,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } catch { case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) } finally { - removePartitionsFromPreferredReplicaElection(partitions, updateZk) + removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance) } } @@ -742,7 +758,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], updateZK : Boolean) { + def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition], + isTriggeredByAutoRebalance : Boolean) { for(partition <- partitionsToBeRemoved) { // check the status val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader @@ -753,7 +770,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg warn("Partition %s failed to complete preferred replica leader election. 
Leader is %d".format(partition, currentLeader)) } } - if (updateZK) + if (isTriggeredByAutoRebalance) ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath) controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved } @@ -913,42 +930,50 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private def checkAndTriggerPartitionRebalance(): Unit = { if (isActive()) { - info("checking need to trigger partition rebalance") + trace("checking need to trigger partition rebalance") // get all the active brokers - var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null; + var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null controllerContext.controllerLock synchronized { - preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy(_._2.head) + preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy { + case(topicAndPartition, assignedReplicas) => assignedReplicas.head + } } debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers) // for each broker, check if a preferred replica election needs to be triggered - preferredReplicasForTopicsByBrokers.foreach( brokerInfo => { - var imbalanceRatio: Double = 0 - var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null - controllerContext.controllerLock synchronized { - val brokerIds = controllerContext.liveBrokerIds - if (brokerIds.contains(brokerInfo._1) && - controllerContext.partitionsBeingReassigned.size == 0) { - // do this check only if the broker is live and there are no partitions being reassigned currently - topicsNotInPreferredReplica = - brokerInfo._2.filter(item => controllerContext.partitionLeadershipInfo(item._1).leaderAndIsr.leader != brokerInfo._1); - debug("topics not in preferred replica " + topicsNotInPreferredReplica) - val totalTopicPartitionsForBroker = 
brokerInfo._2.size - val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size - imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker - info("leader imbalance ratio for broker %d is %f".format(brokerInfo._1, imbalanceRatio)) + preferredReplicasForTopicsByBrokers.foreach { + case(leaderBroker, topicAndPartitionsForBroker) => { + var imbalanceRatio: Double = 0 + var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null + controllerContext.controllerLock synchronized { + if (controllerContext.liveBrokerIds.contains(leaderBroker) && + controllerContext.partitionsBeingReassigned.size == 0) { + // do this check only if the broker is live and there are no partitions being reassigned currently + topicsNotInPreferredReplica = + topicAndPartitionsForBroker.filter { + case(topicPartition, replicas) => { + controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker + } + } + debug("topics not in preferred replica " + topicsNotInPreferredReplica) + val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size + val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size + imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker + trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio)) + } } - } - // check ratio and if greater than desired ratio, trigger a rebalance for the topics - // that need to be on this broker - if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { - topicsNotInPreferredReplica.foreach(topicPartition => { - controllerContext.controllerLock synchronized { - onPreferredReplicaElection(Set(topicPartition._1), false) + // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions + // that need to be on this broker + if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { + 
topicsNotInPreferredReplica.foreach { + case(topicPartition, replicas) => { + controllerContext.controllerLock synchronized { + onPreferredReplicaElection(Set(topicPartition), false) + } + } } - }) + } } } - ) } } }
