Repository: kafka
Updated Branches:
  refs/heads/trunk b75245cfb -> be6056abc


KAFKA-4214; kafka-reassign-partitions fails all the time when brokers are 
bounced during reassignment

There is a corner-case bug: during partition reassignment, if the
controller and a broker receiving a new replica are bounced at the same
time, the partition reassignment fails.

The cause of this bug is a block of code in KafkaController that
fails the reassignment if aliveNewReplicas != newReplicas, i.e., if
some of the new replicas are offline at the time a controller fails
over.

The fix is to have the controller listen for ISR change events even for
new replicas that are not alive when the controller boots up. Once
those replicas come online, they will be in the ISR set; the new
controller will detect this and mark the reassignment as successful.
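
The difference between the old and new behaviour can be sketched in a
small self-contained model. This is only an illustration under
simplifying assumptions, not the controller code: ReassignmentSketch,
Reassignment, oldInitiate, newInitiate and isComplete are hypothetical
names, and the ISR is modelled as a plain set of broker ids.

```scala
// Simplified model of the change; all names are hypothetical, not Kafka's API.
object ReassignmentSketch {
  final case class Reassignment(partition: String, newReplicas: Seq[Int])

  // Old behaviour: reject the reassignment if any new replica is offline
  // at the moment the controller (re)starts handling it.
  def oldInitiate(r: Reassignment, liveBrokers: Set[Int]): Either[String, Reassignment] = {
    val aliveNewReplicas = r.newReplicas.filter(liveBrokers.contains)
    if (aliveNewReplicas == r.newReplicas) Right(r)
    else Left(
      s"Only ${aliveNewReplicas.mkString(",")} replicas out of the new set " +
        s"${r.newReplicas.mkString(",")} for partition ${r.partition} are alive")
  }

  // New behaviour: always accept the reassignment, regardless of liveness,
  // and rely on ISR change notifications to detect completion later.
  def newInitiate(r: Reassignment, liveBrokers: Set[Int]): Either[String, Reassignment] =
    Right(r)

  // Evaluated on each (modelled) ISR change: the reassignment is complete
  // once every new replica has joined the ISR.
  def isComplete(r: Reassignment, isr: Set[Int]): Boolean =
    r.newReplicas.forall(isr.contains)
}
```

In this model, a reassignment to brokers 1, 2, 3 while broker 3 is down
is rejected by oldInitiate but accepted by newInitiate, and isComplete
becomes true only after broker 3 appears in the ISR.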

Interestingly, the block of code in question was introduced in
KAFKA-990, where a concern about this exact scenario was raised :)

This bug was revealed by the system tests in
https://github.com/apache/kafka/pull/1904.
The relevant tests will be enabled in either this or a follow-up PR once
PR-1904 is merged.

Thanks to junrao for identifying the issue and providing the patch.

Author: Apurva Mehta <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #1910 from apurvam/KAFKA-4214


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be6056ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be6056ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be6056ab

Branch: refs/heads/trunk
Commit: be6056abc9970600347c95c4c8658799b76dbe6b
Parents: b75245c
Author: Apurva Mehta <[email protected]>
Authored: Mon Sep 26 17:18:18 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Mon Sep 26 17:18:18 2016 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 21 +++++++-------------
 1 file changed, 7 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be6056ab/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 04bd3f4..063ea6f 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -631,20 +631,13 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
             throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
               " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
           } else {
-            if(aliveNewReplicas == newReplicas) {
-              info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
-              // first register ISR change listener
-              watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
-              controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
-              // mark topic ineligible for deletion for the partitions being reassigned
-              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
-              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-            } else {
-              // some replica in RAR is not alive. Fail partition reassignment
-              throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
-                "Failing partition reassignment")
-            }
+            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
+            // first register ISR change listener
+            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+            // mark topic ineligible for deletion for the partitions being reassigned
+            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
           }
         case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
           .format(topicAndPartition))
