This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 8a08ca676f1 MINOR: Deflake EligibleLeaderReplicasIntegrationTest
(#18923)
8a08ca676f1 is described below
commit 8a08ca676f1c81a9c4bba05be40174016da12884
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Feb 20 05:14:15 2025 -0800
MINOR: Deflake EligibleLeaderReplicasIntegrationTest (#18923)
Make sure to give enough time for the partition ISR updates.
Reviewers: David Jacot <[email protected]>
---
.../EligibleLeaderReplicasIntegrationTest.java | 32 ++++++++++++++--------
1 file changed, 21 insertions(+), 11 deletions(-)
diff --git
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
index 90503e66b1a..a3dafed5e4c 100644
---
a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
+++
b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
@@ -70,6 +70,7 @@ import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -202,7 +203,7 @@ public class EligibleLeaderReplicasIntegrationTest extends
KafkaServerTestHarnes
}
void waitUntilOneMessageIsConsumed(Consumer consumer) {
- kafka.utils.TestUtils.waitUntilTrue(
+ TestUtils.waitUntilTrue(
() -> {
try {
ConsumerRecords record =
consumer.poll(Duration.ofMillis(100L));
@@ -212,7 +213,7 @@ public class EligibleLeaderReplicasIntegrationTest extends
KafkaServerTestHarnes
}
},
() -> "fail to consume messages",
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
+ DEFAULT_MAX_WAIT_MS, 100L
);
}
@@ -417,30 +418,39 @@ public class EligibleLeaderReplicasIntegrationTest
extends KafkaServerTestHarnes
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize > 0 && elrSize == 0;
});
- topicPartitionInfo =
adminClient.describeTopics(Collections.singletonList(testTopicName))
- .allTopicNames().get().get(testTopicName).partitions().get(0);
- assertEquals(0, topicPartitionInfo.lastKnownElr().size());
- assertEquals(0, topicPartitionInfo.elr().size());
- assertEquals(lastKnownLeader, topicPartitionInfo.leader().id());
+
+ TestUtils.waitUntilTrue(
+ () -> {
+ try {
+ TopicPartitionInfo partition =
adminClient.describeTopics(Collections.singletonList(testTopicName))
+
.allTopicNames().get().get(testTopicName).partitions().get(0);
+ if (partition.leader() == null) return false;
+ return partition.lastKnownElr().isEmpty() &&
partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader;
+ } catch (Exception e) {
+ return false;
+ }
+ },
+ () -> String.format("Partition metadata for %s is not
correct", testTopicName),
+ DEFAULT_MAX_WAIT_MS, 100L
+ );
} finally {
restartDeadBrokers(false);
}
}
void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean>
isIsrAndElrSizeSatisfied) {
- kafka.utils.TestUtils.waitUntilTrue(
+ TestUtils.waitUntilTrue(
() -> {
try {
TopicDescription topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName))
.allTopicNames().get().get(testTopicName);
TopicPartitionInfo partition =
topicDescription.partitions().get(0);
- if
(!isIsrAndElrSizeSatisfied.apply(partition.isr().size(),
partition.elr().size())) return false;
+ return
isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size());
} catch (Exception e) {
return false;
}
- return true;
},
() -> String.format("Partition metadata for %s is not propagated",
testTopicName),
- org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
+ DEFAULT_MAX_WAIT_MS, 100L);
}
}