This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ec232fde8 KAFKA-13832: Fix flaky testAlterAssignment (#12060)
9ec232fde8 is described below

commit 9ec232fde850fc410d4adb07e1f259cab21336ad
Author: dengziming <[email protected]>
AuthorDate: Tue Apr 19 15:08:29 2022 +0800

    KAFKA-13832: Fix flaky testAlterAssignment (#12060)
    
    In KRaft mode the metadata is not propagated in time, so we should 
wait for it before making assertions.
    
    Reviewers:  Luke Chen <[email protected]>
---
 .../integration/kafka/admin/TopicCommandIntegrationTest.scala      | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index 9abf7487d7..99b2f3e0b7 100644
--- 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -306,7 +306,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     TestUtils.waitUntilTrue(
       () => 
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
-      "Timeout waiting new assignment propagate to broker")
+      "Timeout waiting for new assignment propagating to broker")
     val topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
     assertTrue(topicDescription.partitions().size() == 3)
   }
@@ -320,6 +320,9 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", 
"--partitions", "3")))
+    TestUtils.waitUntilTrue(
+      () => 
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
+      "Timeout waiting for new assignment propagating to broker")
 
     val topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
     assertTrue(topicDescription.partitions().size() == 3)
@@ -404,7 +407,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     TestUtils.waitUntilTrue(
       () => 
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 
alteredNumPartitions),
-      "Timeout waiting new assignment propagate to broker")
+      "Timeout waiting for new assignment propagating to broker")
     assignment = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
       .allTopicNames().get().get(testTopicName).partitions()
       .asScala.map(info => info.partition() -> 
info.replicas().asScala.map(_.id())).toMap

Reply via email to