This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 0999393  KAFKA-13010: Retry getting tasks in case of rebalance for 
TaskMetadata tests (#11083)
0999393 is described below

commit 09993933cb9c9291a916d59ec3c6d9544ad1527a
Author: Walker Carlson <[email protected]>
AuthorDate: Tue Jul 20 16:56:07 2021 -0500

    KAFKA-13010: Retry getting tasks in case of rebalance for TaskMetadata tests 
(#11083)
    
    If there is a cooperative rebalance, the tasks might not be assigned to a 
thread at all for a very short timeframe, causing this test to fail. We can 
just retry getting the metadata until the group has finished rebalancing and 
all tasks are assigned.
    
    Reviewers: Bruno Cadonna <[email protected]>, Anna Sophie Blee-Goldman 
<[email protected]>, Josep Prat <[email protected]>
---
 .../streams/integration/TaskMetadataIntegrationTest.java     | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 8639242..6f35d12 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -45,6 +45,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -158,10 +159,13 @@ public class TaskMetadataIntegrationTest {
         }
     }
 
-    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) {
-        final List<TaskMetadata> taskMetadataList = 
kafkaStreams.metadataForLocalThreads().stream().flatMap(t -> 
t.activeTasks().stream()).collect(Collectors.toList());
-        assertThat("only one task", taskMetadataList.size() == 1);
-        return taskMetadataList.get(0);
+    private TaskMetadata getTaskMetadata(final KafkaStreams kafkaStreams) 
throws InterruptedException {
+        final AtomicReference<List<TaskMetadata>> taskMetadataList = new 
AtomicReference<>();
+        TestUtils.waitForCondition(() -> {
+            
taskMetadataList.set(kafkaStreams.metadataForLocalThreads().stream().flatMap(t 
-> t.activeTasks().stream()).collect(Collectors.toList()));
+            return taskMetadataList.get().size() == 1;
+        }, "The number of active tasks returned in the allotted time was not 
one.");
+        return taskMetadataList.get().get(0);
     }
 
     @After

Reply via email to