This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit d43fcbd1583bd60e1ca12147d68b04adeb53d72f
Author: Arvid Heise <ar...@apache.org>
AuthorDate: Sat Feb 8 17:29:13 2025 +0100

    [FLINK-34554] Introduce transaction strategies
    
    Move the existing way of naming and aborting transactions into dedicated
    strategies. The next commit will add new strategies.
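    
    For illustration, a minimal usage sketch of the new builder option (the
    bootstrap servers, topic, and serialization schema below are placeholders,
    not part of this change):
    
        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        .setTransactionalIdPrefix("my-app")
                        .setTransactionNamingStrategy(TransactionNamingStrategy.DEFAULT)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("output-topic")
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .build();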
---
 .../kafka/sink/ExactlyOnceKafkaWriter.java         |  73 ++++++++---
 .../flink/connector/kafka/sink/KafkaSink.java      |   7 +-
 .../connector/kafka/sink/KafkaSinkBuilder.java     |  27 +++-
 .../connector/kafka/sink/TransactionAborter.java   | 128 -------------------
 .../kafka/sink/TransactionNamingStrategy.java      |  77 ++++++++++++
 .../connector/kafka/sink/internal/Backchannel.java |   3 +
 .../sink/internal/FlinkKafkaInternalProducer.java  |   7 +-
 .../kafka/sink/internal/ProducerPool.java          |   3 +
 .../kafka/sink/internal/ProducerPoolImpl.java      |   2 +
 .../kafka/sink/internal/ReadableBackchannel.java   |   3 +
 .../TransactionAbortStrategyContextImpl.java       |  76 +++++++++++
 .../internal/TransactionAbortStrategyImpl.java     | 139 +++++++++++++++++++++
 .../TransactionNamingStrategyContextImpl.java      |  80 ++++++++++++
 .../internal/TransactionNamingStrategyImpl.java    |  79 ++++++++++++
 .../kafka/sink/internal/WritableBackchannel.java   |   3 +
 .../connector/kafka/sink/KafkaSinkBuilderTest.java |   4 +-
 .../flink/connector/kafka/sink/KafkaSinkTest.java  |  40 ++++--
 17 files changed, 586 insertions(+), 165 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index 741d00f7..32131056 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -26,8 +26,11 @@ import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer
 import org.apache.flink.connector.kafka.sink.internal.ProducerPool;
 import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
 import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyContextImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
-import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
+import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyContextImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -54,7 +57,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
     private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class);
+    /**
+     * Prefix for the transactional id. Must be unique across all sinks writing to the same broker.
+     */
     private final String transactionalIdPrefix;
+    /**
+     * Strategy to abort lingering transactions from previous executions during writer
+     * initialization.
+     */
+    private final TransactionAbortStrategyImpl transactionAbortStrategy;
+    /** Strategy to name transactions. */
+    private final TransactionNamingStrategyImpl transactionNamingStrategy;
 
     private final KafkaWriterState kafkaWriterState;
     private final Collection<KafkaWriterState> recoveredStates;
@@ -72,6 +85,8 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
      * related methods.
      */
     private final ReadableBackchannel<TransactionFinished> backchannel;
+    /** The context used to name transactions. */
+    private final TransactionNamingStrategyContextImpl namingContext;
 
     /**
      * Constructor creating a kafka writer.
@@ -95,6 +110,8 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
             WriterInitContext sinkInitContext,
             KafkaRecordSerializationSchema<IN> recordSerializer,
             SerializationSchema.InitializationContext schemaContext,
+            TransactionAbortStrategyImpl transactionAbortStrategy,
+            TransactionNamingStrategyImpl transactionNamingStrategy,
             Collection<KafkaWriterState> recoveredStates) {
         super(
                 deliveryGuarantee,
@@ -104,6 +121,11 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
                 schemaContext);
         this.transactionalIdPrefix =
                 checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null");
+        this.transactionAbortStrategy =
+                checkNotNull(transactionAbortStrategy, "transactionAbortStrategy must not be null");
+        this.transactionNamingStrategy =
+                checkNotNull(
+                        transactionNamingStrategy, "transactionNamingStrategy must not be null");
 
         try {
             recordSerializer.open(schemaContext, kafkaSinkContext);
@@ -127,6 +149,9 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
                                 subtaskId,
                                 sinkInitContext.getTaskInfo().getAttemptNumber(),
                                 transactionalIdPrefix);
+        this.namingContext =
+                new TransactionNamingStrategyContextImpl(
+                        transactionalIdPrefix, subtaskId, restoredCheckpointId, producerPool);
     }
 
     @Override
@@ -147,13 +172,10 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
     }
 
     private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
+        namingContext.setNextCheckpointId(checkpointId);
         FlinkKafkaInternalProducer<byte[], byte[]> producer =
-                producerPool.getTransactionalProducer(
-                        TransactionalIdFactory.buildTransactionalId(
-                                transactionalIdPrefix,
-                                kafkaSinkContext.getParallelInstanceId(),
-                                checkpointId),
-                        checkpointId);
+                transactionNamingStrategy.getTransactionalProducer(namingContext);
+        namingContext.setLastCheckpointId(checkpointId);
         producer.beginTransaction();
         return producer;
     }
@@ -236,13 +258,34 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
             }
         }
 
-        try (TransactionAborter transactionAborter =
-                new TransactionAborter(
-                        kafkaSinkContext.getParallelInstanceId(),
-                        kafkaSinkContext.getNumberOfParallelInstances(),
-                        id -> producerPool.getTransactionalProducer(id, startCheckpointId),
-                        producerPool::recycle)) {
-            transactionAborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId);
-        }
+        LOG.info(
+                "Aborting lingering transactions with prefixes {} using {}",
+                prefixesToAbort,
+                transactionAbortStrategy);
+        TransactionAbortStrategyContextImpl context =
+                getTransactionAbortStrategyContext(startCheckpointId, prefixesToAbort);
+        transactionAbortStrategy.abortTransactions(context);
+    }
+
+    private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
+            long startCheckpointId, List<String> prefixesToAbort) {
+        TransactionAbortStrategyImpl.TransactionAborter aborter =
+                transactionalId -> {
+                    // getTransactionalProducer already calls initTransactions, which cancels the
+                    // transaction
+                    FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                            producerPool.getTransactionalProducer(transactionalId, 0);
+                    LOG.debug("Aborting transaction {}", transactionalId);
+                    producer.flush();
+                    short epoch = producer.getEpoch();
+                    producerPool.recycle(producer);
+                    return epoch;
+                };
+        return new TransactionAbortStrategyContextImpl(
+                kafkaSinkContext.getParallelInstanceId(),
+                kafkaSinkContext.getNumberOfParallelInstances(),
+                prefixesToAbort,
+                startCheckpointId,
+                aborter);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index c05ff8d0..30002021 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -82,16 +82,19 @@ public class KafkaSink<IN>
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Properties kafkaProducerConfig;
     private final String transactionalIdPrefix;
+    private final TransactionNamingStrategy transactionNamingStrategy;
 
     KafkaSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
-            KafkaRecordSerializationSchema<IN> recordSerializer) {
+            KafkaRecordSerializationSchema<IN> recordSerializer,
+            TransactionNamingStrategy transactionNamingStrategy) {
         this.deliveryGuarantee = deliveryGuarantee;
         this.kafkaProducerConfig = kafkaProducerConfig;
         this.transactionalIdPrefix = transactionalIdPrefix;
         this.recordSerializer = recordSerializer;
+        this.transactionNamingStrategy = transactionNamingStrategy;
     }
 
     /**
@@ -141,6 +144,8 @@ public class KafkaSink<IN>
                             context,
                             recordSerializer,
                             context.asSerializationSchemaInitializationContext(),
+                            transactionNamingStrategy.getAbortImpl(),
+                            transactionNamingStrategy.getImpl(),
                             recoveredState);
         } else {
             writer =
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index ac033f17..80d67b9d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -74,6 +74,7 @@ public class KafkaSinkBuilder<IN> {
 
     private final Properties kafkaProducerConfig;
     private KafkaRecordSerializationSchema<IN> recordSerializer;
+    private TransactionNamingStrategy transactionNamingStrategy = TransactionNamingStrategy.DEFAULT;
 
     KafkaSinkBuilder() {
         kafkaProducerConfig = new Properties();
@@ -125,6 +126,20 @@ public class KafkaSinkBuilder<IN> {
         return this;
     }
 
+    /**
+     * Sets the {@link TransactionNamingStrategy} that is used to name the transactions.
+     *
+     * <p>By default {@link TransactionNamingStrategy#DEFAULT} is used. It's recommended to change
+     * the strategy only if specific issues occur.
+     */
+    public KafkaSinkBuilder<IN> setTransactionNamingStrategy(
+            TransactionNamingStrategy transactionNamingStrategy) {
+        this.transactionNamingStrategy =
+                checkNotNull(
+                        transactionNamingStrategy, "transactionNamingStrategy must not be null");
+        return this;
+    }
+
     /**
      * Sets the {@link KafkaRecordSerializationSchema} that transforms incoming records to {@link
      * org.apache.kafka.clients.producer.ProducerRecord}s.
@@ -162,7 +177,7 @@ public class KafkaSinkBuilder<IN> {
         checkState(
                 transactionalIdPrefix.getBytes(StandardCharsets.UTF_8).length
                         <= MAXIMUM_PREFIX_BYTES,
-                "The configured prefix is too long and the resulting 
transactionalId might exceed Kafka's transactionalIds size.");
+                "The configured prefix is too long and the resulting 
transactionalIdPrefix might exceed Kafka's transactionalIdPrefix size.");
         return this;
     }
 
@@ -180,12 +195,12 @@ public class KafkaSinkBuilder<IN> {
         checkNotNull(
                 kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                 "bootstrapServers");
+        checkNotNull(recordSerializer, "recordSerializer");
         if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
             checkState(
                     transactionalIdPrefix != null,
-                    "EXACTLY_ONCE delivery guarantee requires a transactionIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
+                    "EXACTLY_ONCE delivery guarantee requires a transactionalIdPrefix to be set to provide unique transaction names across multiple KafkaSinks writing to the same Kafka cluster.");
         }
-        checkNotNull(recordSerializer, "recordSerializer");
     }
 
     /**
@@ -196,6 +211,10 @@ public class KafkaSinkBuilder<IN> {
     public KafkaSink<IN> build() {
         sanityCheck();
         return new KafkaSink<>(
-                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, recordSerializer);
+                deliveryGuarantee,
+                kafkaProducerConfig,
+                transactionalIdPrefix,
+                recordSerializer,
+                transactionNamingStrategy);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
deleted file mode 100644
index 353cfea4..00000000
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.kafka.sink;
-
-import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
-import org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Aborts lingering transactions on restart.
- *
- * <p>Transactions are lingering if they are not tracked anywhere. For 
example, if a job is started
- * transactions are opened. A restart without checkpoint would not allow Flink 
to abort old
- * transactions. Since Kafka's transactions are sequential, newly produced 
data does not become
- * visible for read_committed consumers. However, Kafka has no API for 
querying open transactions,
- * so they become lingering.
- *
- * <p>Flink solves this by assuming consecutive transaction ids. On restart of 
checkpoint C on
- * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until 
it finds the first
- * unused transaction.
- *
- * <p>Additionally, to cover for weird downscaling cases without checkpoints, 
it also checks for
- * transactions of subtask S+P where P is the current parallelism until it 
finds a subtask without
- * transactions.
- */
-class TransactionAborter implements Closeable {
-    private final int subtaskId;
-    private final int parallelism;
-    private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory;
-    private final Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> 
recycler;
-    @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
-
-    public TransactionAborter(
-            int subtaskId,
-            int parallelism,
-            Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory,
-            Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> recycler) {
-        this.subtaskId = subtaskId;
-        this.parallelism = parallelism;
-        this.producerFactory = checkNotNull(producerFactory);
-        this.recycler = recycler;
-    }
-
-    void abortLingeringTransactions(List<String> prefixesToAbort, long 
startCheckpointId) {
-        for (String prefix : prefixesToAbort) {
-            abortTransactionsWithPrefix(prefix, startCheckpointId);
-        }
-    }
-
-    /**
-     * Aborts all transactions that have been created by this subtask in a 
previous run.
-     *
-     * <p>It also aborts transactions from subtasks that may have been removed 
because of
-     * downscaling.
-     *
-     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is 
responsible for cleaning
-     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale 
to 2, then subtask 0
-     * is responsible for all even and subtask 1 for all odd subtasks.
-     */
-    private void abortTransactionsWithPrefix(String prefix, long 
startCheckpointId) {
-        for (int subtaskId = this.subtaskId; ; subtaskId += parallelism) {
-            if (abortTransactionOfSubtask(prefix, startCheckpointId, 
subtaskId) == 0) {
-                // If Flink didn't abort any transaction for current subtask, 
then we assume that no
-                // such subtask existed and no subtask with a higher number as 
well.
-                break;
-            }
-        }
-    }
-
-    /**
-     * Aborts all transactions that have been created by a subtask in a 
previous run after the given
-     * checkpoint id.
-     *
-     * <p>We assume that transaction ids are consecutively used and thus Flink 
can stop aborting as
-     * soon as Flink notices that a particular transaction id was unused.
-     */
-    private int abortTransactionOfSubtask(String prefix, long 
startCheckpointId, int subtaskId) {
-        int numTransactionAborted = 0;
-        for (long checkpointId = startCheckpointId; ; checkpointId++, 
numTransactionAborted++) {
-            // initTransactions fences all old transactions with the same id 
by bumping the epoch
-            String transactionalId =
-                    TransactionalIdFactory.buildTransactionalId(prefix, 
subtaskId, checkpointId);
-            producer = producerFactory.apply(transactionalId);
-            producer.flush();
-            // An epoch of 0 indicates that the id was unused before
-            short epoch = producer.getEpoch();
-            recycler.accept(producer);
-            if (epoch == 0) {
-                // Note that the check works beyond transaction log timeouts 
and just depends on the
-                // retention of the transaction topic (typically 7d). Any 
transaction that is not in
-                // the that topic anymore is also not lingering (i.e., it will 
not block downstream
-                // from reading)
-                // This method will only cease to work if transaction log 
timeout = topic retention
-                // and a user didn't restart the application for that period 
of time. Then the first
-                // transactions would vanish from the topic while later 
transactions are still
-                // lingering until they are cleaned up by Kafka. Then the user 
has to wait until the
-                // other transactions are timed out (which shouldn't take too 
long).
-                break;
-            }
-        }
-        return numTransactionAborted;
-    }
-
-    public void close() {}
-}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java
new file mode 100644
index 00000000..6c09bc92
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl;
+import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
+
+/**
+ * The strategy to name transactions. The naming strategy has implications on the resource
+ * consumption on the broker because each unique transaction name requires the broker to keep
+ * some metadata in memory for 7 days.
+ *
+ * <p>All naming strategies use the format {@code transactionalIdPrefix-subtask-offset} where the
+ * offset is calculated differently.
+ */
+@PublicEvolving
+public enum TransactionNamingStrategy {
+    /**
+     * The offset of the transaction name is a monotonically increasing number that mostly
+     * corresponds to the checkpoint id. This strategy is wasteful in terms of resource consumption
+     * on the broker.
+     *
+     * <p>This is exactly the same behavior as in flink-connector-kafka 3.X.
+     */
+    INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING);
+
+    /**
+     * The default transaction naming strategy. Currently set to {@link #INCREMENTING}, which is
+     * the same behavior as in flink-connector-kafka 3.X.
+     */
+    public static final TransactionNamingStrategy DEFAULT = INCREMENTING;
+
+    /**
+     * The backing implementation of the transaction naming strategy. The separation avoids
+     * leaking internal classes in signatures.
+     */
+    private final TransactionNamingStrategyImpl impl;
+    /**
+     * The abort strategy that accompanies this naming strategy. Some naming strategies may not
+     * support all abort strategies.
+     */
+    private final TransactionAbortStrategyImpl abortImpl;
+
+    TransactionNamingStrategy(
+            TransactionNamingStrategyImpl impl, TransactionAbortStrategyImpl abortImpl) {
+        this.impl = impl;
+        this.abortImpl = abortImpl;
+    }
+
+    @Internal
+    TransactionAbortStrategyImpl getAbortImpl() {
+        return abortImpl;
+    }
+
+    @Internal
+    TransactionNamingStrategyImpl getImpl() {
+        return impl;
+    }
+}
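
For readers unfamiliar with the id format mentioned in the Javadoc above: under INCREMENTING, the
transactional ids are assembled as transactionalIdPrefix-subtask-offset. A minimal sketch (the
prefix, subtask, and checkpoint values below are made up; the "-" delimiter is the one documented
above):

    // Illustrative only: how a transactional id is assembled from its three parts.
    String transactionalIdPrefix = "my-app";
    int subtaskId = 3;
    long checkpointId = 42;
    String transactionalId = transactionalIdPrefix + "-" + subtaskId + "-" + checkpointId;
    // -> "my-app-3-42"; the next checkpoint yields "my-app-3-43", and so on.
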
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java
index 244b7668..af7b9a71 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/Backchannel.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.kafka.sink.internal;
 
+import org.apache.flink.annotation.Internal;
+
 import java.io.Closeable;
 
 /**
@@ -34,6 +36,7 @@ import java.io.Closeable;
  * both instances will run inside the same JVM and we can establish a backchannel between them. The
  * latter case requires some synchronization in the buffer.
  */
+@Internal
 public interface Backchannel extends Closeable {
     /** Check if the backchannel is fully established. */
     boolean isEstablished();
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
index 6f52a9db..ee5b823d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.kafka.sink.internal;
 
+import org.apache.flink.annotation.Internal;
+
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -44,6 +46,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
  */
+@Internal
 public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
     private static final String TRANSACTION_MANAGER_FIELD_NAME = "transactionManager";
@@ -58,13 +61,13 @@ public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
     public FlinkKafkaInternalProducer(Properties properties) {
         super(properties);
-        LOG.debug("Created non-transactional {}", this);
+        LOG.info("Created non-transactional {}", this);
     }
 
     public FlinkKafkaInternalProducer(Properties properties, String transactionalId) {
         super(withTransactionalId(properties, transactionalId));
         this.transactionalId = transactionalId;
-        LOG.debug("Created transactional {}", this);
+        LOG.info("Created transactional {}", this);
     }
 
     private static Properties withTransactionalId(Properties properties, String transactionalId) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java
index 0a51ff09..d47cca28 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.connector.kafka.sink.internal;
 
+import org.apache.flink.annotation.Internal;
+
 /** A pool of producers that can be recycled. */
+@Internal
 public interface ProducerPool extends AutoCloseable {
     /**
      * Notify the pool that a transaction has finished. The producer with the given transactional id
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java
index cccb6bb0..38a159ed 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.sink.internal;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 
 import org.slf4j.Logger;
@@ -68,6 +69,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * it.
  */
 @NotThreadSafe
+@Internal
 public class ProducerPoolImpl implements ProducerPool {
     private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class);
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java
index 0f2cbf17..69d205ad 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ReadableBackchannel.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.kafka.sink.internal;
 
+import org.apache.flink.annotation.Internal;
+
 import javax.annotation.Nullable;
 
 /**
@@ -25,6 +27,7 @@ import javax.annotation.Nullable;
  * to signal that certain transactions have been committed and respective producers are good to be
  * reused.
  */
+@Internal
 public interface ReadableBackchannel<T> extends Backchannel {
     /**
      * Poll the next message from the backchannel. This method is non-blocking and returns {@code
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java
new file mode 100644
index 00000000..80f6386e
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyContextImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl.TransactionAborter;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Implementation of {@link TransactionAbortStrategyImpl.Context}. */
+@Internal
+public class TransactionAbortStrategyContextImpl implements TransactionAbortStrategyImpl.Context {
+    private final int currentSubtaskId;
+    private final int currentParallelism;
+    private final Set<String> prefixesToAbort;
+    private final long startCheckpointId;
+    private final TransactionAborter transactionAborter;
+
+    /** Creates a new {@link TransactionAbortStrategyContextImpl}. */
+    public TransactionAbortStrategyContextImpl(
+            int currentSubtaskId,
+            int currentParallelism,
+            List<String> prefixesToAbort,
+            long startCheckpointId,
+            TransactionAborter transactionAborter) {
+        this.currentSubtaskId = currentSubtaskId;
+        this.currentParallelism = currentParallelism;
+        this.prefixesToAbort = Set.copyOf(prefixesToAbort);
+        this.startCheckpointId = startCheckpointId;
+        this.transactionAborter =
+                checkNotNull(transactionAborter, "transactionAborter must not be null");
+    }
+
+    @Override
+    public int getCurrentSubtaskId() {
+        return currentSubtaskId;
+    }
+
+    @Override
+    public int getCurrentParallelism() {
+        return currentParallelism;
+    }
+
+    @Override
+    public Set<String> getPrefixesToAbort() {
+        return prefixesToAbort;
+    }
+
+    @Override
+    public long getStartCheckpointId() {
+        return startCheckpointId;
+    }
+
+    public TransactionAborter getTransactionAborter() {
+        return transactionAborter;
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java
new file mode 100644
index 00000000..6ea69586
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/** Implementations of an abort strategy for transactions left over from previous runs. */
+@Internal
+public enum TransactionAbortStrategyImpl {
+    /**
+     * The probing strategy starts with aborting a set of known transactional ids from the recovered
+     * state and then continues guessing if more transactions may have been opened between this run
+     * and the last successful checkpoint. This also accounts for rescaling in between the attempts.
+     *
+     * <p>However, the probing is not side-effect free, which leads to an ever-increasing search
+     * space for the next probing attempt in case of a restart loop. The search space resets on the
+     * next successful checkpoint. It's recommended to use this strategy only with a strict restart
+     * policy that prevents tight restart loops (e.g. incremental backoff or hard failure after X
+     * attempts).
+     *
+     * <p>This is exactly the same behavior as in flink-connector-kafka 3.X.
+     */
+    PROBING {
+        @Override
+        public void abortTransactions(Context context) {
+            for (String prefix : context.getPrefixesToAbort()) {
+                abortTransactionsWithPrefix(prefix, context);
+            }
+        }
+
+        /**
+         * Aborts all transactions that have been created by this subtask in a previous run.
+         *
+         * <p>It also aborts transactions from subtasks that may have been removed because of
+         * downscaling.
+         *
+         * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for
+         * cleaning all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2,
+         * then subtask 0 is responsible for all even and subtask 1 for all odd subtasks.
+         */
+        private void abortTransactionsWithPrefix(String prefix, Context context) {
+            for (int subtaskId = context.getCurrentSubtaskId();
+                    ;
+                    subtaskId += context.getCurrentParallelism()) {
+                if (abortTransactionOfSubtask(prefix, subtaskId, context) == 0) {
+                    // If Flink didn't abort any transaction for current subtask, then we assume
+                    // that no such subtask existed and no subtask with a higher number as well.
+                    break;
+                }
+            }
+        }
+
+        /**
+         * Aborts all transactions that have been created by a subtask in a previous run after the
+         * given checkpoint id.
+         *
+         * <p>We assume that transaction ids are consecutively used and thus Flink can stop aborting
+         * as soon as Flink notices that a particular transaction id was unused.
+         */
+        private int abortTransactionOfSubtask(String prefix, int subtaskId, Context context) {
+            int numTransactionAborted = 0;
+            for (long checkpointId = context.getStartCheckpointId();
+                    ;
+                    checkpointId++, numTransactionAborted++) {
+                // initTransactions fences all old transactions with the same id by bumping the
+                // epoch
+                String transactionalId =
+                        TransactionalIdFactory.buildTransactionalId(
+                                prefix, subtaskId, checkpointId);
+                int epoch = context.getTransactionAborter().abortTransaction(transactionalId);
+                // An epoch of 0 indicates that the id was unused before
+                if (epoch == 0) {
+                    // Note that the check works beyond transaction log timeouts and just depends
+                    // on the retention of the transaction topic (typically 7d). Any transaction
+                    // that is not in that topic anymore is also not lingering (i.e., it will not
+                    // block downstream from reading).
+                    // This method will only cease to work if transaction log timeout = topic
+                    // retention and a user didn't restart the application for that period of
+                    // time. Then the first transactions would vanish from the topic while later
+                    // transactions are still lingering until they are cleaned up by Kafka. Then
+                    // the user has to wait until the other transactions are timed out (which
+                    // shouldn't take too long).
+                    break;
+                }
+            }
+            return numTransactionAborted;
+        }
+    };
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionAbortStrategyImpl.class);
+
+    /** Aborts all transactions that have been created by this subtask in a previous run. */
+    public abstract void abortTransactions(Context context);
+
+    /** Injects the actual abort of a transactional id generated by one of the strategies. */
+    public interface TransactionAborter {
+        int abortTransaction(String transactionalId);
+    }
+
+    /** Context for the {@link TransactionAbortStrategyImpl}. */
+    public interface Context {
+        int getCurrentSubtaskId();
+
+        int getCurrentParallelism();
+
+        Set<String> getPrefixesToAbort();
+
+        long getStartCheckpointId();
+
+        TransactionAborter getTransactionAborter();
+    }
+}
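
The PROBING Javadoc above describes which old subtasks each current subtask probes after
downscaling (subtask i covers all old subtasks j with j % Y == i). A small illustrative sketch of
that iteration order, with made-up parallelism values; in the actual strategy the old parallelism
is unknown, so probing of a subtask stops once no transaction was aborted for it:

    // After downscaling from oldParallelism=6 to newParallelism=2:
    // subtask 0 probes old subtasks 0, 2, 4 and subtask 1 probes 1, 3, 5.
    int oldParallelism = 6;
    int newParallelism = 2;
    for (int current = 0; current < newParallelism; current++) {
        for (int old = current; old < oldParallelism; old += newParallelism) {
            System.out.printf("subtask %d probes old subtask %d%n", current, old);
        }
    }
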
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java
new file mode 100644
index 00000000..2ac26c87
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Implementation of {@link TransactionNamingStrategyImpl.Context}. */
+@Internal
+public class TransactionNamingStrategyContextImpl implements TransactionNamingStrategyImpl.Context {
+    private final String transactionalIdPrefix;
+    private final int subtaskId;
+    private final ProducerPool producerPool;
+    private long lastCheckpointId;
+    private long nextCheckpointId;
+
+    /** Creates a new {@link TransactionNamingStrategyContextImpl}. */
+    public TransactionNamingStrategyContextImpl(
+            String transactionalIdPrefix,
+            int subtaskId,
+            long lastCheckpointId,
+            ProducerPool producerPool) {
+        this.transactionalIdPrefix =
+                checkNotNull(transactionalIdPrefix, "transactionalIdPrefix must not be null");
+        this.subtaskId = subtaskId;
+        this.producerPool = checkNotNull(producerPool, "producerPool must not be null");
+        this.lastCheckpointId = lastCheckpointId;
+    }
+
+    @Override
+    public String buildTransactionalId(long offset) {
+        return TransactionalIdFactory.buildTransactionalId(
+                transactionalIdPrefix, subtaskId, offset);
+    }
+
+    @Override
+    public long getNextCheckpointId() {
+        return nextCheckpointId;
+    }
+
+    public void setNextCheckpointId(long nextCheckpointId) {
+        this.nextCheckpointId = nextCheckpointId;
+    }
+
+    public void setLastCheckpointId(long lastCheckpointId) {
+        this.lastCheckpointId = lastCheckpointId;
+    }
+
+    @Override
+    public long getLastCheckpointId() {
+        return lastCheckpointId;
+    }
+
+    @Override
+    public FlinkKafkaInternalProducer<byte[], byte[]> getProducer(String transactionalId) {
+        return producerPool.getTransactionalProducer(transactionalId, nextCheckpointId);
+    }
+
+    @Override
+    public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+        producerPool.recycle(producer);
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
new file mode 100644
index 00000000..f1be95a0
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Implementation of {@link TransactionNamingStrategy}. */
+@Internal
+public enum TransactionNamingStrategyImpl {
+    INCREMENTING {
+        /**
+         * For each checkpoint we create new {@link 
FlinkKafkaInternalProducer} so that new
+         * transactions will not clash with transactions created during 
previous checkpoints ({@code
+         * producer.initTransactions()} assures that we obtain new producerId 
and epoch counters).
+         *
+         * <p>Ensures that all transaction ids in between lastCheckpointId and 
checkpointId are
+         * initialized.
+         */
+        @Override
+        public FlinkKafkaInternalProducer<byte[], byte[]> 
getTransactionalProducer(
+                Context context) {
+            long expectedCheckpointId = context.getNextCheckpointId();
+            long lastCheckpointId = context.getLastCheckpointId();
+            checkState(
+                    expectedCheckpointId > lastCheckpointId,
+                    "Expected %s > %s",
+                    expectedCheckpointId,
+                    lastCheckpointId);
+            // in case checkpoints have been aborted, Flink would create non-consecutive transaction
+            // ids; this loop ensures that all gaps are filled with initialized (empty) transactions
+            for (long checkpointId = lastCheckpointId + 1;
+                    checkpointId < expectedCheckpointId;
+                    checkpointId++) {
+                context.recycle(context.getProducer(context.buildTransactionalId(checkpointId)));
+            }
+            return context.getProducer(context.buildTransactionalId(expectedCheckpointId));
+        }
+    };
+
+    /**
+     * Returns a {@link FlinkKafkaInternalProducer} that will not clash with any ongoing
+     * transactions.
+     */
+    public abstract FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
+            Context context);
+
+    /** Context for the transaction naming strategy. */
+    public interface Context {
+        String buildTransactionalId(long offset);
+
+        long getNextCheckpointId();
+
+        long getLastCheckpointId();
+
+        FlinkKafkaInternalProducer<byte[], byte[]> getProducer(String transactionalId);
+
+        void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer);
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java
index 97a0b2c8..5311c804 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/WritableBackchannel.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.kafka.sink.internal;
 
+import org.apache.flink.annotation.Internal;
+
 /**
  * The writable portion of a {@link Backchannel} for communication between the committer -> writer.
  * It's used to signal that certain transactions have been committed and respective producers are
@@ -26,6 +28,7 @@ package org.apache.flink.connector.kafka.sink.internal;
  * <p>Messages can be sent before the backchannel is established. They will be consumed once the
  * backchannel is established.
  */
+@Internal
 public interface WritableBackchannel<T> extends Backchannel {
     /** Send a message to the backchannel. */
     void send(T message);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
index 9e26cd4e..b2591f5d 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java
@@ -89,7 +89,7 @@ public class KafkaSinkBuilderTest extends TestLogger {
     }
 
     @Test
-    void testTransactionIdSanityCheck() {
+    void testTransactionalIdSanityCheck() {
         assertThatThrownBy(
                         () ->
                                 getBasicBuilder()
@@ -97,7 +97,7 @@ public class KafkaSinkBuilderTest extends TestLogger {
                                         .build())
                 .isExactlyInstanceOf(IllegalStateException.class)
                 .hasMessageContaining(
-                        "EXACTLY_ONCE delivery guarantee requires a 
transactionIdPrefix to be set to provide unique transaction names across 
multiple KafkaSinks writing to the same Kafka cluster.");
+                        "EXACTLY_ONCE delivery guarantee requires a 
transactionalIdPrefix to be set to provide unique transaction names across 
multiple KafkaSinks writing to the same Kafka cluster.");
     }
 
     private void validateProducerConfig(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
index 84c1e429..8eca6592 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkTest.java
@@ -43,7 +43,11 @@ public class KafkaSinkTest {
                 new KafkaRecordSerializationSchemaWithoutKafkaDatasetProvider();
         KafkaSink<Object> sink =
                 new KafkaSink<>(
-                        DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        new Properties(),
+                        "",
+                        recordSerializer,
+                        TransactionNamingStrategy.DEFAULT);
 
         assertThat(sink.getLineageVertex().datasets()).isEmpty();
     }
@@ -55,7 +59,11 @@ public class KafkaSinkTest {
 
         KafkaSink<Object> sink =
                 new KafkaSink<>(
-                        DeliveryGuarantee.EXACTLY_ONCE, new Properties(), "", recordSerializer);
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        new Properties(),
+                        "",
+                        recordSerializer,
+                        TransactionNamingStrategy.DEFAULT);
 
         assertThat(sink.getLineageVertex().datasets()).isEmpty();
     }
@@ -67,7 +75,11 @@ public class KafkaSinkTest {
 
         KafkaSink<Object> sink =
                 new KafkaSink<>(
-                        DeliveryGuarantee.EXACTLY_ONCE, kafkaProperties, "", recordSerializer);
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        kafkaProperties,
+                        "",
+                        recordSerializer,
+                        TransactionNamingStrategy.DEFAULT);
 
         LineageVertex lineageVertex = sink.getLineageVertex();
 
@@ -99,11 +111,12 @@ public class KafkaSinkTest {
     public void testCoLocation() {
         String colocationKey = "testCoLocation";
         KafkaSink<Object> sink =
-                new KafkaSink<>(
-                        DeliveryGuarantee.EXACTLY_ONCE,
-                        kafkaProperties,
-                        colocationKey,
-                        new TestingKafkaRecordSerializationSchema());
+                KafkaSink.builder()
+                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                        .setKafkaProducerConfig(kafkaProperties)
+                        .setTransactionalIdPrefix(colocationKey)
+                        .setRecordSerializer(new TestingKafkaRecordSerializationSchema())
+                        .build();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -122,11 +135,12 @@ public class KafkaSinkTest {
         String colocationKey = "testPreserveCustomCoLocation";
         String customColocationKey = "customCoLocation";
         KafkaSink<Object> sink =
-                new KafkaSink<>(
-                        DeliveryGuarantee.EXACTLY_ONCE,
-                        kafkaProperties,
-                        colocationKey,
-                        new TestingKafkaRecordSerializationSchema());
+                KafkaSink.builder()
+                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                        .setKafkaProducerConfig(kafkaProperties)
+                        .setTransactionalIdPrefix(colocationKey)
+                        .setRecordSerializer(new TestingKafkaRecordSerializationSchema())
+                        .build();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
