fapaul commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r697362882



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
##########
@@ -33,11 +35,16 @@
  */
 public interface ManagedInitializationContext {
 
+    /** Returns true, if state was restored from the snapshot of a previous 
execution. */
+    default boolean isRestored() {
+        return getRestoredCheckpointId() != null;

Review comment:
       Shouldn't this be `getRestoredCheckpointId().isPresent()`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
##########
@@ -31,9 +31,17 @@
 public interface StreamOperatorStateContext {
 
     /**
-     * Returns true, the states provided by this context are restored from a 
checkpoint/savepoint.
+     * Returns true if the states provided by this context are restored from a 
checkpoint/savepoint.
      */
-    boolean isRestored();
+    default boolean isRestored() {

Review comment:
       Can we pull out a common interface for these two methods and use for 
this class and `ManagedInitializationContext`

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java
##########
@@ -53,26 +55,43 @@
      * <p>This method is intended for advanced sinks that propagate watermarks.
      *
      * @param watermark The watermark.
+     *
      * @throws IOException if fail to add a watermark.
      */
-    default void writeWatermark(Watermark watermark) throws IOException, 
InterruptedException {}
+    default void writeWatermark(Watermark watermark) throws IOException, 
InterruptedException {
+    }
 
     /**
      * Prepare for a commit.
      *
      * <p>This will be called before we checkpoint the Writer's state in 
Streaming execution mode.
      *
      * @param flush Whether flushing the un-staged data or not
+     *
      * @return The data is ready to commit.
+     *
      * @throws IOException if fail to prepare for a commit.
      */
     List<CommT> prepareCommit(boolean flush) throws IOException, 
InterruptedException;
 
     /**
      * @return The writer's state.
+     *
+     * @throws IOException if fail to snapshot writer's state.
+     * @deprecated implement {@link #snapshotState(long)}
+     */
+    default List<WriterStateT> snapshotState() throws IOException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * @return The writer's state.
+     *
      * @throws IOException if fail to snapshot writer's state.
      */
-    List<WriterStateT> snapshotState() throws IOException;
+    default List<WriterStateT> snapshotState(long checkpointId) throws 
IOException {

Review comment:
       Should we implement a test to ensure the `checkpointId` is correctly 
passed to the writer?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
##########
@@ -91,12 +106,14 @@ public void snapshotState(StateSnapshotContext context) 
throws Exception {
     @Override
     public void endInput() throws Exception {
         emitCommittables(committerHandler.endOfInput());
+        retryer.retryIndefinitely();
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
         
emitCommittables(committerHandler.notifyCheckpointCompleted(checkpointId));
+        retryer.retryWithDelay();

Review comment:
       Not sure I like this approach, this means on every checkpoint we enqueue 
a trigger in the mailbox. In the case of a very small checkpoint interval and 
long commit durations, this overhead is significant.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetryer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.IOException;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all 
committables are eventually
+ * committed.
+ */
+public class CommitRetryer<InputT, OutputT> {
+    private final ProcessingTimeService processingTimeService;
+    private final CommitterHandler<InputT, OutputT> committerHandler;
+    public static final int RETRY_DELAY = 1000;
+
+    public CommitRetryer(
+            ProcessingTimeService processingTimeService,
+            CommitterHandler<InputT, OutputT> committerHandler) {
+        this.processingTimeService = processingTimeService;
+        this.committerHandler = committerHandler;
+    }
+
+    public void retryWithDelay() {
+        if (committerHandler.needsRetry()) {
+            processingTimeService.registerTimer(
+                    RETRY_DELAY,
+                    ts -> {
+                        if (retry(1)) {
+                            retryWithDelay();
+                        }
+                    });
+        }
+    }
+
+    public void retryIndefinitely() throws IOException, InterruptedException {
+        retry(Long.MAX_VALUE);
+    }
+
+    private boolean retry(long tries) throws IOException, InterruptedException 
{
+        for (long i = 0; i < tries; i++) {
+            if (!committerHandler.needsRetry()) {
+                return false;
+            }
+            committerHandler.retry();
+        }
+        return true;

Review comment:
       Nit: return `!committerHandler.needsRetry()`

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java
##########
@@ -60,27 +53,21 @@ public GlobalStreamingCommitterHandler(
         super(committableSerializer);
         this.globalCommitter = checkNotNull(globalCommitter);
 
-        this.recoveredGlobalCommittables = new ArrayList<>();
         this.endOfInput = false;
     }
 
     @Override
-    void recoveredCommittables(List<GlobalCommT> committables) throws 
IOException {
-        final List<GlobalCommT> recovered =
-                
globalCommitter.filterRecoveredCommittables(checkNotNull(committables));
-        recoveredGlobalCommittables.addAll(recovered);
+    protected void recoveredCommittables(List<GlobalCommT> committables) 
throws IOException {
+        super.recoveredCommittables(
+                
globalCommitter.filterRecoveredCommittables(checkNotNull(committables)));
     }
 
     @Override
     List<GlobalCommT> prepareCommit(List<CommT> input) throws IOException {
-        checkNotNull(input);

Review comment:
       Nit: this check was removed

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -174,6 +174,9 @@ public void write(IN element, Context context) throws 
IOException {
 
     @Override
     public void close() throws Exception {
+        if (currentProducer.isInTransaction()) {

Review comment:
       Can we test this behavior?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -37,39 +36,41 @@
  */
 class KafkaCommitter implements Committer<KafkaCommittable> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaCommitter.class);
+
     private final Properties kafkaProducerConfig;
 
     KafkaCommitter(Properties kafkaProducerConfig) {
         this.kafkaProducerConfig = kafkaProducerConfig;
     }
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaCommitter.class);
-
     @Override
     public List<KafkaCommittable> commit(List<KafkaCommittable> committables) 
throws IOException {
-        committables.forEach(this::commitTransaction);
-        return Collections.emptyList();
+        List<KafkaCommittable> retryableCommittables = new ArrayList<>();
+        for (KafkaCommittable committable : committables) {
+            final String transactionalId = committable.getTransactionalId();
+            LOG.debug("Committing Kafka transaction {}", transactionalId);
+            try (FlinkKafkaInternalProducer<?, ?> producer =
+                    committable.getProducer().orElseGet(() -> 
createProducer(committable))) {
+                producer.commitTransaction();
+            } catch (ProducerFencedException e) {

Review comment:
       Do we also want to retry the `InvalidTxnStateException`? If the 
transaction is already in a different state we can probably also abandon it.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -164,13 +163,12 @@ public void write(IN element, Context context) throws 
IOException {
     @Override
     public List<KafkaCommittable> prepareCommit(boolean flush) {
         flushRecords(flush);
-        List<KafkaCommittable> committables = precommit();
-        currentProducer = createProducer();
-        return committables;
+        return precommit();
     }
 
     @Override
-    public List<KafkaWriterState> snapshotState() throws IOException {
+    public List<KafkaWriterState> snapshotState(long checkpointId) throws 
IOException {
+        currentProducer = createProducer(checkpointId);

Review comment:
       It is unfortunate that we have to create the new producer in 
snapshotState. Although it is probably not possible that `SinkWriter#write` is 
invoked between `snapshotState` and `preCommit` but it leaves room for silent 
errors which are hard to detect in the future.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> 
recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new 
ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {

Review comment:
       Shouldn't the condition be inverted. If the transactional id prefix has 
changed we need to abort something.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> 
recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new 
ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} 
has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, 
Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous 
executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new 
ArrayList<>(taskOffsetMapping.values())));
-        if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has 
changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+        final Properties properties = new Properties();
+        properties.putAll(kafkaProducerConfig);
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+        try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                new FlinkKafkaInternalProducer<>(properties)) {
+            for (String prefix : prefixesToAbort) {
+                abortTransactionsWithPrefix(producer, prefix, 
startCheckpointId);
+            }
         }
-        return new KafkaWriterState(
-                transactionalIdPrefix, subtaskId, 
lastState.getTransactionalIdOffset());
     }
 
-    private void abortTransactions(List<String> transactionsToAbort) {
-        transactionsToAbort.forEach(
-                transaction -> {
-                    // don't mess with the original configuration or any other
-                    // properties of the
-                    // original object
-                    // -> create an internal kafka producer on our own and do 
not rely
-                    // on
-                    //    initTransactionalProducer().
-                    final Properties myConfig = new Properties();
-                    myConfig.putAll(kafkaProducerConfig);
-                    myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transaction);
-                    LOG.info("Aborting Kafka transaction {}.", transaction);
-                    FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = 
null;
-                    try {
-                        kafkaProducer = new 
FlinkKafkaInternalProducer<>(myConfig);
-                        // it suffices to call initTransactions - this will 
abort any
-                        // lingering transactions
-                        kafkaProducer.initTransactions();
-                    } finally {
-                        if (kafkaProducer != null) {
-                            kafkaProducer.close(Duration.ofSeconds(0));
-                        }
-                    }
-                });
+    /**
+     * Aborts all transactions that have been created by this subtask in a 
previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed 
because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is 
responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale 
to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    private void abortTransactionsWithPrefix(
+            FlinkKafkaInternalProducer<byte[], byte[]> producer,
+            String prefix,
+            long startCheckpointId) {
+        final int p = kafkaSinkContext.getNumberOfParallelInstances();
+        for (int subtaskId = kafkaSinkContext.getParallelInstanceId(); ; 
subtaskId += p) {
+            if (abortTransactionOfSubtask(producer, prefix, startCheckpointId, 
subtaskId) == 0) {
+                // If Flink didn't abort any transaction for current subtask, 
then we assume that no
+                // such subtask existed and no subtask with a higher number as 
well.
+                break;
+            }
+        }
+    }
+
+    /**
+     * Aborts all transactions that have been created by a subtask in a 
previous run after the given
+     * checkpoint id.
+     *
+     * <p>We assume that transaction ids are consecutively used and thus Flink 
can stop aborting as
+     * soon as Flink notices that a particular transaction id was unused.
+     */
+    private int abortTransactionOfSubtask(

Review comment:
       WDYT about moving the aborting logic to a separate class to make it 
easier to test? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
##########
@@ -121,10 +127,12 @@
     }
 
     @Override
-    public void open() throws Exception {
-        super.open();
-
-        this.currentWatermark = Long.MIN_VALUE;
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<byte[]>> output) {
+        super.setup(containingTask, config, output);
+        retryer = new CommitRetryer<>(processingTimeService, committerHandler);

Review comment:
       Nit: `retryer` -> `commitRetrier` to make the context clear

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -47,6 +48,7 @@
 
     private final Properties kafkaProducerConfig;
     @Nullable private final String transactionalId;
+    private boolean inTransaction;

Review comment:
       Do we require this variable to be thread-safe? I wonder what happens if 
the committer calls `commitTransaction` while the writer checks the transaction 
state.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
##########
@@ -19,18 +19,58 @@
 
 import org.apache.flink.util.function.SupplierWithException;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 
-abstract class AbstractCommitterHandler<InputT, OutputT>
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class AbstractCommitterHandler<InputT, OutputT, RecoverT>
         implements CommitterHandler<InputT, OutputT> {
 
     /** Record all the committables until commit. */
     private final Deque<InputT> committables = new ArrayDeque<>();
 
+    /** The committables that need to be committed again after recovering from 
a failover. */
+    private final List<RecoverT> recoveredCommittables = new ArrayList<>();

Review comment:
       I see the risk that this list might increase constantly and cause an 
uprising OOM error. If a lot of committables fail the periodic commit can be 
too slow.
   I think we either need to guard somehow against and warn the user or in 
addition commit the retried committables also as part of the checkpoint.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetryer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.IOException;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all 
committables are eventually
+ * committed.
+ */
+public class CommitRetryer<InputT, OutputT> {

Review comment:
       `CommitRetrier` ;) 

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -71,6 +70,18 @@ private void commitTransaction(KafkaCommittable committable) 
{
         }
     }
 
+    /**
+     * Creates a producer that con commit into the same transaction as the 
upstream producer that
+     * was serialized into {@link KafkaCommitter}.
+     */
+    private FlinkKafkaInternalProducer<?, ?> createProducer(KafkaCommittable 
committable) {
+        FlinkKafkaInternalProducer<Object, Object> producer =

Review comment:
       Nit: use `?, ?` instead of `Object, Object`

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
##########
@@ -44,16 +45,20 @@ public void throwExceptionWithoutCommitter() throws 
Exception {
         testHarness.initializeEmptyState();
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void doNotSupportRetry() throws Exception {
+    @Test
+    public void supportRetry() throws Exception {

Review comment:
       How do the tests work now that the retry is done by the timerservice?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -36,25 +34,28 @@
 import java.time.Duration;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link KafkaProducer} that exposes private fields to allow resume 
producing from a given state.
  */
-class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {

Review comment:
       Mark as `@Internal` if it needs to be public

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -86,6 +87,11 @@ public boolean isInTransaction() {
         return inTransaction;
     }
 
+    @Override
+    public void close() {
+        super.close(Duration.ZERO);
+    }

Review comment:
       Hmm, I did want to implement this because it might cause problems for 
the `KafkaCommitter` because closing with ZERO does **not** ensure all pending 
messages for the producer are emitted.
   For the committer, this means if the network connection is slow it can never 
commit because we do not call `flush()` explicitly and rather use the close 
mechanism to do so. Before this change, it waited indefinitely.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> 
recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new 
ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} 
has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, 
Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous 
executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new 
ArrayList<>(taskOffsetMapping.values())));
-        if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has 
changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+        final Properties properties = new Properties();
+        properties.putAll(kafkaProducerConfig);
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");

Review comment:
       Is this correct?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -144,6 +145,10 @@
         this.kafkaWriterState =
                 recoverAndInitializeState(checkNotNull(recoveredStates, 
"recoveredStates"));
         this.currentProducer = createProducer();
+        disableMetrics =

Review comment:
       👍 

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> 
recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new 
ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} 
has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, 
Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous 
executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new 
ArrayList<>(taskOffsetMapping.values())));
-        if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has 
changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+        final Properties properties = new Properties();
+        properties.putAll(kafkaProducerConfig);
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+        try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                new FlinkKafkaInternalProducer<>(properties)) {
+            for (String prefix : prefixesToAbort) {
+                abortTransactionsWithPrefix(producer, prefix, 
startCheckpointId);
+            }
         }
-        return new KafkaWriterState(
-                transactionalIdPrefix, subtaskId, 
lastState.getTransactionalIdOffset());
     }
 
-    private void abortTransactions(List<String> transactionsToAbort) {
-        transactionsToAbort.forEach(
-                transaction -> {
-                    // don't mess with the original configuration or any other
-                    // properties of the
-                    // original object
-                    // -> create an internal kafka producer on our own and do 
not rely
-                    // on
-                    //    initTransactionalProducer().
-                    final Properties myConfig = new Properties();
-                    myConfig.putAll(kafkaProducerConfig);
-                    myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transaction);
-                    LOG.info("Aborting Kafka transaction {}.", transaction);
-                    FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = 
null;
-                    try {
-                        kafkaProducer = new 
FlinkKafkaInternalProducer<>(myConfig);
-                        // it suffices to call initTransactions - this will 
abort any
-                        // lingering transactions
-                        kafkaProducer.initTransactions();
-                    } finally {
-                        if (kafkaProducer != null) {
-                            kafkaProducer.close(Duration.ofSeconds(0));
-                        }
-                    }
-                });
+    /**
+     * Aborts all transactions that have been created by this subtask in a 
previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed 
because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is 
responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale 
to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    private void abortTransactionsWithPrefix(

Review comment:
       The word `prefix` seems to be a bit overloaded because we already have 
the `transactionalIdPrefix` 

##########
File path: flink-connectors/flink-connector-kafka/pom.xml
##########
@@ -124,6 +124,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+

Review comment:
       This change probably does not belong here?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetryer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.IOException;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all 
committables are eventually
+ * committed.
+ */
+public class CommitRetryer<InputT, OutputT> {

Review comment:
       The generic seems unnecessary you can pass `CommitterHandler<?, ?>` to 
the constructor.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -17,7 +17,8 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import 
org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord;

Review comment:
       Nit: Do we need a test for a test class? 

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/TransactionTest.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer;
+

Review comment:
       Should this class be part of the final PR?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -327,41 +346,43 @@ private void flushRecords(boolean finalFlush) {
      * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so 
that new transactions
      * will not clash with transactions created during previous checkpoints 
({@code
      * producer.initTransactions()} assures that we obtain new producerId and 
epoch counters).
+     *
+     * <p>Ensures that all transaction ids in between lastCheckpointId and 
checkpointId are
+     * initialized.
      */
-    private FlinkKafkaInternalProducer<byte[], byte[]> 
createTransactionalProducer() {
-        final long transactionalIdOffset = 
kafkaWriterState.getTransactionalIdOffset() + 1;
+    private FlinkKafkaInternalProducer<byte[], byte[]> 
createTransactionalProducer(
+            long checkpointId) {
+        checkState(
+                checkpointId > lastCheckpointId,
+                "Expected %s > %s",
+                checkpointId,
+                lastCheckpointId);
         final Properties copiedProducerConfig = new Properties();
         copiedProducerConfig.putAll(kafkaProducerConfig);
-        initTransactionalProducerConfig(
-                copiedProducerConfig,
-                transactionalIdOffset,
-                transactionalIdPrefix,
-                kafkaSinkContext.getParallelInstanceId());
+        copiedProducerConfig.put(
+                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                TransactionalIdFactory.buildTransactionalId(
+                        transactionalIdPrefix,
+                        kafkaSinkContext.getParallelInstanceId(),
+                        lastCheckpointId + 1));
         final FlinkKafkaInternalProducer<byte[], byte[]> producer =
                 new FlinkKafkaInternalProducer<>(copiedProducerConfig);
         producer.initTransactions();
-        kafkaWriterState =
-                new KafkaWriterState(
-                        transactionalIdPrefix,
-                        kafkaSinkContext.getParallelInstanceId(),
-                        transactionalIdOffset);
-        LOG.info(
-                "Created new transactional producer {}",
-                
copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+        // in case checkpoints have been aborted, Flink would create 
non-consecutive transaction ids
+        // this loop ensures that all gaps are filled with initialized (empty) 
transactions
+        for (long id = lastCheckpointId + 2; id <= checkpointId; id++) {
+            producer.setTransactionalId(
+                    TransactionalIdFactory.buildTransactionalId(
+                            transactionalIdPrefix,
+                            kafkaSinkContext.getParallelInstanceId(),
+                            lastCheckpointId + 1));
+            producer.initTransactions();
+        }

Review comment:
       I do not understand why we have to do this every time we create a new 
producer. Can you give a concrete example?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to