This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 2d4f402f303a80b75f057e4cc932aae495a5e259
Author: Arvid Heise <ar...@apache.org>
AuthorDate: Sat Feb 8 18:42:31 2025 +0100

    [FLINK-34554] Add pooling transaction naming strategy
    
    Reduce the load on the Kafka broker by reusing transaction ids as soon
    as they have been committed.
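    
    For illustration only (not part of this change set): a minimal sketch of
    how a user might opt into the new strategy through the sink builder
    touched in this diff; topic, bootstrap servers, and the id prefix below
    are placeholders.
    
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.base.DeliveryGuarantee;
        import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
        import org.apache.flink.connector.kafka.sink.KafkaSink;
        import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
    
        // Exactly-once sink that recycles transactional ids once they are committed.
        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("output-topic")
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        .setTransactionalIdPrefix("pooling-example")
                        .setTransactionNamingStrategy(TransactionNamingStrategy.POOLING)
                        .build();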
---
 .../flink/tests/util/kafka/KafkaSinkE2ECase.java   |  25 +-
 .../kafka/sink/ExactlyOnceKafkaWriter.java         |   4 +
 .../flink/connector/kafka/sink/KafkaCommitter.java |  51 +++-
 .../flink/connector/kafka/sink/KafkaSink.java      |   1 +
 .../connector/kafka/sink/KafkaSinkBuilder.java     |   7 +
 .../kafka/sink/TransactionNamingStrategy.java      |  18 +-
 .../TransactionNamingStrategyContextImpl.java      |  13 +
 .../internal/TransactionNamingStrategyImpl.java    |  36 ++-
 .../sink/internal/TransactionalIdFactory.java      |   4 +-
 .../kafka/sink/ExactlyOnceKafkaWriterITCase.java   |  92 ++++++
 .../connector/kafka/sink/KafkaCommitterTest.java   |  15 +-
 .../connector/kafka/sink/KafkaSinkITCase.java      | 330 +++++++++++++++++----
 .../connector/kafka/sink/KafkaWriterTestBase.java  |  33 ++-
 .../sink/internal/ProducerPoolImplITCase.java      |   2 +-
 .../internal/TransactionAbortStrategyImplTest.java | 235 +++++++++++++++
 .../sink/internal/TransactionOwnershipTest.java    | 165 +++++++++++
 .../sink/testutils/KafkaSinkExternalContext.java   |  12 +-
 .../testutils/KafkaSinkExternalContextFactory.java |  11 +-
 18 files changed, 950 insertions(+), 104 deletions(-)

diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
index e18c035b..ea9a0079 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
@@ -62,7 +63,7 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
     // Defines 2 External context Factories, so test cases will be invoked twice using these two
     // kinds of external contexts.
     @TestContext
-    KafkaSinkExternalContextFactory contextFactory =
+    KafkaSinkExternalContextFactory incrementing =
             new KafkaSinkExternalContextFactory(
                     kafka.getContainer(),
                     Arrays.asList(
@@ -77,7 +78,27 @@ public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
                             ResourceTestUtils.getResource("flink-connector-testing.jar")
                                     .toAbsolutePath()
                                     .toUri()
-                                    .toURL()));
+                                    .toURL()),
+                    TransactionNamingStrategy.INCREMENTING);
+
+    @TestContext
+    KafkaSinkExternalContextFactory pooling =
+            new KafkaSinkExternalContextFactory(
+                    kafka.getContainer(),
+                    Arrays.asList(
+                            ResourceTestUtils.getResource("kafka-connector.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            ResourceTestUtils.getResource("kafka-clients.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL(),
+                            ResourceTestUtils.getResource("flink-connector-testing.jar")
+                                    .toAbsolutePath()
+                                    .toUri()
+                                    .toURL()),
+                    TransactionNamingStrategy.POOLING);
 
     public KafkaSinkE2ECase() throws Exception {}
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
index e75436d8..c7aee3c4 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java
@@ -200,6 +200,10 @@ class ExactlyOnceKafkaWriter<IN> extends KafkaWriter<IN> {
 
     private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
         namingContext.setNextCheckpointId(checkpointId);
+        namingContext.setOngoingTransactions(
+                producerPool.getOngoingTransactions().stream()
+                        .map(CheckpointTransaction::getTransactionalId)
+                        .collect(Collectors.toSet()));
         FlinkKafkaInternalProducer<byte[], byte[]> producer =
                 transactionNamingStrategy.getTransactionalProducer(namingContext);
         namingContext.setLastCheckpointId(checkpointId);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index 70d67150..91e724e2 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -56,6 +56,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                     + "To avoid data loss, the application will restart.";
 
     private final Properties kafkaProducerConfig;
+    private final boolean reusesTransactionalIds;
     private final BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory;
     private final WritableBackchannel<TransactionFinished> backchannel;
     @Nullable private FlinkKafkaInternalProducer<?, ?> committingProducer;
@@ -65,8 +66,10 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
             String transactionalIdPrefix,
             int subtaskId,
             int attemptNumber,
+            boolean reusesTransactionalIds,
             BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory) {
         this.kafkaProducerConfig = kafkaProducerConfig;
+        this.reusesTransactionalIds = reusesTransactionalIds;
         this.producerFactory = producerFactory;
         backchannel =
                 BackchannelFactory.getInstance()
@@ -102,19 +105,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
                         "Encountered retriable exception while committing {}.", transactionalId, e);
                 request.retryLater();
             } catch (ProducerFencedException e) {
-                // initTransaction has been called on this transaction before
-                LOG.error(
-                        "Unable to commit transaction ({}) because its producer is already fenced."
-                                + " This means that you either have a different producer with the same '{}' (this is"
-                                + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
-                                + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
-                                + " please consult the Flink documentation for more details.",
-                        request,
-                        ProducerConfig.TRANSACTIONAL_ID_CONFIG,
-                        KafkaSink.class.getSimpleName(),
-                        ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
-                        kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
-                        e);
+                logFencedRequest(request, e);
                 handleFailedTransaction(producer);
                 request.signalFailedWithKnownReason(e);
             } catch (InvalidTxnStateException e) {
@@ -146,6 +137,40 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
         }
     }
 
+    private void logFencedRequest(
+            CommitRequest<KafkaCommittable> request, ProducerFencedException e) {
+        if (reusesTransactionalIds) {
+            // If checkpoint 1 succeeds, checkpoint 2 is aborted, and checkpoint 3 may reuse the id
+            // of checkpoint 1. A recovery of checkpoint 1 would show that the transaction has been
+            // fenced.
+            LOG.warn(
+                    "Unable to commit transaction ({}) because its producer is already fenced."
+                            + " If this warning appears as part of the recovery of a checkpoint, it is expected in some cases (e.g., aborted checkpoints in a previous attempt)."
+                            + " If it happens outside of recovery, this means that you either have a different sink with the same '{}'"
+                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+                            + " please consult the Flink documentation for more details.",
+                    request,
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+                    kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
+                    e);
+        } else {
+            // initTransaction has been called on this transaction before
+            LOG.error(
+                    "Unable to commit transaction ({}) because its producer is already fenced."
+                            + " This means that you either have a different producer with the same '{}' (this is"
+                            + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
+                            + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+                            + " please consult the Flink documentation for more details.",
+                    request,
+                    ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                    KafkaSink.class.getSimpleName(),
+                    ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+                    kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
+                    e);
+        }
+    }
+
     private void handleFailedTransaction(FlinkKafkaInternalProducer<?, ?> producer) {
         if (producer == null) {
             return;
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 30002021..77ddc87d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -115,6 +115,7 @@ public class KafkaSink<IN>
                 transactionalIdPrefix,
                 context.getTaskInfo().getIndexOfThisSubtask(),
                 context.getTaskInfo().getAttemptNumber(),
+                transactionNamingStrategy == TransactionNamingStrategy.POOLING,
                 FlinkKafkaInternalProducer::new);
     }
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index 80d67b9d..f4190421 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -200,6 +201,12 @@ public class KafkaSinkBuilder<IN> {
             checkState(
                     transactionalIdPrefix != null,
                     "EXACTLY_ONCE delivery guarantee requires a 
transactionalIdPrefix to be set to provide unique transaction names across 
multiple KafkaSinks writing to the same Kafka cluster.");
+            if (transactionNamingStrategy.getImpl().requiresKnownTopics()) {
+                checkState(
+                        recordSerializer instanceof KafkaDatasetFacetProvider,
+                        "For %s naming strategy, the recordSerializer needs to 
expose the target topics though implementing KafkaDatasetFacetProvider.",
+                        transactionNamingStrategy);
+            }
         }
     }
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java
index 6c09bc92..6ebfb1fe 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionNamingStrategy.java
@@ -39,8 +39,24 @@ public enum TransactionNamingStrategy {
      * on the broker.
      *
      * <p>This is exactly the same behavior as in flink-connector-kafka 3.X.
+     *
+     * <p>Switching to this strategy from {@link #POOLING} is not supported.
+     */
+    INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING),
+
+    /**
+     * This strategy reuses transaction names. It is more resource-friendly than {@link
+     * #INCREMENTING} on the Kafka broker.
+     *
+     * <p>It's a new strategy introduced in flink-connector-kafka 4.X. It requires Kafka 3.0+ and
+     * additional read permissions on the target topics.
+     *
+     * <p>The recommended way to migrate to this strategy is to first take a checkpoint with
+     * flink-connector-kafka 4.X and then switch the strategy. This ensures that no transactions
+     * are left open from the previous run. Alternatively, you can use a savepoint from any
+     * version.
      */
-    INCREMENTING(TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING);
+    POOLING(TransactionNamingStrategyImpl.POOLING, TransactionAbortStrategyImpl.LISTING);
 
     /**
      * The default transaction naming strategy. Currently set to {@link #INCREMENTING}, which is the
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java
index 2ac26c87..29103327 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyContextImpl.java
@@ -20,6 +20,9 @@ package org.apache.flink.connector.kafka.sink.internal;
 
 import org.apache.flink.annotation.Internal;
 
+import java.util.Collection;
+import java.util.Set;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Implementation of {@link TransactionNamingStrategyImpl.Context}. */
@@ -27,6 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class TransactionNamingStrategyContextImpl implements TransactionNamingStrategyImpl.Context {
     private final String transactionalIdPrefix;
     private final int subtaskId;
+    private Set<String> ongoingTransactions;
     private final ProducerPool producerPool;
     private long lastCheckpointId;
     private long nextCheckpointId;
@@ -63,6 +67,15 @@ public class TransactionNamingStrategyContextImpl implements TransactionNamingSt
         this.lastCheckpointId = lastCheckpointId;
     }
 
+    @Override
+    public Set<String> getOngoingTransactions() {
+        return ongoingTransactions;
+    }
+
+    public void setOngoingTransactions(Collection<String> ongoingTransactions) {
+        this.ongoingTransactions = Set.copyOf(ongoingTransactions);
+    }
+
     @Override
     public long getLastCheckpointId() {
         return lastCheckpointId;
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
index 278ee9d9..a85f3074 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java
@@ -21,12 +21,14 @@ package org.apache.flink.connector.kafka.sink.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 
+import java.util.Set;
+
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Implementation of {@link TransactionNamingStrategy}. */
 @Internal
 public enum TransactionNamingStrategyImpl {
-    INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID) {
+    INCREMENTING(TransactionOwnership.IMPLICIT_BY_SUBTASK_ID, false) {
         /**
          * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new
          * transactions will not clash with transactions created during previous checkpoints ({@code
@@ -55,12 +57,36 @@ public enum TransactionNamingStrategyImpl {
             }
             return context.getProducer(context.buildTransactionalId(expectedCheckpointId));
         }
+    },
+    POOLING(TransactionOwnership.EXPLICIT_BY_WRITER_STATE, true) {
+        @Override
+        public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
+                Context context) {
+            Set<String> usedTransactionalIds = context.getOngoingTransactions();
+            for (int offset = 0; ; offset++) {
+                String transactionalIdCandidate = context.buildTransactionalId(offset);
+                if (usedTransactionalIds.contains(transactionalIdCandidate)) {
+                    continue;
+                }
+                return context.getProducer(transactionalIdCandidate);
+            }
+        }
     };
 
     private final TransactionOwnership ownership;
+    private final boolean requiresKnownTopics;
 
-    TransactionNamingStrategyImpl(TransactionOwnership ownership) {
+    TransactionNamingStrategyImpl(TransactionOwnership ownership, boolean requiresKnownTopics) {
         this.ownership = ownership;
+        this.requiresKnownTopics = requiresKnownTopics;
+    }
+
+    public boolean requiresKnownTopics() {
+        return requiresKnownTopics;
+    }
+
+    public TransactionOwnership getOwnership() {
+        return ownership;
     }
 
     /**
@@ -70,16 +96,14 @@ public enum TransactionNamingStrategyImpl {
     public abstract FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             Context context);
 
-    public TransactionOwnership getOwnership() {
-        return ownership;
-    }
-
     /** Context for the transaction naming strategy. */
     public interface Context {
         String buildTransactionalId(long offset);
 
         long getNextCheckpointId();
 
+        Set<String> getOngoingTransactions();
+
         long getLastCheckpointId();
 
         FlinkKafkaInternalProducer<byte[], byte[]> getProducer(String transactionalId);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java
index 3208483f..bd9b6cce 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionalIdFactory.java
@@ -44,11 +44,11 @@ public class TransactionalIdFactory {
                 + checkpointOffset;
     }
 
-    public static long extractSubtaskId(String name) {
+    public static int extractSubtaskId(String name) {
         int lastSep = name.lastIndexOf("-");
         int secondLastSep = name.lastIndexOf("-", lastSep - 1);
         String subtaskString = name.substring(secondLastSep + 1, lastSep);
-        return Long.parseLong(subtaskString);
+        return Integer.parseInt(subtaskString);
     }
 
     public static String extractPrefix(String name) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
index 83c5b560..66706f96 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriterITCase.java
@@ -20,11 +20,13 @@ package org.apache.flink.connector.kafka.sink;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
+import org.apache.flink.connector.kafka.util.AdminUtils;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
@@ -32,6 +34,7 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLoggerExtension;
 
 import com.google.common.collect.Iterables;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -42,11 +45,14 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.function.Consumer;
 
+import static org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory.buildTransactionalId;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
@@ -240,6 +246,38 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
         }
     }
 
+    /** Test that the writer does not abort transactions that are passed in as writer state. */
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3})
+    void shouldNotAbortPrecommittedTransactions(int numCheckpointed) throws Exception {
+        try (final KafkaWriter<Integer> failedWriter =
+                createWriter(DeliveryGuarantee.EXACTLY_ONCE)) {
+
+            // create three precommitted transactions
+            List<KafkaWriterState> states =
+                    Arrays.asList(
+                            onCheckpointBarrier(failedWriter, 1).f0, // 1 transaction
+                            onCheckpointBarrier(failedWriter, 2).f0, // 2 transactions
+                            onCheckpointBarrier(failedWriter, 3).f0); // 3 transactions
+
+            // assume a varying number of states that have been checkpointed
+            try (final ExactlyOnceKafkaWriter<Integer> recoveredWriter =
+                    restoreWriter(
+                            this::withPooling,
+                            List.of(states.get(numCheckpointed - 1)),
+                            createInitContext())) {
+                // test abort of recoveredWriter; this should abort all transactions that have
+                // not been part of the checkpoint
+                try (AdminClient admin = AdminClient.create(getKafkaClientConfiguration())) {
+                    assertThat(
+                                    AdminUtils.getOpenTransactionsForTopics(
+                                            admin, Collections.singleton(topic)))
+                            .hasSize(numCheckpointed);
+                }
+            }
+        }
+    }
+
     /** Test that producers are reused when committed. */
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
@@ -339,11 +377,65 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
         }
     }
 
+    /** Test that transactional ids still owned by the committer are skipped when pooling. */
+    @Test
+    void shouldSkipIdsOfCommitterForPooledTransactions() throws Exception {
+        String prefix = getTransactionalPrefix();
+        CheckpointTransaction t1 = new CheckpointTransaction(buildTransactionalId(prefix, 0, 2), 2);
+        CheckpointTransaction t2 = new CheckpointTransaction(buildTransactionalId(prefix, 0, 4), 4);
+        final KafkaWriterState writerState =
+                new KafkaWriterState(
+                        prefix,
+                        0,
+                        1,
+                        TransactionOwnership.EXPLICIT_BY_WRITER_STATE,
+                        Arrays.asList(t1, t2));
+
+        SinkInitContext initContext = createInitContext();
+        int checkpointId = 9;
+        initContext.setRestoredCheckpointId(checkpointId);
+        try (final ExactlyOnceKafkaWriter<Integer> writer =
+                        restoreWriter(
+                                this::withPooling,
+                                Collections.singletonList(writerState),
+                                initContext);
+                WritableBackchannel<TransactionFinished> backchannel = getBackchannel(writer)) {
+            // offsets leave out the used 2 and 4
+            for (int expectedOffset : new int[] {0, 1, 3, 5, 6}) {
+                assertThat(writer.getCurrentProducer().getTransactionalId())
+                        .isEqualTo(buildTransactionalId(prefix, 0, expectedOffset));
+                writer.write(checkpointId, SINK_WRITER_CONTEXT);
+                writer.flush(false);
+                writer.prepareCommit();
+                writer.snapshotState(++checkpointId);
+            }
+
+            // free 4, which also frees 2; 2 is returned first and then 4
+            backchannel.send(new TransactionFinished(t2.getTransactionalId(), true));
+            writer.write(checkpointId, SINK_WRITER_CONTEXT);
+            writer.prepareCommit();
+            writer.snapshotState(++checkpointId);
+            assertThat(writer.getCurrentProducer().getTransactionalId())
+                    .isEqualTo(t1.getTransactionalId());
+
+            writer.write(checkpointId, SINK_WRITER_CONTEXT);
+            writer.prepareCommit();
+            writer.snapshotState(++checkpointId);
+            assertThat(writer.getCurrentProducer().getTransactionalId())
+                    .isEqualTo(t2.getTransactionalId());
+        }
+    }
+
     private static Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers(
             ExactlyOnceKafkaWriter<Integer> writer) {
         return ((ProducerPoolImpl) writer.getProducerPool()).getProducers();
     }
 
+    private KafkaSinkBuilder<?> withPooling(KafkaSinkBuilder<?> builder) {
+        return builder.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                .setTransactionNamingStrategy(TransactionNamingStrategy.POOLING);
+    }
+
     private Tuple2<KafkaWriterState, KafkaCommittable> onCheckpointBarrier(
             KafkaWriter<Integer> failedWriter, int checkpointId)
             throws IOException, InterruptedException {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 7ddbdda6..c87bfd50 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -66,7 +66,8 @@ class KafkaCommitterTest {
     public void testRetryCommittableOnRetriableError() throws IOException, InterruptedException {
         Properties properties = getProperties();
         try (final KafkaCommitter committer =
-                        new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY);
+                        new KafkaCommitter(
+                                properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY);
                 FlinkKafkaInternalProducer<Object, Object> producer =
                         new FlinkKafkaInternalProducer<>(properties, TRANS_ID);
                 ReadableBackchannel<TransactionFinished> backchannel =
@@ -87,7 +88,8 @@ class KafkaCommitterTest {
     public void testFailJobOnUnknownFatalError() throws IOException, InterruptedException {
         Properties properties = getProperties();
         try (final KafkaCommitter committer =
-                        new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY);
+                        new KafkaCommitter(
+                                properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY);
                 FlinkKafkaInternalProducer<Object, Object> producer =
                         new FlinkKafkaInternalProducer<>(properties, TRANS_ID);
                 ReadableBackchannel<TransactionFinished> backchannel =
@@ -111,7 +113,8 @@ class KafkaCommitterTest {
     public void testFailJobOnKnownFatalError() throws IOException, InterruptedException {
         Properties properties = getProperties();
         try (final KafkaCommitter committer =
-                        new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY);
+                        new KafkaCommitter(
+                                properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY);
                 FlinkKafkaInternalProducer<?, ?> producer =
                         new MockProducer(properties, new ProducerFencedException("test"));
                 ReadableBackchannel<TransactionFinished> backchannel =
@@ -135,7 +138,8 @@ class KafkaCommitterTest {
                     return new MockProducer(props, new ProducerFencedException("test"));
                 };
         try (final KafkaCommitter committer =
-                        new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, failingFactory);
+                        new KafkaCommitter(
+                                properties, TRANS_ID, SUB_ID, ATTEMPT, false, failingFactory);
                 ReadableBackchannel<TransactionFinished> backchannel =
                         BackchannelFactory.getInstance()
                                 .getReadableBackchannel(SUB_ID, ATTEMPT, TRANS_ID)) {
@@ -164,7 +168,8 @@ class KafkaCommitterTest {
         Properties properties = getProperties();
         try (FlinkKafkaInternalProducer<?, ?> producer = new MockProducer(properties, null);
                 final KafkaCommitter committer =
-                        new KafkaCommitter(properties, TRANS_ID, SUB_ID, ATTEMPT, MOCK_FACTORY);
+                        new KafkaCommitter(
+                                properties, TRANS_ID, SUB_ID, ATTEMPT, false, MOCK_FACTORY);
                 ReadableBackchannel<TransactionFinished> backchannel =
                         BackchannelFactory.getInstance()
                                 .getReadableBackchannel(SUB_ID, ATTEMPT, TRANS_ID)) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index b2c182e1..c91b9bbb 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -27,6 +28,8 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -35,6 +38,7 @@ import org.apache.flink.configuration.ExternalizedCheckpointRetention;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
@@ -47,7 +51,10 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -56,9 +63,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.test.junit5.InjectMiniCluster;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -72,6 +77,7 @@ import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.assertj.core.api.InstanceOfAssertFactories;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -81,6 +87,9 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,6 +104,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -103,12 +113,14 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.checkProducerLeak;
@@ -196,8 +208,18 @@ public class KafkaSinkITCase extends TestLogger {
                 };
 
         @TestContext
-        KafkaSinkExternalContextFactory sinkContext =
-                new KafkaSinkExternalContextFactory(kafka.getContainer(), Collections.emptyList());
+        KafkaSinkExternalContextFactory incrementing =
+                new KafkaSinkExternalContextFactory(
+                        kafka.getContainer(),
+                        Collections.emptyList(),
+                        TransactionNamingStrategy.INCREMENTING);
+
+        @TestContext
+        KafkaSinkExternalContextFactory pooling =
+                new KafkaSinkExternalContextFactory(
+                        kafka.getContainer(),
+                        Collections.emptyList(),
+                        TransactionNamingStrategy.POOLING);
     }
 
     @Test
@@ -210,10 +232,19 @@ public class KafkaSinkITCase extends TestLogger {
         writeRecordsToKafka(DeliveryGuarantee.NONE);
     }
 
-    @ParameterizedTest(name = "chained={0}")
-    @ValueSource(booleans = {true, false})
-    public void testWriteRecordsToKafkaWithExactlyOnceGuarantee(boolean chained) throws Exception {
-        writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, chained);
+    @ParameterizedTest(name = "{0}, chained={1}")
+    @MethodSource("getEOSParameters")
+    public void testWriteRecordsToKafkaWithExactlyOnceGuarantee(
+            TransactionNamingStrategy namingStrategy, boolean chained) throws Exception {
+        writeRecordsToKafka(DeliveryGuarantee.EXACTLY_ONCE, namingStrategy, chained);
+    }
+
+    static Stream<Arguments> getEOSParameters() {
+        return Arrays.stream(TransactionNamingStrategy.values())
+                .flatMap(
+                        strategy ->
+                                Stream.of(true, false)
+                                        .map(chained -> Arguments.of(strategy, chained)));
     }
 
     @Test
@@ -221,22 +252,24 @@ public class KafkaSinkITCase extends TestLogger {
         testRecoveryWithAssertion(DeliveryGuarantee.AT_LEAST_ONCE, 1);
     }
 
-    @ParameterizedTest(name = "chained={0}")
-    @ValueSource(booleans = {true, false})
-    public void testRecoveryWithExactlyOnceGuarantee(boolean chained) throws Exception {
-        testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 1, chained);
+    @ParameterizedTest(name = "{0}, chained={1}")
+    @MethodSource("getEOSParameters")
+    public void testRecoveryWithExactlyOnceGuarantee(
+            TransactionNamingStrategy namingStrategy, boolean chained) throws Exception {
+        testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 1, namingStrategy, chained);
     }
 
-    @ParameterizedTest(name = "chained={0}")
-    @ValueSource(booleans = {true, false})
-    public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints(boolean chained)
-            throws Exception {
-        testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 2, chained);
+    @ParameterizedTest(name = "{0}, chained={1}")
+    @MethodSource("getEOSParameters")
+    public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints(
+            TransactionNamingStrategy namingStrategy, boolean chained) throws Exception {
+        testRecoveryWithAssertion(DeliveryGuarantee.EXACTLY_ONCE, 2, namingStrategy, chained);
     }
 
-    @ParameterizedTest(name = "chained={0}")
-    @ValueSource(booleans = {true, false})
+    @ParameterizedTest(name = "{0}, chained={1}")
+    @MethodSource("getEOSParameters")
     public void testAbortTransactionsOfPendingCheckpointsAfterFailure(
+            TransactionNamingStrategy namingStrategy,
             boolean chained,
             @TempDir File checkpointDir,
             @InjectMiniCluster MiniCluster miniCluster,
@@ -260,6 +293,7 @@ public class KafkaSinkITCase extends TestLogger {
                             new FailAsyncCheckpointMapper(1),
                             checkpointedRecords,
                             config,
+                            namingStrategy,
                             chained,
                             "firstPrefix",
                             clusterClient);
@@ -279,6 +313,7 @@ public class KafkaSinkITCase extends TestLogger {
                 new FailingCheckpointMapper(failed),
                 checkpointedRecords,
                 config,
+                namingStrategy,
                 chained,
                 "newPrefix",
                 clusterClient);
@@ -287,10 +322,13 @@ public class KafkaSinkITCase extends TestLogger {
         assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get());
     }
 
-    @ParameterizedTest(name = "chained={0}")
-    @ValueSource(booleans = {true, false})
+    @ParameterizedTest(name = "{0}, chained={1}")
+    @MethodSource("getEOSParameters")
     public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint(
-            boolean chained, @InjectClusterClient ClusterClient<?> clusterClient) throws Exception {
+            TransactionNamingStrategy namingStrategy,
+            boolean chained,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
         // Run a first job opening 5 transactions one per subtask and fail in async checkpoint phase
         try {
             SharedReference<Set<Long>> checkpointedRecords =
@@ -300,6 +338,7 @@ public class KafkaSinkITCase extends TestLogger {
                     new FailAsyncCheckpointMapper(0),
                     checkpointedRecords,
                     config,
+                    namingStrategy,
                     chained,
                     null,
                     clusterClient);
@@ -318,6 +357,7 @@ public class KafkaSinkITCase extends TestLogger {
                 new FailingCheckpointMapper(failed),
                 checkpointedRecords,
                 config,
+                namingStrategy,
                 chained,
                 null,
                 clusterClient);
@@ -326,6 +366,158 @@ public class KafkaSinkITCase extends TestLogger {
         assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get());
     }
 
+    @ParameterizedTest(name = "{0}->{1}")
+    @CsvSource({"1,2", "2,3", "2,5", "3,5", "5,6", "6,5", "5,2", "5,3", "3,2", 
"2,1"})
+    public void rescaleListing(
+            int oldParallelism,
+            int newParallelsm,
+            @TempDir File checkpointDir,
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        // Run a first job failing during the async phase of a checkpoint to leave some
+        // lingering transactions
+        final Configuration config = createConfiguration(oldParallelism);
+        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2);
+        SharedReference<Set<Long>> checkpointedRecords =
+                sharedObjects.add(new ConcurrentSkipListSet<>());
+
+        JobID firstJobId =
+                executeWithMapper(
+                        new FailAsyncCheckpointMapper(1),
+                        checkpointedRecords,
+                        config,
+                        TransactionNamingStrategy.POOLING,
+                        false,
+                        "firstPrefix",
+                        clusterClient);
+
+        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, newParallelism);
+
+        // Run a second job which aborts all lingering transactions and a new consumer should
+        // immediately see the newly written records
+        JobID secondJobId =
+                executeWithMapper(
+                        new FailAsyncCheckpointMapper(1),
+                        checkpointedRecords,
+                        config,
+                        TransactionNamingStrategy.POOLING,
+                        false,
+                        "secondPrefix",
+                        clusterClient);
+
+        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, secondJobId));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, oldParallelism);
+
+        SharedReference<AtomicBoolean> failed = sharedObjects.add(new AtomicBoolean(true));
+        executeWithMapper(
+                new FailingCheckpointMapper(failed),
+                checkpointedRecords,
+                config,
+                TransactionNamingStrategy.POOLING,
+                false,
+                "thirdPrefix",
+                clusterClient);
+
+        final List<Long> committedRecords =
+                deserializeValues(drainAllRecordsFromTopic(topic, true));
+        assertThat(committedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get());
+    }
+
+    private String getCheckpointPath(MiniCluster miniCluster, JobID jobId)
+            throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
+        final Optional<String> completedCheckpoint =
+                CommonTestUtils.getLatestCompletedCheckpointPath(jobId, miniCluster);
+
+        assertThat(completedCheckpoint).isPresent();
+        return completedCheckpoint.get();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void checkMigration(
+            boolean supportedMigration,
+            @TempDir File checkpointDir,
+            @InjectMiniCluster MiniCluster miniCluster,
+            @InjectClusterClient ClusterClient<?> clusterClient)
+            throws Exception {
+        // Run a first job failing during the async phase of a checkpoint to leave some
+        // lingering transactions
+        final Configuration config = createConfiguration(5);
+        config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 2);
+        SharedReference<Set<Long>> checkpointedRecords =
+                sharedObjects.add(new ConcurrentSkipListSet<>());
+
+        JobID firstJobId =
+                executeWithMapper(
+                        new FailAsyncCheckpointMapper(1),
+                        checkpointedRecords,
+                        config,
+                        TransactionNamingStrategy.INCREMENTING,
+                        true,
+                        "firstPrefix",
+                        clusterClient);
+
+        // Run a second job which switches to POOLING
+        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, firstJobId));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, 5);
+        JobID secondJobId2 =
+                executeWithMapper(
+                        new FailAsyncCheckpointMapper(1),
+                        checkpointedRecords,
+                        config,
+                        TransactionNamingStrategy.POOLING,
+                        true,
+                        "secondPrefix",
+                        clusterClient);
+
+        // Run a third job with downscaling
+        config.set(SAVEPOINT_PATH, getCheckpointPath(miniCluster, secondJobId2));
+        config.set(CoreOptions.DEFAULT_PARALLELISM, 3);
+        JobID thirdJobId =
+                executeWithMapper(
+                        v -> v,
+                        checkpointedRecords,
+                        config,
+                        supportedMigration
+                                ? TransactionNamingStrategy.POOLING
+                                : TransactionNamingStrategy.INCREMENTING,
+                        true,
+                        "thirdPrefix",
+                        clusterClient);
+
+        JobResult jobResult = clusterClient.requestJobResult(thirdJobId).get();
+        assertThat(jobResult.getApplicationStatus())
+                .isEqualTo(
+                        supportedMigration
+                                ? ApplicationStatus.SUCCEEDED
+                                : ApplicationStatus.FAILED);
+
+        if (supportedMigration) {
+            final List<Long> committedRecords =
+                    deserializeValues(drainAllRecordsFromTopic(topic, true));
+            assertThat(committedRecords)
+                    .containsExactlyInAnyOrderElementsOf(checkpointedRecords.get());
+        } else {
+            assertThat(jobResult.getSerializedThrowable())
+                    .get()
+                    .asInstanceOf(InstanceOfAssertFactories.THROWABLE)
+                    .rootCause()
+                    .hasMessageContaining("Attempted to switch back to INCREMENTING");
+        }
+    }
+
     private static Configuration createConfiguration(int parallelism) {
         final Configuration config = new Configuration();
         config.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
@@ -336,9 +528,10 @@ public class KafkaSinkITCase extends TestLogger {
             MapFunction<Long, Long> mapper,
             SharedReference<Set<Long>> checkpointedRecords,
             Configuration config,
+            TransactionNamingStrategy namingStrategy,
             boolean chained,
             @Nullable String transactionalIdPrefix,
-            @InjectClusterClient ClusterClient<?> clusterClient)
+            ClusterClient<?> clusterClient)
             throws Exception {
 
         config.set(RestartStrategyOptions.RESTART_STRATEGY, "disable");
@@ -347,7 +540,7 @@ public class KafkaSinkITCase extends TestLogger {
         if (!chained) {
             env.disableOperatorChaining();
         }
-        final DataStreamSource<Long> source = env.fromSequence(1, 10);
+        final DataStream<Long> source = createThrottlingSource(env);
         final DataStream<Long> stream =
                 source.map(mapper).map(new RecordFetcher(checkpointedRecords)).uid("fetcher");
         final KafkaSinkBuilder<Long> builder =
@@ -358,7 +551,8 @@ public class KafkaSinkITCase extends TestLogger {
                                 KafkaRecordSerializationSchema.builder()
                                         .setTopic(topic)
                                         .setValueSerializationSchema(new RecordSerializer())
-                                        .build());
+                                        .build())
+                        .setTransactionNamingStrategy(namingStrategy);
         if (transactionalIdPrefix == null) {
             transactionalIdPrefix = "kafka-sink";
         }
@@ -372,11 +566,15 @@ public class KafkaSinkITCase extends TestLogger {
 
     private void testRecoveryWithAssertion(
             DeliveryGuarantee guarantee, int maxConcurrentCheckpoints) throws Exception {
-        testRecoveryWithAssertion(guarantee, maxConcurrentCheckpoints, true);
+        testRecoveryWithAssertion(
+                guarantee, maxConcurrentCheckpoints, TransactionNamingStrategy.DEFAULT, true);
     }
 
     private void testRecoveryWithAssertion(
-            DeliveryGuarantee guarantee, int maxConcurrentCheckpoints, boolean chained)
+            DeliveryGuarantee guarantee,
+            int maxConcurrentCheckpoints,
+            TransactionNamingStrategy namingStrategy,
+            boolean chained)
             throws Exception {
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(createConfiguration(1));
@@ -385,7 +583,7 @@ public class KafkaSinkITCase extends TestLogger {
         }
         env.enableCheckpointing(300L);
         env.getCheckpointConfig().setMaxConcurrentCheckpoints(maxConcurrentCheckpoints);
-        DataStreamSource<Long> source = env.fromSequence(1, 10);
+        final DataStream<Long> source = createThrottlingSource(env);
         SharedReference<Set<Long>> checkpointedRecords =
                 sharedObjects.add(new ConcurrentSkipListSet<>());
         DataStream<Long> stream =
@@ -402,6 +600,7 @@ public class KafkaSinkITCase extends TestLogger {
                                         .setValueSerializationSchema(new RecordSerializer())
                                         .build())
                         .setTransactionalIdPrefix("kafka-sink")
+                        .setTransactionNamingStrategy(namingStrategy)
                         .build());
         env.execute();
 
@@ -419,10 +618,13 @@ public class KafkaSinkITCase extends TestLogger {
     }
 
     private void writeRecordsToKafka(DeliveryGuarantee deliveryGuarantee) throws Exception {
-        writeRecordsToKafka(deliveryGuarantee, true);
+        writeRecordsToKafka(deliveryGuarantee, TransactionNamingStrategy.DEFAULT, true);
     }
 
-    private void writeRecordsToKafka(DeliveryGuarantee deliveryGuarantee, boolean chained)
+    private void writeRecordsToKafka(
+            DeliveryGuarantee deliveryGuarantee,
+            TransactionNamingStrategy namingStrategy,
+            boolean chained)
             throws Exception {
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(createConfiguration(1));
@@ -430,7 +632,7 @@ public class KafkaSinkITCase extends TestLogger {
             env.disableOperatorChaining();
         }
         env.enableCheckpointing(100L);
-        final DataStream<Long> source = env.addSource(new InfiniteIntegerSource());
+        final DataStream<Long> source = createThrottlingSource(env);
         SharedReference<Set<Long>> checkpointedRecords =
                 sharedObjects.add(new ConcurrentSkipListSet<>());
         source.map(new RecordFetcher(checkpointedRecords))
@@ -444,6 +646,7 @@ public class KafkaSinkITCase extends TestLogger {
                                                 .setValueSerializationSchema(new RecordSerializer())
                                                 .build())
                                 .setTransactionalIdPrefix("kafka-sink")
+                                .setTransactionNamingStrategy(namingStrategy)
                                 .build());
         env.execute();
 
@@ -454,6 +657,17 @@ public class KafkaSinkITCase extends TestLogger {
         assertThat(collectedRecords).containsExactlyInAnyOrderElementsOf(checkpointedRecords.get());
     }
 
+    private DataStream<Long> createThrottlingSource(StreamExecutionEnvironment env) {
+        return env.fromSource(
+                new DataGeneratorSource<>(
+                        value -> value,
+                        1000,
+                        new ThrottleUntilFirstCheckpointStrategy(),
+                        BasicTypeInfo.LONG_TYPE_INFO),
+                WatermarkStrategy.noWatermarks(),
+                "Generator Source");
+    }
+
     private static List<Long> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records) {
         return records.stream()
                 .map(
@@ -582,7 +796,6 @@ public class KafkaSinkITCase extends TestLogger {
 
         @Override
         public Long map(Long value) throws Exception {
-            Thread.sleep(100);
             return value;
         }
 
@@ -683,8 +896,6 @@ public class KafkaSinkITCase extends TestLogger {
                 failed.get().set(true);
                 throw new RuntimeException("Planned exception.");
             }
-            // Delay execution to ensure that at-least one checkpoint is triggered before finish
-            Thread.sleep(50);
             emittedBetweenCheckpoint++;
             return value;
         }
@@ -696,39 +907,30 @@ public class KafkaSinkITCase extends TestLogger {
         }
     }
 
-    /**
-     * Exposes information about how man records have been emitted overall and finishes after
-     * receiving the checkpoint completed event.
-     */
-    private static final class InfiniteIntegerSource
-            implements SourceFunction<Long>, CheckpointListener {
-
-        private volatile boolean running = true;
-        private final AtomicInteger nextRecord = new AtomicInteger();
-
-        InfiniteIntegerSource() {}
+    private static class ThrottleUntilFirstCheckpointStrategy implements RateLimiterStrategy {
+        private final RateLimiterStrategy baseStrategy = RateLimiterStrategy.perCheckpoint(10);
 
         @Override
-        public void run(SourceContext<Long> ctx) throws Exception {
-            Object lock = ctx.getCheckpointLock();
-            while (running) {
-                synchronized (lock) {
-                    ctx.collect((long) nextRecord.getAndIncrement());
-                    Thread.sleep(1);
+        public RateLimiter createRateLimiter(int parallelism) {
+            RateLimiter baseLimiter = baseStrategy.createRateLimiter(parallelism);
+
+            return new RateLimiter() {
+                int numCheckpointed;
+
+                @Override
+                public CompletionStage<Void> acquire() {
+                    if (numCheckpointed >= 2) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return baseLimiter.acquire();
                 }
-            }
-            LOG.info("last emitted record {}", nextRecord.get() - 1);
-        }
-
-        @Override
-        public void cancel() {
-            running = false;
-        }
 
-        @Override
-        public void notifyCheckpointComplete(long checkpointId) {
-            running = false;
-            LOG.info("notifyCheckpointCompleted {}", checkpointId);
+                @Override
+                public void notifyCheckpointComplete(long checkpointId) {
+                    baseLimiter.notifyCheckpointComplete(checkpointId);
+                    numCheckpointed++;
+                }
+            };
         }
     }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
index 11de7167..1046ee3c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
@@ -23,6 +23,10 @@ import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
@@ -138,6 +142,10 @@ public abstract class KafkaWriterTestBase {
         return (T) createSink(sinkBuilderAdjuster).restoreWriter(initContext, recoveredState);
     }
 
+    public String getTransactionalPrefix() {
+        return TEST_PREFIX + writerIndex;
+    }
+
     KafkaSink<Integer> createSink(Consumer<KafkaSinkBuilder<?>> sinkBuilderAdjuster) {
         KafkaSinkBuilder<Integer> builder =
                 KafkaSink.<Integer>builder()
@@ -180,6 +188,7 @@ public abstract class KafkaWriterTestBase {
         protected final SinkWriterMetricGroup metricGroup;
         protected final ProcessingTimeService timeService;
         @Nullable protected final Consumer<RecordMetadata> metadataConsumer;
+        private Long checkpointId;
 
         SinkInitContext(
                 SinkWriterMetricGroup metricGroup,
@@ -205,11 +214,6 @@ public abstract class KafkaWriterTestBase {
             return metricGroup;
         }
 
-        @Override
-        public OptionalLong getRestoredCheckpointId() {
-            return OptionalLong.empty();
-        }
-
         @Override
         public SerializationSchema.InitializationContext
                 asSerializationSchemaInitializationContext() {
@@ -220,11 +224,20 @@ public abstract class KafkaWriterTestBase {
         public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
             return Optional.ofNullable((Consumer<MetaT>) metadataConsumer);
         }
+
+        @Override
+        public OptionalLong getRestoredCheckpointId() {
+            return checkpointId == null ? OptionalLong.empty() : OptionalLong.of(checkpointId);
+        }
+
+        public void setRestoredCheckpointId(long checkpointId) {
+            this.checkpointId = checkpointId;
+        }
     }
 
     /** mock recordSerializer for KafkaSink. */
     protected static class DummyRecordSerializer
-            implements KafkaRecordSerializationSchema<Integer> {
+            implements KafkaRecordSerializationSchema<Integer>, KafkaDatasetFacetProvider {
         @Override
         public ProducerRecord<byte[], byte[]> serialize(
                 Integer element, KafkaSinkContext context, Long timestamp) {
@@ -235,6 +248,14 @@ public abstract class KafkaWriterTestBase {
             byte[] bytes = ByteBuffer.allocate(4).putInt(element).array();
             return new ProducerRecord<>(topic, bytes, bytes);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofTopics(
+                                    Collections.singletonList(topic))));
+        }
     }
 
     /**
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
index ddc0592f..d25aece8 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
@@ -83,7 +83,7 @@ class ProducerPoolImplITCase {
         }
     }
 
-    /** Tests direct recycling as used during abortion of transactions. */
+    /** Tests direct recycling as used during abort of transactions. */
     @Test
     void testRecycleProducer() throws Exception {
         try (ProducerPoolImpl producerPool =
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImplTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImplTest.java
new file mode 100644
index 00000000..7fea2e00
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImplTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
+import static org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl.LISTING;
+import static org.apache.flink.connector.kafka.sink.internal.TransactionalIdFactory.extractSubtaskId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class TransactionAbortStrategyImplTest {
+
+    private static final String PREFIX = "prefix";
+
+    @Nested
+    class Listing {
+        @Test
+        void testDownscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 2);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 1, 4);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            testContext.addPrecommittedTransactionalIds(1, 1L);
+            testContext.addOpenTransaction(2, 1L); // not owned
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            String t12 = testContext.addOpenTransaction(1, 2L);
+            testContext.addOpenTransaction(2, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t02, t12);
+        }
+
+        @Test
+        void testDownscaleWithUnsafeTransactionalIds() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(1, 2);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(2, 3, 4);
+
+            String t11 = testContext.addOpenTransaction(1, 1L); // not owned
+            testContext.addPrecommittedTransactionalIds(2, 1L);
+            testContext.addPrecommittedTransactionalIds(3, 1L);
+            String t12 = testContext.addOpenTransaction(1, 2L); // not owned
+            String t22 = testContext.addOpenTransaction(2, 2L);
+            String t32 = testContext.addOpenTransaction(3, 2L);
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t22, t32);
+        }
+
+        @Test
+        void testDownscaleWithIntermediateUpscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 2);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 1, 4);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            testContext.addPrecommittedTransactionalIds(1, 1L);
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            String t12 = testContext.addOpenTransaction(1, 2L);
+            String t52 = testContext.addOpenTransaction(5, 2L);
+            testContext.addOpenTransaction(6, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions())
+                    .containsExactlyInAnyOrder(t02, t12, t52);
+        }
+
+        @Test
+        void testUpscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 4);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 2);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            testContext.addOpenTransaction(1, 1L); // not owned
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            testContext.addOpenTransaction(1, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t02);
+        }
+
+        @Test
+        void testUpscaleWithIntermediateUpscale() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(0, 4);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(0, 2);
+
+            testContext.addPrecommittedTransactionalIds(0, 1L);
+            String t02 = testContext.addOpenTransaction(0, 2L);
+            String t42 = testContext.addOpenTransaction(4, 2L);
+            testContext.addOpenTransaction(5, 2L); // not owned
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t02, t42);
+        }
+
+        @Test
+        void testUpscaleWithNoState() {
+            TestContext testContext = new TestContext();
+            testContext.setSubtaskIdAndParallelism(3, 4);
+            testContext.setOwnedSubtaskIdsAndMaxParallelism(3, 4);
+
+            String t31 = testContext.addOpenTransaction(3, 1L);
+            String t32 = testContext.addOpenTransaction(3, 2L);
+
+            LISTING.abortTransactions(testContext);
+            assertThat(testContext.getAbortedTransactions()).containsExactlyInAnyOrder(t31, t32);
+        }
+    }
+
+    static final class TestContext implements TransactionAbortStrategyImpl.Context {
+        private final Set<Integer> ownedSubtaskIds = new HashSet<>();
+        private final Set<String> precommittedTransactionalIds = new HashSet<>();
+        private final Collection<String> abortedTransactions = new ArrayList<>();
+        private final Collection<String> openTransactionalIds = new ArrayList<>();
+        private int currentSubtaskId;
+        private int currentParallelism;
+        private int maxParallelism;
+        private String prefix = PREFIX;
+        private final Set<String> prefixesToAbort = new HashSet<>(Set.of(prefix));
+        private long startCheckpointId = INITIAL_CHECKPOINT_ID;
+
+        public void setPrefix(String prefix) {
+            this.prefix = prefix;
+            this.prefixesToAbort.add(prefix);
+        }
+
+        public Collection<String> getAbortedTransactions() {
+            return abortedTransactions;
+        }
+
+        public String addPrecommittedTransactionalIds(int oldSubtaskId, long checkpointId) {
+            String transactionalId = addOpenTransaction(oldSubtaskId, checkpointId);
+            precommittedTransactionalIds.add(transactionalId);
+            return transactionalId;
+        }
+
+        public String addOpenTransaction(int oldSubtaskId, long checkpointId) {
+            String transactionalId = getTransactionalId(oldSubtaskId, checkpointId);
+            openTransactionalIds.add(transactionalId);
+            return transactionalId;
+        }
+
+        @Override
+        public int getCurrentSubtaskId() {
+            return this.currentSubtaskId;
+        }
+
+        @Override
+        public int getCurrentParallelism() {
+            return this.currentParallelism;
+        }
+
+        @Override
+        public boolean ownsTransactionalId(String transactionalId) {
+            return ownedSubtaskIds.contains(extractSubtaskId(transactionalId) % maxParallelism);
+        }
+
+        @Override
+        public Set<String> getPrefixesToAbort() {
+            return this.prefixesToAbort;
+        }
+
+        @Override
+        public Set<String> getPrecommittedTransactionalIds() {
+            return this.precommittedTransactionalIds;
+        }
+
+        @Override
+        public long getStartCheckpointId() {
+            return this.startCheckpointId;
+        }
+
+        public void setStartCheckpointId(long startCheckpointId) {
+            this.startCheckpointId = startCheckpointId;
+        }
+
+        @Override
+        public TransactionAbortStrategyImpl.TransactionAborter getTransactionAborter() {
+            return transactionalId -> {
+                abortedTransactions.add(transactionalId);
+                return 0;
+            };
+        }
+
+        @Override
+        public Collection<String> getOpenTransactionsForTopics() {
+            return this.openTransactionalIds;
+        }
+
+        public void setSubtaskIdAndParallelism(int newSubtaskId, int newParallelism) {
+            this.currentSubtaskId = newSubtaskId;
+            this.currentParallelism = newParallelism;
+        }
+
+        public void setOwnedSubtaskIdsAndMaxParallelism(int... subtaskIdsOrParallelism) {
+            for (int index = 0; index < subtaskIdsOrParallelism.length; index++) {
+                if (index < subtaskIdsOrParallelism.length - 1) {
+                    this.ownedSubtaskIds.add(subtaskIdsOrParallelism[index]);
+                } else {
+                    this.maxParallelism = subtaskIdsOrParallelism[index];
+                }
+            }
+        }
+
+        private String getTransactionalId(int oldSubtaskId, long checkpointId) {
+            return TransactionalIdFactory.buildTransactionalId(prefix, oldSubtaskId, checkpointId);
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnershipTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnershipTest.java
new file mode 100644
index 00000000..c63d5159
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnershipTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.connector.kafka.sink.KafkaWriterState;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+class TransactionOwnershipTest {
+    private static final String PREFIX = "prefix";
+
+    @Nested
+    class Implicit {
+        TransactionOwnership ownership = TransactionOwnership.IMPLICIT_BY_SUBTASK_ID;
+
+        @Test
+        void testNoRecoveredStates() {
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of())).containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of())).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredStateWithUnknownValues() {
+            KafkaWriterState state =
+                    new KafkaWriterState(PREFIX, UNKNOWN, UNKNOWN, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of(state)))
+                    .containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredState() {
+            KafkaWriterState state = new KafkaWriterState(PREFIX, 1, 2, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of(state)))
+                    .containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredStateFromPoolingWithDownscaling() {
+            KafkaWriterState state =
+                    new KafkaWriterState(
+                            PREFIX, 1, 2, TransactionOwnership.EXPLICIT_BY_WRITER_STATE, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(2, 3, List.of(state)))
+                    .hasMessageContaining(
+                            "Attempted to switch the transaction naming strategy back to INCREMENTING");
+        }
+
+        @Test
+        void testTwoRecoveredStates() {
+            KafkaWriterState state1 = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+            KafkaWriterState state2 = new KafkaWriterState(PREFIX, 3, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 3, List.of(state1, state2)))
+                    .containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 3, List.of(state1, state2)))
+                    .isEqualTo(3);
+        }
+
+        @Test
+        void testTwoRecoveredStatesFromPooling() {
+            KafkaWriterState state1 =
+                    new KafkaWriterState(
+                            PREFIX, 1, 4, TransactionOwnership.EXPLICIT_BY_WRITER_STATE, List.of());
+            KafkaWriterState state2 =
+                    new KafkaWriterState(
+                            PREFIX, 3, 4, TransactionOwnership.EXPLICIT_BY_WRITER_STATE, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(2, 3, List.of(state1, state2)))
+                    .hasMessageContaining(
+                            "Attempted to switch the transaction naming strategy back to INCREMENTING");
+        }
+    }
+
+    @Nested
+    class Explicit {
+        TransactionOwnership ownership = TransactionOwnership.EXPLICIT_BY_WRITER_STATE;
+
+        @Test
+        void testNoRecoveredStates() {
+            assertThat(ownership.getOwnedSubtaskIds(2, 4, List.of())).containsExactlyInAnyOrder(2);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of())).isEqualTo(4);
+        }
+
+        @Test
+        void testOneRecoveredStateWithUnknownValues() {
+            KafkaWriterState state =
+                    new KafkaWriterState(PREFIX, UNKNOWN, UNKNOWN, ownership, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(2, 4, List.of(state)))
+                    .hasMessageContaining("migrate");
+            assertThatCode(() -> ownership.getTotalNumberOfOwnedSubtasks(2, 4, List.of(state)))
+                    .hasMessageContaining("migrate");
+        }
+
+        @Test
+        void testOneRecoveredState() {
+            KafkaWriterState state = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 3, List.of(state)))
+                    .containsExactlyInAnyOrder(1);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 3, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testNonConsecutive() {
+            KafkaWriterState state = new KafkaWriterState(PREFIX, 1, 2, ownership, List.of());
+
+            assertThatCode(() -> ownership.getOwnedSubtaskIds(3, 4, List.of(state)))
+                    .hasMessageContaining("State not consecutively assigned");
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(3, 4, List.of(state))).isEqualTo(4);
+        }
+
+        @Test
+        void testNonUniform() {
+            KafkaWriterState state1 = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+            KafkaWriterState state2 = new KafkaWriterState(PREFIX, 3, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(3, 4, List.of(state1, state2)))
+                    .containsExactlyInAnyOrder(1, 3);
+            assertThatCode(
+                            () ->
+                                    ownership.getTotalNumberOfOwnedSubtasks(
+                                            3, 4, List.of(state1, state2)))
+                    .hasMessageContaining("Not uniformly assigned");
+        }
+
+        @Test
+        void testTwoRecoveredStates() {
+            KafkaWriterState state1 = new KafkaWriterState(PREFIX, 1, 4, ownership, List.of());
+            KafkaWriterState state2 = new KafkaWriterState(PREFIX, 3, 4, ownership, List.of());
+
+            assertThat(ownership.getOwnedSubtaskIds(2, 3, List.of(state1, state2)))
+                    .containsExactlyInAnyOrder(1, 3);
+            assertThat(ownership.getTotalNumberOfOwnedSubtasks(2, 3, List.of(state1, state2)))
+                    .isEqualTo(4);
+        }
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
index ae260489..392e5aa5 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
 import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
 import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
@@ -76,15 +77,21 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
 
     private final List<ExternalSystemDataReader<String>> readers = new ArrayList<>();
 
+    private final TransactionNamingStrategy transactionNamingStrategy;
+
     protected int numSplits = 0;
 
     private List<URL> connectorJarPaths;
 
     protected final AdminClient kafkaAdminClient;
 
-    public KafkaSinkExternalContext(String bootstrapServers, List<URL> connectorJarPaths) {
+    public KafkaSinkExternalContext(
+            String bootstrapServers,
+            List<URL> connectorJarPaths,
+            TransactionNamingStrategy transactionNamingStrategy) {
         this.bootstrapServers = bootstrapServers;
         this.connectorJarPaths = connectorJarPaths;
+        this.transactionNamingStrategy = transactionNamingStrategy;
         this.topicName =
                 TOPIC_NAME_PREFIX + "-" + 
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
         kafkaAdminClient = createAdminClient();
@@ -136,6 +143,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
                 .setDeliveryGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode()))
                 .setTransactionalIdPrefix("testingFramework")
                 .setKafkaProducerConfig(properties)
+                .setTransactionNamingStrategy(transactionNamingStrategy)
                 .setRecordSerializer(
                         KafkaRecordSerializationSchema.builder()
                                 .setTopic(topicName)
@@ -235,7 +243,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
 
     @Override
     public String toString() {
-        return "Single-topic Kafka";
+        return transactionNamingStrategy.toString();
     }
 
     @Override
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
index b7958548..bdebed03 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.sink.testutils;
 
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.testframe.external.ExternalContextFactory;
 
 import org.testcontainers.containers.KafkaContainer;
@@ -32,10 +33,15 @@ public class KafkaSinkExternalContextFactory
 
     private final KafkaContainer kafkaContainer;
     private final List<URL> connectorJars;
+    private final TransactionNamingStrategy transactionNamingStrategy;
 
-    public KafkaSinkExternalContextFactory(KafkaContainer kafkaContainer, List<URL> connectorJars) {
+    public KafkaSinkExternalContextFactory(
+            KafkaContainer kafkaContainer,
+            List<URL> connectorJars,
+            TransactionNamingStrategy transactionNamingStrategy) {
         this.kafkaContainer = kafkaContainer;
         this.connectorJars = connectorJars;
+        this.transactionNamingStrategy = transactionNamingStrategy;
     }
 
     private String getBootstrapServer() {
@@ -48,6 +54,7 @@ public class KafkaSinkExternalContextFactory
 
     @Override
     public KafkaSinkExternalContext createExternalContext(String testName) {
-        return new KafkaSinkExternalContext(getBootstrapServer(), connectorJars);
+        return new KafkaSinkExternalContext(
+                getBootstrapServer(), connectorJars, transactionNamingStrategy);
     }
 }

