This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 7ef6c448 [hotfix] Wait for leader election to prevent OUT_OF_ORDER issues
7ef6c448 is described below
commit 7ef6c44859291c83ed20fcdbac667b48538de308
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Tue Feb 3 11:53:36 2026 +0100
[hotfix] Wait for leader election to prevent OUT_OF_ORDER issues
---
.../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ee55cb01..27bb8b20 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,7 +165,15 @@ public class KafkaTestEnvironmentImpl extends
KafkaTestEnvironment {
return false;
}
TopicDescription topicDescription =
topicDescriptions.get(topic);
- return topicDescription.partitions().size() ==
numberOfPartitions;
+ if (topicDescription.partitions().size() !=
numberOfPartitions) {
+ return false;
+ }
+ // Ensure all partitions have a leader elected.
+ return topicDescription.partitions().stream()
+ .allMatch(
+ p ->
+ p.leader() != null
+ && p.leader().id() !=
Node.noNode().id());
},
Duration.ofSeconds(30),
String.format("New topic \"%s\" is not ready within
timeout", topicObj));