This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 6301954 KAFKA-13591; Fix flaky test
`ControllerIntegrationTest.testTopicIdCreatedOnUpgrade` (#11666)
6301954 is described below
commit 630195458ccab46536d79838edbf26fc6deb3c91
Author: David Jacot <[email protected]>
AuthorDate: Fri Jan 21 17:38:40 2022 +0100
KAFKA-13591; Fix flaky test
`ControllerIntegrationTest.testTopicIdCreatedOnUpgrade` (#11666)
The issue is that when
`zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)` is called after
the new controller is brought up, there is no guarantee that the controller
has already written the topic id to the topic znode.
Reviewers: Jason Gustafson <[email protected]>
---
.../controller/ControllerIntegrationTest.scala | 21 ++++++++++-----------
1 file changed, 10 insertions(+), 11 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 2302007..300db00 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1127,27 +1127,26 @@ class ControllerIntegrationTest extends
QuorumTestHarness {
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment =
assignment, servers = servers)
waitForPartitionState(tp, firstControllerEpoch, controllerId,
LeaderAndIsr.initialLeaderEpoch,
"failed to get expected partition state upon topic creation")
- val topicIdAfterCreate =
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
- assertEquals(None, topicIdAfterCreate)
- val emptyTopicId = controller.controllerContext.topicIds.get("t")
- assertEquals(None, emptyTopicId)
+ assertEquals(None,
zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))
+ assertEquals(None, controller.controllerContext.topicIds.get(tp.topic))
servers(controllerId).shutdown()
servers(controllerId).awaitShutdown()
servers = makeServers(1)
TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed
to elect a controller")
- val topicIdAfterUpgrade =
zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
- assertNotEquals(emptyTopicId, topicIdAfterUpgrade)
+
+ val (topicIdAfterUpgrade, _) =
TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
+ assertNotEquals(None, topicIdAfterUpgrade, s"topic id for ${tp.topic} not
found in ZK")
+
val controller2 = getController().kafkaController
- assertNotEquals(emptyTopicId,
controller2.controllerContext.topicIds.get("t"))
- val topicId = controller2.controllerContext.topicIds.get("t").get
- assertEquals(topicIdAfterUpgrade.get, topicId)
- assertEquals("t", controller2.controllerContext.topicNames(topicId))
+ val topicId = controller2.controllerContext.topicIds.get(tp.topic)
+ assertEquals(topicIdAfterUpgrade, topicId)
+ assertEquals(tp.topic,
controller2.controllerContext.topicNames(topicId.get))
TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined,
"log was not created")
val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
- assertEquals(Some(topicId), topicIdInLog)
+ assertEquals(topicId, topicIdInLog)
adminZkClient.deleteTopic(tp.topic)
TestUtils.waitUntilTrue(() =>
!servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),