This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ef6c448 [hotfix] Wait for leader election to prevent OUT_OF_ORDER issues
7ef6c448 is described below

commit 7ef6c44859291c83ed20fcdbac667b48538de308
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Tue Feb 3 11:53:36 2026 +0100

    [hotfix] Wait for leader election to prevent OUT_OF_ORDER issues
---
 .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ee55cb01..27bb8b20 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -164,7 +165,15 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                             return false;
                         }
                         TopicDescription topicDescription = topicDescriptions.get(topic);
-                        return topicDescription.partitions().size() == numberOfPartitions;
+                        if (topicDescription.partitions().size() != numberOfPartitions) {
+                            return false;
+                        }
+                        // Ensure all partitions have a leader elected.
+                        return topicDescription.partitions().stream()
+                                .allMatch(
+                                        p ->
+                                                p.leader() != null
+                                                        && p.leader().id() != Node.noNode().id());
                     },
                     Duration.ofSeconds(30),
                     String.format("New topic \"%s\" is not ready within timeout", topicObj));

Reply via email to