This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9ec232fde8 KAFKA-13832: Fix flaky testAlterAssignment (#12060)
9ec232fde8 is described below
commit 9ec232fde850fc410d4adb07e1f259cab21336ad
Author: dengziming <[email protected]>
AuthorDate: Tue Apr 19 15:08:29 2022 +0800
KAFKA-13832: Fix flaky testAlterAssignment (#12060)
In KRaft mode the metadata is not propagated in time, so we should
wait for it before making assertions.
Reviewers: Luke Chen <[email protected]>
---
.../integration/kafka/admin/TopicCommandIntegrationTest.scala | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index 9abf7487d7..99b2f3e0b7 100644
---
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -306,7 +306,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
TestUtils.waitUntilTrue(
() =>
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
- "Timeout waiting new assignment propagate to broker")
+ "Timeout waiting for new assignment propagating to broker")
val topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
}
@@ -320,6 +320,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2",
"--partitions", "3")))
+ TestUtils.waitUntilTrue(
+ () =>
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
+ "Timeout waiting for new assignment propagating to broker")
val topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
@@ -404,7 +407,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
TestUtils.waitUntilTrue(
() =>
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size ==
alteredNumPartitions),
- "Timeout waiting new assignment propagate to broker")
+ "Timeout waiting for new assignment propagating to broker")
assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName).partitions()
.asScala.map(info => info.partition() ->
info.replicas().asScala.map(_.id())).toMap