This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 04559356a6dbaa9b49830b12ec8216b6e862e03f
Author: Arvid Heise <[email protected]>
AuthorDate: Tue May 20 10:13:49 2025 +0200

    [FLINK-37818] Add NoopCommitter for non-EOS
    
    Add NoopCommitter to avoid EOS assumptions from leaking into non-EOS sinks.
---
 .../src/test/resources/log4j2-test.properties      |  6 ++-
 .../flink/connector/kafka/sink/KafkaSink.java      | 18 ++++---
 .../kafka/sink/internal/KafkaCommitter.java        |  2 +-
 .../kafka/sink/internal/NoopCommitter.java         | 41 +++++++++++++++
 .../kafka/sink/IntegerRecordSerializer.java        | 58 ++++++++++++++++++++++
 .../connector/kafka/sink/KafkaSinkITCase.java      | 27 ++++++++++
 .../connector/kafka/sink/KafkaWriterTestBase.java  | 31 +-----------
 .../src/test/resources/log4j2-test.properties      |  6 ++-
 8 files changed, 147 insertions(+), 42 deletions(-)

diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
index 9c49ae58..a42adbf0 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
@@ -37,7 +37,8 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
 # If you want to investigate test failures, overwrite the level as above
 logger.container.name = container
 logger.container.level = OFF
-logger.container.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.container.additivity = false
 logger.container.appenderRef.containerappender.ref = ContainerLogger
 
 logger.kafkacontainer.name = container.kafka
@@ -48,7 +49,8 @@ logger.flinkcontainer.level = OFF
 
 logger.flinkenv.name = org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
 logger.flinkenv.level = OFF
-logger.flinkenv.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.flinkenv.additivity = false
 logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
 
 appender.containerappender.name = ContainerLogger
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index be631bb0..629815d1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -32,6 +32,7 @@ import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
+import org.apache.flink.connector.kafka.sink.internal.NoopCommitter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -111,13 +112,16 @@ public class KafkaSink<IN>
     @Internal
     @Override
     public Committer<KafkaCommittable> createCommitter(CommitterInitContext context) {
-        return new KafkaCommitter(
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context.getTaskInfo().getIndexOfThisSubtask(),
-                context.getTaskInfo().getAttemptNumber(),
-                transactionNamingStrategy == TransactionNamingStrategy.POOLING,
-                FlinkKafkaInternalProducer::new);
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            return new KafkaCommitter(
+                    kafkaProducerConfig,
+                    transactionalIdPrefix,
+                    context.getTaskInfo().getIndexOfThisSubtask(),
+                    context.getTaskInfo().getAttemptNumber(),
+                    transactionNamingStrategy == TransactionNamingStrategy.POOLING,
+                    FlinkKafkaInternalProducer::new);
+        }
+        return new NoopCommitter();
     }
 
     @Internal
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
index de6abbc7..ff95b1e9 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
@@ -61,7 +61,7 @@ public class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
     private final WritableBackchannel<TransactionFinished> backchannel;
     @Nullable private FlinkKafkaInternalProducer<?, ?> committingProducer;
 
-    KafkaCommitter(
+    public KafkaCommitter(
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
             int subtaskId,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/NoopCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/NoopCommitter.java
new file mode 100644
index 00000000..0b2e8079
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/NoopCommitter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.kafka.sink.KafkaCommittable;
+
+import java.util.Collection;
+
+/**
+ * The committer to be used for non exactly-once delivery guarantees.
+ *
+ * <p>This committer does not commit any records. It is needed because the current {@link
+ * org.apache.flink.api.connector.sink2.Sink} design supports only either transactional or
+ * non-transactional operation and the {@link org.apache.flink.connector.kafka.sink.KafkaSink} is
+ * doing both through {@link org.apache.flink.connector.base.DeliveryGuarantee}s.
+ */
+@Internal
+public class NoopCommitter implements Committer<KafkaCommittable> {
+    @Override
+    public void commit(Collection<CommitRequest<KafkaCommittable>> committables) {}
+
+    @Override
+    public void close() {}
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/IntegerRecordSerializer.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/IntegerRecordSerializer.java
new file mode 100644
index 00000000..2dcce5da
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/IntegerRecordSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+
+/** mock recordSerializer for KafkaSink. */
+class IntegerRecordSerializer
+        implements KafkaRecordSerializationSchema<Integer>, KafkaDatasetFacetProvider {
+    private final String topic;
+
+    IntegerRecordSerializer(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            Integer element, KafkaSinkContext context, Long timestamp) {
+        if (element == null) {
+            // in general, serializers should be allowed to skip invalid elements
+            return null;
+        }
+        byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
+        return new ProducerRecord<>(topic, bytes, bytes);
+    }
+
+    @Override
+    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+        return Optional.of(
+                new DefaultKafkaDatasetFacet(
+                        DefaultKafkaDatasetIdentifier.ofTopics(
+                                Collections.singletonList(KafkaWriterTestBase.topic))));
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index fc9ad3d9..4c3ab2d6 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.InjectMiniCluster;
@@ -90,6 +91,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
@@ -127,6 +129,7 @@ import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for using KafkaSink writing to a Kafka cluster. */
 @Testcontainers
@@ -545,6 +548,30 @@ public class KafkaSinkITCase extends TestLogger {
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(DeliveryGuarantee.class)
+    void ensureUniqueTransactionalIdPrefixIfNeeded(DeliveryGuarantee guarantee) throws Exception {
+        KafkaSinkBuilder<Integer> builder =
+                new KafkaSinkBuilder<Integer>()
+                        .setDeliveryGuarantee(guarantee)
+                        .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+                        .setRecordSerializer(new IntegerRecordSerializer("topic"));
+
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(100);
+        DataStreamSource<Integer> source = env.fromData(1, 2);
+        if (guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            assertThatThrownBy(builder::build).hasMessageContaining("unique");
+        } else {
+            source.sinkTo(builder.build());
+            source.sinkTo(builder.build());
+
+            env.execute();
+        }
+    }
+
     private static Configuration createConfiguration(int parallelism) {
         final Configuration config = new Configuration();
         config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
index 1046ee3c..3c13b9ad 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
@@ -23,10 +23,6 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
-import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
-import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
-import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
-import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
@@ -43,7 +39,6 @@ import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterEach;
@@ -59,7 +54,6 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -151,7 +145,7 @@ public abstract class KafkaWriterTestBase {
                 KafkaSink.<Integer>builder()
                         .setKafkaProducerConfig(getKafkaClientConfiguration())
                         .setTransactionalIdPrefix(TEST_PREFIX + writerIndex++)
-                        .setRecordSerializer(new DummyRecordSerializer());
+                        .setRecordSerializer(new IntegerRecordSerializer(topic));
         sinkBuilderAdjuster.accept(builder);
         return builder.build();
     }
@@ -235,29 +229,6 @@ public abstract class KafkaWriterTestBase {
         }
     }
 
-    /** mock recordSerializer for KafkaSink. */
-    protected static class DummyRecordSerializer
-            implements KafkaRecordSerializationSchema<Integer>, KafkaDatasetFacetProvider {
-        @Override
-        public ProducerRecord<byte[], byte[]> serialize(
-                Integer element, KafkaSinkContext context, Long timestamp) {
-            if (element == null) {
-                // in general, serializers should be allowed to skip invalid elements
-                return null;
-            }
-            byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
-            return new ProducerRecord<>(topic, bytes, bytes);
-        }
-
-        @Override
-        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
-            return Optional.of(
-                    new DefaultKafkaDatasetFacet(
-                            DefaultKafkaDatasetIdentifier.ofTopics(
-                                    Collections.singletonList(topic))));
-        }
-    }
-
     /**
      * mock context for KafkaWriter#write(java.lang.Object,
      * org.apache.flink.api.connector.sink2.SinkWriter.Context).
diff --git a/flink-connector-kafka/src/test/resources/log4j2-test.properties b/flink-connector-kafka/src/test/resources/log4j2-test.properties
index 4fc6a34a..db0c4757 100644
--- a/flink-connector-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-connector-kafka/src/test/resources/log4j2-test.properties
@@ -43,12 +43,14 @@ logger.kafka.level = OFF
 # If you want to investigate test failures, overwrite the level as above
 logger.container.name = container
 logger.container.level = OFF
-logger.container.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.container.additivity = false
 logger.container.appenderRef.containerappender.ref = ContainerLogger
 
 logger.flinkenv.name = org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
 logger.flinkenv.level = OFF
-logger.flinkenv.additivity = false # This prevents messages from being logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.flinkenv.additivity = false
 logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
 
 appender.containerappender.name = ContainerLogger
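For context, a minimal usage sketch (not part of the patch) of the two committer
paths after this change: with AT_LEAST_ONCE or NONE, createCommitter() now hands
back the NoopCommitter, so no transactional id prefix is required, while
EXACTLY_ONCE keeps the KafkaCommitter and its uniqueness requirement. The
bootstrap servers, topic name, prefix, and SimpleStringSchema value serializer
below are illustrative placeholders only.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NoopCommitterSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100);

        // Non-EOS path: the sink's committer stage is now a NoopCommitter, so
        // nothing transactional leaks into this pipeline.
        KafkaSink<String> atLeastOnceSink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("output-topic") // placeholder
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .build();

        // EOS path: unchanged; builds the KafkaCommitter and still requires a
        // transactional id prefix that is unique per sink, which the new
        // ensureUniqueTransactionalIdPrefixIfNeeded ITCase asserts.
        KafkaSink<String> exactlyOnceSink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder
                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        .setTransactionalIdPrefix("unique-prefix-1") // hypothetical prefix
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("output-topic") // placeholder
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .build();

        env.fromData("a", "b").sinkTo(atLeastOnceSink);
        env.fromData("c", "d").sinkTo(exactlyOnceSink);
        env.execute();
    }
}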
