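some more changes: remove the isTriggeredByAutoRebalance flag from preferred replica election cleanup and handle ZkNodeExistsException separately during auto rebalance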
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e88f1acd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e88f1acd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e88f1acd

Branch: refs/heads/trunk
Commit: e88f1acdaac9fe738787ccb8375519293c913a37
Parents: 425af9b
Author: Sriram Subramanian <[email protected]>
Authored: Fri Dec 20 11:22:26 2013 -0800
Committer: Sriram Subramanian <[email protected]>
Committed: Fri Dec 20 11:22:26 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/controller/KafkaController.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/e88f1acd/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 74e2ea4..8017abb 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -481,7 +481,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -489,7 +489,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
-      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
+      removePartitionsFromPreferredReplicaElection(partitions)
     }
   }
 
@@ -758,8 +758,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
-                                                   isTriggeredByAutoRebalance : Boolean) {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
       val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@ -770,8 +769,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
       }
     }
-    if (isTriggeredByAutoRebalance)
-      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
@@ -973,11 +971,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                 ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
                 info("Created preferred replica election path with %s".format(jsonData))
               } catch {
-                case e2: Throwable =>
+                case e2: ZkNodeExistsException =>
                   val partitionsUndergoingPreferredReplicaElection =
                     PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
                   error("Preferred replica leader election currently in progress for " +
                         "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
+                case e3: Throwable =>
+                  error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
               }
             }
           }
