This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch v4.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 55008c5905db687f7e5a6180c05395b3fb0651a3
Author: Arvid Heise <[email protected]>
AuthorDate: Tue May 20 10:13:49 2025 +0200

    [FLINK-37818] Add NoopCommitter for non-EOS
    
    Add NoopCommitter to prevent EOS assumptions from leaking into non-EOS sinks.
    
    (cherry picked from commit 04559356a6dbaa9b49830b12ec8216b6e862e03f)
---
 .../src/test/resources/log4j2-test.properties      |  6 ++-
 .../flink/connector/kafka/sink/KafkaSink.java      | 18 ++++---
 .../kafka/sink/internal/KafkaCommitter.java        |  2 +-
 .../kafka/sink/internal/NoopCommitter.java         | 41 +++++++++++++++
 .../kafka/sink/IntegerRecordSerializer.java        | 58 ++++++++++++++++++++++
 .../connector/kafka/sink/KafkaSinkITCase.java      | 27 ++++++++++
 .../connector/kafka/sink/KafkaWriterTestBase.java  | 31 +-----------
 .../src/test/resources/log4j2-test.properties      |  6 ++-
 8 files changed, 147 insertions(+), 42 deletions(-)

diff --git 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
index 9c49ae58..a42adbf0 100644
--- 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
+++ 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties
@@ -37,7 +37,8 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - 
%m%n
 # If you want to investigate test failures, overwrite the level as above
 logger.container.name = container
 logger.container.level = OFF
-logger.container.additivity = false  # This prevents messages from being 
logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.container.additivity = false
 logger.container.appenderRef.containerappender.ref = ContainerLogger
 
 logger.kafkacontainer.name = container.kafka
@@ -48,7 +49,8 @@ logger.flinkcontainer.level = OFF
 
 logger.flinkenv.name = 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
 logger.flinkenv.level = OFF
-logger.flinkenv.additivity = false  # This prevents messages from being logged 
by the root logger
+# This prevents messages from being logged by the root logger
+logger.flinkenv.additivity = false
 logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
 
 appender.containerappender.name = ContainerLogger
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index be631bb0..629815d1 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
 import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
+import org.apache.flink.connector.kafka.sink.internal.NoopCommitter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -111,13 +112,16 @@ public class KafkaSink<IN>
     @Internal
     @Override
     public Committer<KafkaCommittable> createCommitter(CommitterInitContext 
context) {
-        return new KafkaCommitter(
-                kafkaProducerConfig,
-                transactionalIdPrefix,
-                context.getTaskInfo().getIndexOfThisSubtask(),
-                context.getTaskInfo().getAttemptNumber(),
-                transactionNamingStrategy == TransactionNamingStrategy.POOLING,
-                FlinkKafkaInternalProducer::new);
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            return new KafkaCommitter(
+                    kafkaProducerConfig,
+                    transactionalIdPrefix,
+                    context.getTaskInfo().getIndexOfThisSubtask(),
+                    context.getTaskInfo().getAttemptNumber(),
+                    transactionNamingStrategy == 
TransactionNamingStrategy.POOLING,
+                    FlinkKafkaInternalProducer::new);
+        }
+        return new NoopCommitter();
     }
 
     @Internal
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
index de6abbc7..ff95b1e9 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/KafkaCommitter.java
@@ -61,7 +61,7 @@ public class KafkaCommitter implements 
Committer<KafkaCommittable>, Closeable {
     private final WritableBackchannel<TransactionFinished> backchannel;
     @Nullable private FlinkKafkaInternalProducer<?, ?> committingProducer;
 
-    KafkaCommitter(
+    public KafkaCommitter(
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
             int subtaskId,
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/NoopCommitter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/NoopCommitter.java
new file mode 100644
index 00000000..0b2e8079
--- /dev/null
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/NoopCommitter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.connector.kafka.sink.KafkaCommittable;
+
+import java.util.Collection;
+
+/**
+ * The committer to be used for non exactly-once delivery guarantees.
+ *
+ * <p>This committer does not commit any records. It is needed because the 
current {@link
+ * org.apache.flink.api.connector.sink2.Sink} design supports only either 
transactional or
+ * non-transactional operation and the {@link 
org.apache.flink.connector.kafka.sink.KafkaSink} is
+ * doing both through {@link 
org.apache.flink.connector.base.DeliveryGuarantee}s.
+ */
+@Internal
+public class NoopCommitter implements Committer<KafkaCommittable> {
+    @Override
+    public void commit(Collection<CommitRequest<KafkaCommittable>> 
committables) {}
+
+    @Override
+    public void close() {}
+}
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/IntegerRecordSerializer.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/IntegerRecordSerializer.java
new file mode 100644
index 00000000..2dcce5da
--- /dev/null
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/IntegerRecordSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+
+/** mock recordSerializer for KafkaSink. */
+class IntegerRecordSerializer
+        implements KafkaRecordSerializationSchema<Integer>, 
KafkaDatasetFacetProvider {
+    private final String topic;
+
+    IntegerRecordSerializer(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            Integer element, KafkaSinkContext context, Long timestamp) {
+        if (element == null) {
+            // in general, serializers should be allowed to skip invalid 
elements
+            return null;
+        }
+        byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
+        return new ProducerRecord<>(topic, bytes, bytes);
+    }
+
+    @Override
+    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+        return Optional.of(
+                new DefaultKafkaDatasetFacet(
+                        DefaultKafkaDatasetIdentifier.ofTopics(
+                                
Collections.singletonList(KafkaWriterTestBase.topic))));
+    }
+}
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index fc9ad3d9..4c3ab2d6 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -64,6 +64,7 @@ import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.InjectMiniCluster;
@@ -90,6 +91,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
@@ -127,6 +129,7 @@ import static 
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH
 import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
 import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for using KafkaSink writing to a Kafka cluster. */
 @Testcontainers
@@ -545,6 +548,30 @@ public class KafkaSinkITCase extends TestLogger {
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(DeliveryGuarantee.class)
+    void ensureUniqueTransactionalIdPrefixIfNeeded(DeliveryGuarantee 
guarantee) throws Exception {
+        KafkaSinkBuilder<Integer> builder =
+                new KafkaSinkBuilder<Integer>()
+                        .setDeliveryGuarantee(guarantee)
+                        
.setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+                        .setRecordSerializer(new 
IntegerRecordSerializer("topic"));
+
+        Configuration config = new Configuration();
+        config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.enableCheckpointing(100);
+        DataStreamSource<Integer> source = env.fromData(1, 2);
+        if (guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            assertThatThrownBy(builder::build).hasMessageContaining("unique");
+        } else {
+            source.sinkTo(builder.build());
+            source.sinkTo(builder.build());
+
+            env.execute();
+        }
+    }
+
     private static Configuration createConfiguration(int parallelism) {
         final Configuration config = new Configuration();
         config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
index 1046ee3c..3c13b9ad 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
@@ -23,10 +23,6 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
-import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
-import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
-import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
-import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
@@ -43,7 +39,6 @@ import org.apache.flink.util.UserCodeClassLoader;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.AfterEach;
@@ -59,7 +54,6 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -151,7 +145,7 @@ public abstract class KafkaWriterTestBase {
                 KafkaSink.<Integer>builder()
                         .setKafkaProducerConfig(getKafkaClientConfiguration())
                         .setTransactionalIdPrefix(TEST_PREFIX + writerIndex++)
-                        .setRecordSerializer(new DummyRecordSerializer());
+                        .setRecordSerializer(new 
IntegerRecordSerializer(topic));
         sinkBuilderAdjuster.accept(builder);
         return builder.build();
     }
@@ -235,29 +229,6 @@ public abstract class KafkaWriterTestBase {
         }
     }
 
-    /** mock recordSerializer for KafkaSink. */
-    protected static class DummyRecordSerializer
-            implements KafkaRecordSerializationSchema<Integer>, 
KafkaDatasetFacetProvider {
-        @Override
-        public ProducerRecord<byte[], byte[]> serialize(
-                Integer element, KafkaSinkContext context, Long timestamp) {
-            if (element == null) {
-                // in general, serializers should be allowed to skip invalid 
elements
-                return null;
-            }
-            byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
-            return new ProducerRecord<>(topic, bytes, bytes);
-        }
-
-        @Override
-        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
-            return Optional.of(
-                    new DefaultKafkaDatasetFacet(
-                            DefaultKafkaDatasetIdentifier.ofTopics(
-                                    Collections.singletonList(topic))));
-        }
-    }
-
     /**
      * mock context for KafkaWriter#write(java.lang.Object,
      * org.apache.flink.api.connector.sink2.SinkWriter.Context).
diff --git a/flink-connector-kafka/src/test/resources/log4j2-test.properties 
b/flink-connector-kafka/src/test/resources/log4j2-test.properties
index 816170c6..25c60099 100644
--- a/flink-connector-kafka/src/test/resources/log4j2-test.properties
+++ b/flink-connector-kafka/src/test/resources/log4j2-test.properties
@@ -43,12 +43,14 @@ logger.kafka.level = OFF
 # If you want to investigate test failures, overwrite the level as above
 logger.container.name = container
 logger.container.level = OFF
-logger.container.additivity = false  # This prevents messages from being 
logged by the root logger
+# This prevents messages from being logged by the root logger
+logger.container.additivity = false
 logger.container.appenderRef.containerappender.ref = ContainerLogger
 
 logger.flinkenv.name = 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
 logger.flinkenv.level = OFF
-logger.flinkenv.additivity = false  # This prevents messages from being logged 
by the root logger
+# This prevents messages from being logged by the root logger
+logger.flinkenv.additivity = false
 logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
 
 appender.containerappender.name = ContainerLogger

Reply via email to