This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new a0a5290ea6 KAFKA-13572 Fix negative preferred replica imbalanced count 
metric (#12405)
a0a5290ea6 is described below

commit a0a5290ea648e3e123b567a674d7b280616aa234
Author: Okada Haruki <ocadar...@gmail.com>
AuthorDate: Mon Jul 18 15:19:01 2022 +0900

    KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405)
    
    
    Currently, preferredReplicaImbalanceCount calculation has a race that 
becomes negative when topic deletion is initiated simultaneously. This PR 
addresses the problem by fixing cleanPreferredReplicaImbalanceMetric to be 
called only once per topic-deletion procedure
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../scala/kafka/controller/ControllerContext.scala  | 13 ++++++++++---
 .../kafka/controller/ControllerContextTest.scala    | 21 +++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala 
b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 379196aa1d..7065d87c4c 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -327,9 +327,16 @@ class ControllerContext {
     }
   }
 
-  def queueTopicDeletion(topics: Set[String]): Unit = {
-    topicsToBeDeleted ++= topics
-    topics.foreach(cleanPreferredReplicaImbalanceMetric)
+  def queueTopicDeletion(topicToBeAddedIntoDeletionList: Set[String]): Unit = {
+    // queueTopicDeletion could be called multiple times for same topic.
+    // e.g. 1) delete topic-A => 2) delete topic-B before A's deletion 
completes.
+    // In this case, at 2), queueTopicDeletion will be called with 
Set(topic-A, topic-B).
+    // However we should call cleanPreferredReplicaImbalanceMetric only once 
for same topic
+    // because otherwise, preferredReplicaImbalanceCount could be decremented 
wrongly at 2nd call.
+    // So we need to take a diff with already queued topics here.
+    val newlyDeletedTopics = 
topicToBeAddedIntoDeletionList.diff(topicsToBeDeleted)
+    topicsToBeDeleted ++= newlyDeletedTopics
+    newlyDeletedTopics.foreach(cleanPreferredReplicaImbalanceMetric)
   }
 
   def beginTopicDeletion(topics: Set[String]): Unit = {
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index e8efa5af79..e88bb321ad 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -203,4 +203,25 @@ class ControllerContextTest {
     context.removeTopic(tp3.topic)
     assertEquals(0, context.preferredReplicaImbalanceCount)
   }
+
+  @Test
+  def testPreferredReplicaImbalanceMetricOnConcurrentTopicDeletion(): Unit = {
+    val topicA = "A"
+    val topicB = "B"
+    val tpA = new TopicPartition(topicA, 0)
+    val tpB = new TopicPartition(topicB, 0)
+    context.updatePartitionFullReplicaAssignment(tpA, ReplicaAssignment(Seq(1, 
2, 3)))
+    context.updatePartitionFullReplicaAssignment(tpB, ReplicaAssignment(Seq(1, 
2, 3)))
+    assertEquals(0, context.preferredReplicaImbalanceCount)
+
+    context.queueTopicDeletion(Set(topicA))
+    // All partitions in topic will be marked as Offline during deletion 
procedure
+    context.putPartitionLeadershipInfo(tpA, 
LeaderIsrAndControllerEpoch(LeaderAndIsr(LeaderAndIsr.NoLeader, List(1, 2, 3)), 
0))
+    assertEquals(0, context.preferredReplicaImbalanceCount)
+
+    // Initiate topicB's topic deletion before topicA's deletion completes.
+    // Since topicA's delete-topic ZK node still exists, 
context.queueTopicDeletion will be called with Set(topicA, topicB)
+    context.queueTopicDeletion(Set(topicA, topicB))
+    assertEquals(0, context.preferredReplicaImbalanceCount)
+  }
 }

Reply via email to