Repository: kafka Updated Branches: refs/heads/1.0.0 0222a35db -> 5dab884d7
KAFKA-5879; Controller should read the latest IsrChangeNotification znodes when handling IsrChangeNotification event Author: Dong Lin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #3840 from lindong28/KAFKA-5879 (cherry picked from commit b5b266eeeccc18c5cfbd37fa59ab5d9cf3676e34) Signed-off-by: Jun Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5dab884d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5dab884d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5dab884d Branch: refs/heads/1.0.0 Commit: 5dab884d74183ef0409c8efab465d4223827a604 Parents: 0222a35 Author: Dong Lin <[email protected]> Authored: Wed Oct 4 17:37:35 2017 -0700 Committer: Jun Rao <[email protected]> Committed: Wed Oct 4 17:37:46 2017 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/controller/KafkaController.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5dab884d/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 4ea61ab..811ff67 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -993,7 +993,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) + if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size) + zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) // update the cache. NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } @@ -1366,16 +1367,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met def state = ControllerState.IsrChange override def process(): Unit = { + // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers + // to increase the odds of processing recent isr changes in a single ControllerEvent + // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879). + val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath) if (!isActive) return try { - val topicAndPartitions = sequenceNumbers.flatMap(getTopicAndPartition).toSet + val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet if (topicAndPartitions.nonEmpty) { updateLeaderAndIsrCache(topicAndPartitions) processUpdateNotifications(topicAndPartitions) } } finally { // delete the notifications - sequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x)) + currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x)) } }
