This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 8a08ca676f1 MINOR: Deflake EligibleLeaderReplicasIntegrationTest 
(#18923)
8a08ca676f1 is described below

commit 8a08ca676f1c81a9c4bba05be40174016da12884
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Feb 20 05:14:15 2025 -0800

    MINOR: Deflake EligibleLeaderReplicasIntegrationTest (#18923)
    
    Make sure to give enough time for the partition ISR updates.
    
    Reviewers: David Jacot <[email protected]>
---
 .../EligibleLeaderReplicasIntegrationTest.java     | 32 ++++++++++++++--------
 1 file changed, 21 insertions(+), 11 deletions(-)

diff --git 
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
 
b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
index 90503e66b1a..a3dafed5e4c 100644
--- 
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
+++ 
b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
@@ -70,6 +70,7 @@ import scala.collection.JavaConverters;
 import scala.collection.Seq;
 import scala.collection.mutable.HashMap;
 
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -202,7 +203,7 @@ public class EligibleLeaderReplicasIntegrationTest extends 
KafkaServerTestHarnes
     }
 
     void waitUntilOneMessageIsConsumed(Consumer consumer) {
-        kafka.utils.TestUtils.waitUntilTrue(
+        TestUtils.waitUntilTrue(
             () -> {
                 try {
                     ConsumerRecords record = 
consumer.poll(Duration.ofMillis(100L));
@@ -212,7 +213,7 @@ public class EligibleLeaderReplicasIntegrationTest extends 
KafkaServerTestHarnes
                 }
             },
             () -> "fail to consume messages",
-            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
+            DEFAULT_MAX_WAIT_MS, 100L
         );
     }
 
@@ -417,30 +418,39 @@ public class EligibleLeaderReplicasIntegrationTest 
extends KafkaServerTestHarnes
             waitForIsrAndElr((isrSize, elrSize) -> {
                 return isrSize > 0 && elrSize == 0;
             });
-            topicPartitionInfo = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
-                .allTopicNames().get().get(testTopicName).partitions().get(0);
-            assertEquals(0, topicPartitionInfo.lastKnownElr().size());
-            assertEquals(0, topicPartitionInfo.elr().size());
-            assertEquals(lastKnownLeader, topicPartitionInfo.leader().id());
+
+            TestUtils.waitUntilTrue(
+                () -> {
+                    try {
+                        TopicPartitionInfo partition = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
+                            
.allTopicNames().get().get(testTopicName).partitions().get(0);
+                        if (partition.leader() == null) return false;
+                        return partition.lastKnownElr().isEmpty() && 
partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                },
+                () -> String.format("Partition metadata for %s is not 
correct", testTopicName),
+                DEFAULT_MAX_WAIT_MS, 100L
+            );
         } finally {
             restartDeadBrokers(false);
         }
     }
 
     void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean> 
isIsrAndElrSizeSatisfied) {
-        kafka.utils.TestUtils.waitUntilTrue(
+        TestUtils.waitUntilTrue(
             () -> {
                 try {
                     TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
                         .allTopicNames().get().get(testTopicName);
                     TopicPartitionInfo partition = 
topicDescription.partitions().get(0);
-                    if 
(!isIsrAndElrSizeSatisfied.apply(partition.isr().size(), 
partition.elr().size())) return false;
+                    return 
isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size());
                 } catch (Exception e) {
                     return false;
                 }
-                return true;
             },
             () -> String.format("Partition metadata for %s is not propagated", 
testTopicName),
-            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
+            DEFAULT_MAX_WAIT_MS, 100L);
     }
 }

Reply via email to