This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 6301954  KAFKA-13591; Fix flaky test 
`ControllerIntegrationTest.testTopicIdCreatedOnUpgrade` (#11666)
6301954 is described below

commit 630195458ccab46536d79838edbf26fc6deb3c91
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 21 17:38:40 2022 +0100

    KAFKA-13591; Fix flaky test 
`ControllerIntegrationTest.testTopicIdCreatedOnUpgrade` (#11666)
    
    The issue is that when 
`zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)` is called after 
the new controller is brought up, there is no guarantee that the controller 
has already written the topic id to the topic znode.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../controller/ControllerIntegrationTest.scala      | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 2302007..300db00 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1127,27 +1127,26 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
     waitForPartitionState(tp, firstControllerEpoch, controllerId, 
LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterCreate = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertEquals(None, topicIdAfterCreate)
-    val emptyTopicId = controller.controllerContext.topicIds.get("t")
-    assertEquals(None, emptyTopicId)
+    assertEquals(None, 
zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))
+    assertEquals(None, controller.controllerContext.topicIds.get(tp.topic))
 
     servers(controllerId).shutdown()
     servers(controllerId).awaitShutdown()
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed 
to elect a controller")
-    val topicIdAfterUpgrade = 
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertNotEquals(emptyTopicId, topicIdAfterUpgrade)
+
+    val (topicIdAfterUpgrade, _) = 
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
+    assertNotEquals(None, topicIdAfterUpgrade, s"topic id for ${tp.topic} not 
found in ZK")
+
     val controller2 = getController().kafkaController
-    assertNotEquals(emptyTopicId, 
controller2.controllerContext.topicIds.get("t"))
-    val topicId = controller2.controllerContext.topicIds.get("t").get
-    assertEquals(topicIdAfterUpgrade.get, topicId)
-    assertEquals("t", controller2.controllerContext.topicNames(topicId))
+    val topicId = controller2.controllerContext.topicIds.get(tp.topic)
+    assertEquals(topicIdAfterUpgrade, topicId)
+    assertEquals(tp.topic, 
controller2.controllerContext.topicNames(topicId.get))
 
     TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, 
"log was not created")
 
     val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
-    assertEquals(Some(topicId), topicIdInLog)
+    assertEquals(topicId, topicIdInLog)
 
     adminZkClient.deleteTopic(tp.topic)
     TestUtils.waitUntilTrue(() => 
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),

Reply via email to