This is an automated email from the ASF dual-hosted git repository.

MartijnVisser pushed a commit to branch v4.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 9b30d771b1cc9344059c2e192e8a72d083440197
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Mon May 18 23:24:26 2026 +0200

    [FLINK-39699][tests] Stabilize KafkaWriterFaultToleranceITCase 
exception-on-unavailable tests
    
    testFlush/testWrite/testCloseExceptionWhenKafkaUnavailable all rely on
    the producer still having undelivered work when KAFKA_CONTAINER.stop()
    takes effect. Under CI load the sender thread can ship and ack the
    buffered record before stop() returns, so the operation under test
    (flush/write/close) has nothing to fail on and the .rootCause()
    assertion fires with "Expecting actual not to be null" instead of
    seeing the expected NetworkException / TimeoutException.
    
    Drain a warm-up record before stopping the broker, then issue the real
    write while the broker is down. The producer's metadata is already
    cached so write() returns immediately; the sender fails to deliver
    (retries=0); the operation under test reliably surfaces the underlying
    exception.
    
    (cherry picked from commit 887d5941d5b6dee2044b55cac629f040336f7435)
    Generated-by: Claude Code (Opus 4.8)
---
 .../flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
index 4a7d25c6..1d83cde6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
@@ -61,10 +61,12 @@ public class KafkaWriterFaultToleranceITCase extends 
KafkaWriterTestBase {
                         new SinkInitContext(metricGroup, timeService, null))) {
 
             writer.write(1, SINK_WRITER_CONTEXT);
+            writer.getCurrentProducer().flush();
 
             KAFKA_CONTAINER.stop();
 
             try {
+                writer.write(1, SINK_WRITER_CONTEXT);
                 writer.getCurrentProducer().flush();
                 assertThatCode(() -> writer.write(1, SINK_WRITER_CONTEXT))
                         .rootCause()
@@ -84,9 +86,11 @@ public class KafkaWriterFaultToleranceITCase extends 
KafkaWriterTestBase {
                         DeliveryGuarantee.AT_LEAST_ONCE,
                         new SinkInitContext(metricGroup, timeService, null))) {
             writer.write(1, SINK_WRITER_CONTEXT);
+            writer.flush(false);
 
             KAFKA_CONTAINER.stop();
             try {
+                writer.write(1, SINK_WRITER_CONTEXT);
                 assertThatCode(() -> writer.flush(false))
                         .rootCause()
                         .isInstanceOfAny(NetworkException.class, 
TimeoutException.class);
@@ -106,10 +110,12 @@ public class KafkaWriterFaultToleranceITCase extends 
KafkaWriterTestBase {
                         new SinkInitContext(metricGroup, timeService, null));
 
         writer.write(1, SINK_WRITER_CONTEXT);
+        writer.getCurrentProducer().flush();
 
         KAFKA_CONTAINER.stop();
 
         try {
+            writer.write(1, SINK_WRITER_CONTEXT);
             writer.getCurrentProducer().flush();
             // closing producer resource throws exception first
             assertThatCode(() -> writer.close())

Reply via email to