Repository: kafka
Updated Branches:
  refs/heads/trunk b75245cfb -> be6056abc
KAFKA-4214; kafka-reassign-partitions fails all the time when brokers are bounced during reassignment

There is a corner case bug where, during partition reassignment, if the controller and a broker receiving a new replica are bounced at the same time, the partition reassignment fails.

The cause of this bug is a block of code in the KafkaController which fails the reassignment if aliveNewReplicas != newReplicas, i.e. if some of the new replicas are offline at the time a controller fails over.

The fix is to have the controller listen for ISR change events even for new replicas which are not alive when the controller boots up. Once those replicas come online, they will be in the ISR set; the new controller will detect this and then mark the reassignment as successful.

Interestingly, the block of code in question was introduced in KAFKA-990, where a concern about this exact scenario was raised :)

This bug was revealed in the system tests in https://github.com/apache/kafka/pull/1904. The relevant tests will be enabled in either this or a followup PR when PR-1904 is merged.

Thanks to junrao for identifying the issue and providing the patch.

Author: Apurva Mehta <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #1910 from apurvam/KAFKA-4214


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be6056ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be6056ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be6056ab

Branch: refs/heads/trunk
Commit: be6056abc9970600347c95c4c8658799b76dbe6b
Parents: b75245c
Author: Apurva Mehta <[email protected]>
Authored: Mon Sep 26 17:18:18 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Sep 26 17:18:18 2016 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 21 +++++++-------------
 1 file changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be6056ab/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 04bd3f4..063ea6f 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -631,20 +631,13 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
           } else {
-            if(aliveNewReplicas == newReplicas) {
-              info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
-              // first register ISR change listener
-              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
-              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
-              // mark topic ineligible for deletion for the partitions being reassigned
-              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
-              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-            } else {
-              // some replica in RAR is not alive. Fail partition reassignment
-              throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
-                "Failing partition reassignment")
-            }
+            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+            // first register ISR change listener
+            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+            // mark topic ineligible for deletion for the partitions being reassigned
+            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
           }
         case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
           .format(topicAndPartition))
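
The behavioural change in the hunk above can be sketched with a small standalone model. This is an illustration only, not Kafka code: `ReassignmentSketch`, `Reassignment`, `Outcome`, `startBeforePatch`, `startAfterPatch` and `onIsrChange` are hypothetical names, and the completion rule (every new replica is in the ISR) is a simplification of what the commit message describes.

```scala
// Hypothetical, simplified model of the controller's decision. None of these
// names exist in Kafka; they only mirror the logic described in the commit.
object ReassignmentSketch {
  final case class Reassignment(newReplicas: Seq[Int])

  sealed trait Outcome
  case object Failed extends Outcome
  case object InProgress extends Outcome
  case object Completed extends Outcome

  // Before the patch: the reassignment is failed outright when any new
  // replica is offline at the time the controller starts handling it.
  def startBeforePatch(r: Reassignment, liveBrokers: Set[Int]): Outcome = {
    val aliveNewReplicas = r.newReplicas.filter(liveBrokers.contains)
    if (aliveNewReplicas == r.newReplicas) InProgress else Failed
  }

  // After the patch: liveness is not checked up front. The controller always
  // registers for ISR changes and leaves the reassignment in progress.
  def startAfterPatch(r: Reassignment, liveBrokers: Set[Int]): Outcome =
    InProgress

  // Assumed ISR-change callback: the reassignment is treated as complete once
  // every new replica appears in the ISR.
  def onIsrChange(r: Reassignment, isr: Set[Int]): Outcome =
    if (r.newReplicas.forall(isr.contains)) Completed else InProgress

  def main(args: Array[String]): Unit = {
    val r = Reassignment(Seq(1, 2, 3))
    val live = Set(1, 2) // broker 3 is being bounced during controller failover

    println(startBeforePatch(r, live))    // old behaviour for this scenario
    println(startAfterPatch(r, live))     // new behaviour for this scenario
    println(onIsrChange(r, Set(1, 2, 3))) // broker 3 is back and in the ISR
  }
}
```

With broker 3 offline during controller failover, the old path gives up immediately, while the new path keeps waiting and completes on a later ISR change event.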
