This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 765b1e755e002c00f4bcf02863db49126c3d5f96
Author: Arvid Heise <ar...@apache.org>
AuthorDate: Thu Apr 3 22:33:03 2025 +0200

    [FLINK-37611] Deflake ExactlyOnceKafkaWriterITCase#shouldAbortLingeringTransactions
    
    The test had actually not been working correctly since writers get their
    unique prefix (pool rework PR). It mostly passed anyway because the
    partition that each record was written to was non-deterministic, and more
    often than not the 3 records did not end up in the same one of the 10
    partitions, which resulted in an incorrect pass.
    
    Now the value is also passed as the key, which makes partition assignment
    deterministic; for this test we just write the exact same value 3 times.
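    
    A minimal, self-contained sketch (editorial illustration, not part of this
    patch; topic and class names are made up) of why keying the record makes the
    target partition deterministic: with no key, the producer chooses partitions
    itself (sticky/round-robin), so identical values can spread across partitions,
    whereas with a key the default partitioner hashes the serialized key, so the
    same bytes always map to the same partition.
    
    import java.nio.ByteBuffer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class KeyedRecordSketch {
        public static void main(String[] args) {
            // Same value bytes used as both key and value, as the test serializer now does.
            byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
    
            // Keyed record: the default partitioner hashes the key, so every
            // write of the same value targets the same partition.
            ProducerRecord<byte[], byte[]> keyed =
                    new ProducerRecord<>("example-topic", bytes, bytes);
    
            // Unkeyed record: partition choice is left to the producer and can
            // differ between writes of the same value.
            ProducerRecord<byte[], byte[]> unkeyed =
                    new ProducerRecord<>("example-topic", bytes);
    
            System.out.println(keyed.key() != null);   // true
            System.out.println(unkeyed.key() == null); // true
        }
    }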
---
 .../kafka/sink/ExactlyOnceKafkaWriterITCase.java   | 63 +++++++++++++---------
 .../connector/kafka/sink/KafkaWriterTestBase.java  | 12 ++++-
 .../src/test/resources/log4j2-test.properties      |  6 ++-
 3 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
index 8980af63..e04613c1 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
@@ -43,6 +44,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Consumer;
 
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -61,6 +63,9 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
                             .setConfiguration(new Configuration())
                             .build());
 
+    private static final Consumer<KafkaSinkBuilder<?>> EXACTLY_ONCE =
+            sink -> sink.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
+
     @Test
     void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
@@ -197,36 +202,35 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
 
     /** Test that producer is not accidentally recreated or pool is used. */
     @Test
-    void testLingeringTransaction() throws Exception {
-        final KafkaWriter<Integer> failedWriter = createWriter(DeliveryGuarantee.EXACTLY_ONCE);
+    void shouldAbortLingeringTransactions() throws Exception {
+        try (final ExactlyOnceKafkaWriter<Integer> failedWriter =
+                createWriter(DeliveryGuarantee.EXACTLY_ONCE)) {
 
-        // create two lingering transactions
-        failedWriter.flush(false);
-        failedWriter.prepareCommit();
-        failedWriter.snapshotState(1);
-        failedWriter.flush(false);
-        failedWriter.prepareCommit();
-        failedWriter.snapshotState(2);
+            // create two lingering transactions
+            onCheckpointBarrier(failedWriter, 1);
+            onCheckpointBarrier(failedWriter, 2);
 
-        try (final KafkaWriter<Integer> recoveredWriter =
-                createWriter(DeliveryGuarantee.EXACTLY_ONCE)) {
-            recoveredWriter.write(1, SINK_WRITER_CONTEXT);
+            // use state to ensure that the new writer knows about the old prefix
+            KafkaWriterState state = new KafkaWriterState(failedWriter.getTransactionalIdPrefix());
 
-            recoveredWriter.flush(false);
-            Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
-            recoveredWriter.snapshotState(1);
-            assertThat(committables).hasSize(1);
-            final KafkaCommittable committable = committables.stream().findFirst().get();
-            assertThat(committable.getProducer().isPresent()).isTrue();
+            try (final KafkaWriter<Integer> recoveredWriter =
+                    restoreWriter(EXACTLY_ONCE, List.of(state), createInitContext())) {
+                recoveredWriter.write(1, SINK_WRITER_CONTEXT);
 
-            committable.getProducer().get().commitTransaction();
+                recoveredWriter.flush(false);
+                Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
+                recoveredWriter.snapshotState(1);
+                assertThat(committables).hasSize(1);
+                final KafkaCommittable committable = committables.stream().findFirst().get();
+                assertThat(committable.getProducer().isPresent()).isTrue();
 
-            List<ConsumerRecord<byte[], byte[]>> records =
-                    drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
-            assertThat(records).hasSize(1);
-        }
+                committable.getProducer().get().commitTransaction();
 
-        failedWriter.close();
+                List<ConsumerRecord<byte[], byte[]>> records =
+                        drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
+                assertThat(records).hasSize(1);
+            }
+        }
     }
 
     /** Test that producers are reused when committed. */
@@ -332,4 +336,15 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
             ExactlyOnceKafkaWriter<Integer> writer) {
         return ((ProducerPoolImpl) writer.getProducerPool()).getProducers();
     }
+
+    private Tuple2<KafkaWriterState, KafkaCommittable> onCheckpointBarrier(
+            KafkaWriter<Integer> failedWriter, int checkpointId)
+            throws IOException, InterruptedException {
+        // constant number to force the same partition
+        failedWriter.write(1, SINK_WRITER_CONTEXT);
+        failedWriter.flush(false);
+        KafkaCommittable committable = Iterables.getOnlyElement(failedWriter.prepareCommit());
+        KafkaWriterState state = Iterables.getOnlyElement(failedWriter.snapshotState(checkpointId));
+        return Tuple2.of(state, committable);
+    }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
index e4e98ea8..11de7167 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
@@ -56,6 +56,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Map;
@@ -129,6 +130,14 @@ public abstract class KafkaWriterTestBase {
         return (T) createSink(sinkBuilderAdjuster).createWriter(sinkInitContext);
     }
 
+    @SuppressWarnings("unchecked")
+    <T extends KafkaWriter<?>> T restoreWriter(
+            Consumer<KafkaSinkBuilder<?>> sinkBuilderAdjuster,
+            Collection<KafkaWriterState> recoveredState,
+            SinkInitContext initContext) {
+        return (T) createSink(sinkBuilderAdjuster).restoreWriter(initContext, recoveredState);
+    }
+
     KafkaSink<Integer> createSink(Consumer<KafkaSinkBuilder<?>> sinkBuilderAdjuster) {
         KafkaSinkBuilder<Integer> builder =
                 KafkaSink.<Integer>builder()
@@ -223,7 +232,8 @@ public abstract class KafkaWriterTestBase {
                 // in general, serializers should be allowed to skip invalid elements
                 return null;
             }
-            return new ProducerRecord<>(topic, ByteBuffer.allocate(4).putInt(element).array());
+            byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
+            return new ProducerRecord<>(topic, bytes, bytes);
         }
     }
 
diff --git a/flink-connector-kafka/src/test/resources/log4j2-test.properties b/flink-connector-kafka/src/test/resources/log4j2-test.properties
index 920652c9..4fc6a34a 100644
--- a/flink-connector-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-connector-kafka/src/test/resources/log4j2-test.properties
@@ -29,9 +29,11 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
 
 # Overwrite the level for all Flink related loggers
 logger.flink.name = org.apache.flink
-logger.flink.level = OFF # WARN for starting debugging
+# WARN for starting debugging
+logger.flink.level = OFF
 logger.flinkconnector.name = org.apache.flink.connector
-logger.flinkconnector.level = OFF # INFO/DEBUG for starting debugging
+# INFO/DEBUG for starting debugging
+logger.flinkconnector.level = OFF
 
 # Kafka producer and consumer level
 logger.kafka.name = org.apache.kafka
