This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new a0a5290ea6 KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405) a0a5290ea6 is described below commit a0a5290ea648e3e123b567a674d7b280616aa234 Author: Okada Haruki <ocadar...@gmail.com> AuthorDate: Mon Jul 18 15:19:01 2022 +0900 KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405) Currently, the preferredReplicaImbalanceCount calculation has a race condition that can make the count negative when topic deletions are initiated concurrently. This PR addresses the problem by ensuring that cleanPreferredReplicaImbalanceMetric is called only once per topic-deletion procedure. Reviewers: Luke Chen <show...@gmail.com> --- .../scala/kafka/controller/ControllerContext.scala | 13 ++++++++++--- .../kafka/controller/ControllerContextTest.scala | 21 +++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 379196aa1d..7065d87c4c 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -327,9 +327,16 @@ class ControllerContext { } } - def queueTopicDeletion(topics: Set[String]): Unit = { - topicsToBeDeleted ++= topics - topics.foreach(cleanPreferredReplicaImbalanceMetric) + def queueTopicDeletion(topicToBeAddedIntoDeletionList: Set[String]): Unit = { + // queueTopicDeletion could be called multiple times for same topic. + // e.g. 1) delete topic-A => 2) delete topic-B before A's deletion completes. + // In this case, at 2), queueTopicDeletion will be called with Set(topic-A, topic-B). + // However we should call cleanPreferredReplicaImbalanceMetric only once for same topic + // because otherwise, preferredReplicaImbalanceCount could be decremented wrongly at 2nd call. + // So we need to take a diff with already queued topics here. 
+ val newlyDeletedTopics = topicToBeAddedIntoDeletionList.diff(topicsToBeDeleted) + topicsToBeDeleted ++= newlyDeletedTopics + newlyDeletedTopics.foreach(cleanPreferredReplicaImbalanceMetric) } def beginTopicDeletion(topics: Set[String]): Unit = { diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala index e8efa5af79..e88bb321ad 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala @@ -203,4 +203,25 @@ class ControllerContextTest { context.removeTopic(tp3.topic) assertEquals(0, context.preferredReplicaImbalanceCount) } + + @Test + def testPreferredReplicaImbalanceMetricOnConcurrentTopicDeletion(): Unit = { + val topicA = "A" + val topicB = "B" + val tpA = new TopicPartition(topicA, 0) + val tpB = new TopicPartition(topicB, 0) + context.updatePartitionFullReplicaAssignment(tpA, ReplicaAssignment(Seq(1, 2, 3))) + context.updatePartitionFullReplicaAssignment(tpB, ReplicaAssignment(Seq(1, 2, 3))) + assertEquals(0, context.preferredReplicaImbalanceCount) + + context.queueTopicDeletion(Set(topicA)) + // All partitions in topic will be marked as Offline during deletion procedure + context.putPartitionLeadershipInfo(tpA, LeaderIsrAndControllerEpoch(LeaderAndIsr(LeaderAndIsr.NoLeader, List(1, 2, 3)), 0)) + assertEquals(0, context.preferredReplicaImbalanceCount) + + // Initiate topicB's topic deletion before topicA's deletion completes. + // Since topicA's delete-topic ZK node still exists, context.queueTopicDeletion will be called with Set(topicA, topicB) + context.queueTopicDeletion(Set(topicA, topicB)) + assertEquals(0, context.preferredReplicaImbalanceCount) + } }