Repository: kafka Updated Branches: refs/heads/trunk 20d9adb17 -> b5b266eee
KAFKA-5879; Controller should read the latest IsrChangeNotification znodes when handling IsrChangeNotification event Author: Dong Lin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #3840 from lindong28/KAFKA-5879 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5b266ee Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5b266ee Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5b266ee Branch: refs/heads/trunk Commit: b5b266eeeccc18c5cfbd37fa59ab5d9cf3676e34 Parents: 20d9adb Author: Dong Lin <[email protected]> Authored: Wed Oct 4 17:37:35 2017 -0700 Committer: Jun Rao <[email protected]> Committed: Wed Oct 4 17:37:35 2017 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/controller/KafkaController.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b5b266ee/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4ea61ab..811ff67 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -993,7 +993,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) + if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size) + zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) // update the cache. NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } @@ -1366,16 +1367,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met def state = ControllerState.IsrChange override def process(): Unit = { + // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers + // to increase the odds of processing recent isr changes in a single ControllerEvent + // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879). + val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath) if (!isActive) return try { - val topicAndPartitions = sequenceNumbers.flatMap(getTopicAndPartition).toSet + val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet if (topicAndPartitions.nonEmpty) { updateLeaderAndIsrCache(topicAndPartitions) processUpdateNotifications(topicAndPartitions) } } finally { // delete the notifications - sequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x)) + currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x)) } }
