This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 765b1e755e002c00f4bcf02863db49126c3d5f96
Author: Arvid Heise <ar...@apache.org>
AuthorDate: Thu Apr 3 22:33:03 2025 +0200

    [FLINK-37611] Deflake ExactlyOnceKafkaWriterITCase#shouldAbortLingeringTransactions

    The test was actually not working correctly since writers get their unique
    prefix (pool rework PR). The test mostly succeeded because the partitions in
    which records were written were non-deterministic, and more often than not
    the 3 records didn't meet in the 10 partitions, which resulted in an
    incorrect pass.

    Now the value is also passed as the key, which makes partition assignment
    deterministic; for this test we just write the exact same value 3 times.
---
 .../kafka/sink/ExactlyOnceKafkaWriterITCase.java  | 63 +++++++++++++---------
 .../connector/kafka/sink/KafkaWriterTestBase.java | 12 ++++-
 .../src/test/resources/log4j2-test.properties     |  6 ++-
 3 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
index 8980af63..e04613c1 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.kafka.sink;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
@@ -43,6 +44,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Consumer;
 
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -61,6 +63,9 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
                             .setConfiguration(new Configuration())
                             .build());
 
+    private static final Consumer<KafkaSinkBuilder<?>> EXACTLY_ONCE =
+            sink -> sink.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
+
     @Test
     void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
@@ -197,36 +202,35 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
 
     /** Test that producer is not accidentally recreated or pool is used. */
     @Test
-    void testLingeringTransaction() throws Exception {
-        final KafkaWriter<Integer> failedWriter = createWriter(DeliveryGuarantee.EXACTLY_ONCE);
+    void shouldAbortLingeringTransactions() throws Exception {
+        try (final ExactlyOnceKafkaWriter<Integer> failedWriter =
+                createWriter(DeliveryGuarantee.EXACTLY_ONCE)) {
 
-        // create two lingering transactions
-        failedWriter.flush(false);
-        failedWriter.prepareCommit();
-        failedWriter.snapshotState(1);
-        failedWriter.flush(false);
-        failedWriter.prepareCommit();
-        failedWriter.snapshotState(2);
+            // create two lingering transactions
+            onCheckpointBarrier(failedWriter, 1);
+            onCheckpointBarrier(failedWriter, 2);
 
-        try (final KafkaWriter<Integer> recoveredWriter =
-                createWriter(DeliveryGuarantee.EXACTLY_ONCE)) {
-            recoveredWriter.write(1, SINK_WRITER_CONTEXT);
+            // use state to ensure that the new writer knows about the old prefix
+            KafkaWriterState state = new KafkaWriterState(failedWriter.getTransactionalIdPrefix());
 
-            recoveredWriter.flush(false);
-            Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
-            recoveredWriter.snapshotState(1);
-            assertThat(committables).hasSize(1);
-            final KafkaCommittable committable = committables.stream().findFirst().get();
-            assertThat(committable.getProducer().isPresent()).isTrue();
+            try (final KafkaWriter<Integer> recoveredWriter =
+                    restoreWriter(EXACTLY_ONCE, List.of(state), createInitContext())) {
+                recoveredWriter.write(1, SINK_WRITER_CONTEXT);
 
-            committable.getProducer().get().commitTransaction();
+                recoveredWriter.flush(false);
+                Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
+                recoveredWriter.snapshotState(1);
+                assertThat(committables).hasSize(1);
+                final KafkaCommittable committable = committables.stream().findFirst().get();
+                assertThat(committable.getProducer().isPresent()).isTrue();
 
-            List<ConsumerRecord<byte[], byte[]>> records =
-                    drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
-            assertThat(records).hasSize(1);
-        }
+                committable.getProducer().get().commitTransaction();
 
-        failedWriter.close();
+                List<ConsumerRecord<byte[], byte[]>> records =
+                        drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
+                assertThat(records).hasSize(1);
+            }
+        }
     }
 
     /** Test that producers are reused when committed. */
@@ -332,4 +336,15 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
             ExactlyOnceKafkaWriter<Integer> writer) {
         return ((ProducerPoolImpl) writer.getProducerPool()).getProducers();
     }
+
+    private Tuple2<KafkaWriterState, KafkaCommittable> onCheckpointBarrier(
+            KafkaWriter<Integer> failedWriter, int checkpointId)
+            throws IOException, InterruptedException {
+        // constant number to force the same partition
+        failedWriter.write(1, SINK_WRITER_CONTEXT);
+        failedWriter.flush(false);
+        KafkaCommittable committable = Iterables.getOnlyElement(failedWriter.prepareCommit());
+        KafkaWriterState state = Iterables.getOnlyElement(failedWriter.snapshotState(checkpointId));
+        return Tuple2.of(state, committable);
+    }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
index e4e98ea8..11de7167 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
@@ -56,6 +56,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Map;
@@ -129,6 +130,14 @@ public abstract class KafkaWriterTestBase {
         return (T) createSink(sinkBuilderAdjuster).createWriter(sinkInitContext);
     }
 
+    @SuppressWarnings("unchecked")
+    <T extends KafkaWriter<?>> T restoreWriter(
+            Consumer<KafkaSinkBuilder<?>> sinkBuilderAdjuster,
+            Collection<KafkaWriterState> recoveredState,
+            SinkInitContext initContext) {
+        return (T) createSink(sinkBuilderAdjuster).restoreWriter(initContext, recoveredState);
+    }
+
     KafkaSink<Integer> createSink(Consumer<KafkaSinkBuilder<?>> sinkBuilderAdjuster) {
         KafkaSinkBuilder<Integer> builder =
                 KafkaSink.<Integer>builder()
@@ -223,7 +232,8 @@ public abstract class KafkaWriterTestBase {
                 // in general, serializers should be allowed to skip invalid elements
                 return null;
             }
-            return new ProducerRecord<>(topic, ByteBuffer.allocate(4).putInt(element).array());
+            byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
+            return new ProducerRecord<>(topic, bytes, bytes);
         }
     }
 
diff --git a/flink-connector-kafka/src/test/resources/log4j2-test.properties b/flink-connector-kafka/src/test/resources/log4j2-test.properties
index 920652c9..4fc6a34a 100644
--- a/flink-connector-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-connector-kafka/src/test/resources/log4j2-test.properties
@@ -29,9 +29,11 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
 
 # Overwrite the level for all Flink related loggers
 logger.flink.name = org.apache.flink
-logger.flink.level = OFF # WARN for starting debugging
+# WARN for starting debugging
+logger.flink.level = OFF
 logger.flinkconnector.name = org.apache.flink.connector
-logger.flinkconnector.level = OFF # INFO/DEBUG for starting debugging
+# INFO/DEBUG for starting debugging
+logger.flinkconnector.level = OFF
 
 # Kafka producer and consumer level
 logger.kafka.name = org.apache.kafka
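
A quick illustration of why passing the value as the key makes partition assignment deterministic: with a non-null key, Kafka's default partitioner derives the partition from a murmur2 hash of the serialized key, so records with identical keys always land in the same partition, and writing the same value three times guarantees the records meet. The standalone sketch below is not part of the commit; the class name and main method are made up for illustration, the partition count of 10 is taken from the commit message, and it assumes the public murmur2/toPositive helpers in org.apache.kafka.common.utils.Utils, which mirror the default partitioner's keyed path.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.Utils;

    // Illustrative only: shows why identical keys always map to the same partition.
    public class KeyedPartitionSketch {

        public static void main(String[] args) {
            // The commit message mentions 10 partitions; use the same count here.
            int numPartitions = 10;

            // KafkaWriterTestBase now serializes the Integer element into 4 bytes and uses
            // the same bytes as key and value: new ProducerRecord<>(topic, bytes, bytes).
            byte[] keyBytes = ByteBuffer.allocate(4).putInt(1).array();

            // Keyed path of the default partitioner: positive murmur2 hash of the key,
            // modulo the partition count.
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

            // Writing the value 1 three times therefore always targets this one partition,
            // so the assertion no longer depends on sticky/random partition assignment.
            System.out.println("records keyed with 1 all go to partition " + partition);
        }
    }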