fapaul commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r699070362



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
##########
@@ -192,7 +196,17 @@
      * snapshots.
      */
     public boolean isRestored() {
-        return restored;
+        return restoredCheckpointId != null;
+    }
+
+    /**
+     * Returns non-empty if this was created for a restored operator, false otherwise. Restored

Review comment:
       Update the comment because it is not optional anymore?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -334,9 +348,9 @@ private void writeRecordsToKafka(
                 drainAllRecordsFromTopic(topic);
         final long recordsCount = expectedRecords.get().get();
         assertEquals(collectedRecords.size(), recordsCount);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, recordsCount + 1).boxed().collect(Collectors.toList()));
+                containsInAnyOrder(LongStream.range(1, recordsCount + 1).boxed().toArray()));

Review comment:
       What about the order?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -226,11 +236,12 @@ public void 
testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), 
config, "newPrefix");
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
-                        .boxed()
-                        .collect(Collectors.toList()));
+                containsInAnyOrder(

Review comment:
       Shouldn't the order be guaranteed?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -254,11 +265,12 @@ public void 
testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exce
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), 
config, null);
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
-                        .boxed()
-                        .collect(Collectors.toList()));
+                containsInAnyOrder(

Review comment:
       Same here: what about the order?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Aborts lingering transactions on restart.
+ *
+ * <p>Transactions are lingering if they are not tracked anywhere. For 
example, if a job is started
+ * transactions are opened. A restart without checkpoint would not allow Flink 
to abort old
+ * transactions. Since Kafka's transactions are sequential, newly produced 
data does not become
+ * visible for read_committed consumers. However, Kafka has no API for 
querying open transactions,
+ * so they become lingering.
+ *
+ * <p>Flink solves this by assuming consecutive transaction ids. On restart of 
checkpoint C on
+ * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until 
it finds the first
+ * unused transaction.
+ *
+ * <p>Additionally, to cover for weird downscaling cases without checkpoints, 
it also checks for
+ * transactions of subtask S+P where P is the current parallelism until it 
finds a subtask without
+ * transactions.
+ */
+class TransactionAborter {
+    private final int subtaskId;
+    private final int parallelism;
+    private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory;
+    @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+
+    public TransactionAborter(
+            int subtaskId,
+            int parallelism,
+            Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory) {
+        this.subtaskId = subtaskId;
+        this.parallelism = parallelism;
+        this.producerFactory = producerFactory;
+    }
+
+    void abortLingeringTransactions(List<String> prefixesToAbort, long 
startCheckpointId) {
+        for (String prefix : prefixesToAbort) {
+            abortTransactionsWithPrefix(prefix, startCheckpointId);
+        }
+    }
+
+    /**
+     * Aborts all transactions that have been created by this subtask in a 
previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed 
because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is 
responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale 
to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    void abortTransactionsWithPrefix(String prefix, long startCheckpointId) {
+        for (int subtaskId = this.subtaskId; ; subtaskId += parallelism) {
+            if (abortTransactionOfSubtask(prefix, startCheckpointId, 
subtaskId) == 0) {
+                // If Flink didn't abort any transaction for current subtask, 
then we assume that no
+                // such subtask existed and no subtask with a higher number as 
well.
+                break;
+            }
+        }
+    }
+
+    /**
+     * Aborts all transactions that have been created by a subtask in a 
previous run after the given
+     * checkpoint id.
+     *
+     * <p>We assume that transaction ids are consecutively used and thus Flink 
can stop aborting as
+     * soon as Flink notices that a particular transaction id was unused.
+     */
+    int abortTransactionOfSubtask(String prefix, long startCheckpointId, int subtaskId) {

Review comment:
       why package-private?
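
For reference, the downscaling rule from the Javadoc quoted above (subtask i cleans every old subtask j with j % Y = i) can be sketched as follows. This is only an illustration: `DownscaleResponsibilitySketch` and `subtasksToClean` are hypothetical names, and the sketch assumes the old parallelism X is known, whereas the loop in the diff does not know X and simply stops at the first subtask for which nothing was aborted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: lists the old subtask ids that the
// new subtask `subtaskId` would clean up after downscaling from
// oldParallelism (X) to newParallelism (Y).
class DownscaleResponsibilitySketch {

    static List<Integer> subtasksToClean(int subtaskId, int newParallelism, int oldParallelism) {
        List<Integer> responsible = new ArrayList<>();
        // Same stride as the loop in abortTransactionsWithPrefix: i, i + Y, i + 2Y, ...
        for (int j = subtaskId; j < oldParallelism; j += newParallelism) {
            responsible.add(j);
        }
        return responsible;
    }

    public static void main(String[] args) {
        // Downscaling from 5 subtasks to 2.
        System.out.println(subtasksToClean(0, 2, 5)); // [0, 2, 4]
        System.out.println(subtasksToClean(1, 2, 5)); // [1, 3]
    }
}
```

With a new parallelism of 2, subtask 0 ends up with the even ids and subtask 1 with the odd ids, matching the example in the Javadoc.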

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all 
committables are eventually
+ * committed.
+ */
+public class CommitRetrier {
+    private final ProcessingTimeService processingTimeService;
+    private final CommitterHandler<?, ?> committerHandler;
+    private final Clock clock;
+    public static final int RETRY_DELAY = 1000;

Review comment:
       Nit: Does it need to be public?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -47,14 +47,23 @@
             
"org.apache.kafka.clients.producer.internals.TransactionManager$State";
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = 
"producerIdAndEpoch";
 
-    private final Properties kafkaProducerConfig;
-    @Nullable private final String transactionalId;
+    @Nullable private String transactionalId;
     private volatile boolean inTransaction;
 
-    public FlinkKafkaInternalProducer(Properties properties) {
-        super(properties);
-        this.kafkaProducerConfig = properties;
-        this.transactionalId = 
properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+    public FlinkKafkaInternalProducer(Properties properties, @Nullable String 
transactionalId) {
+        super(withTransactionId(properties, transactionalId));
+        this.transactionalId = transactionalId;
+    }
+
+    private static Properties withTransactionId(

Review comment:
       `withTransactionalId` to keep it consistent

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -0,0 +1,117 @@
+
+package org.apache.flink.connector.kafka.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Aborts lingering transactions on restart.
+ *
+ * <p>Transactions are lingering if they are not tracked anywhere. For 
example, if a job is started
+ * transactions are opened. A restart without checkpoint would not allow Flink 
to abort old
+ * transactions. Since Kafka's transactions are sequential, newly produced 
data does not become
+ * visible for read_committed consumers. However, Kafka has no API for 
querying open transactions,
+ * so they become lingering.
+ *
+ * <p>Flink solves this by assuming consecutive transaction ids. On restart of 
checkpoint C on
+ * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until 
it finds the first
+ * unused transaction.
+ *
+ * <p>Additionally, to cover for weird downscaling cases without checkpoints, 
it also checks for
+ * transactions of subtask S+P where P is the current parallelism until it 
finds a subtask without
+ * transactions.
+ */
+class TransactionAborter {
+    private final int subtaskId;
+    private final int parallelism;
+    private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory;
+    @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+
+    public TransactionAborter(
+            int subtaskId,
+            int parallelism,
+            Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory) {
+        this.subtaskId = subtaskId;
+        this.parallelism = parallelism;
+        this.producerFactory = producerFactory;
+    }
+
+    void abortLingeringTransactions(List<String> prefixesToAbort, long 
startCheckpointId) {
+        for (String prefix : prefixesToAbort) {
+            abortTransactionsWithPrefix(prefix, startCheckpointId);
+        }
+    }
+
+    /**
+     * Aborts all transactions that have been created by this subtask in a 
previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed 
because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is 
responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale 
to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    void abortTransactionsWithPrefix(String prefix, long startCheckpointId) {

Review comment:
       why package-private?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -141,13 +138,15 @@
         } catch (Exception e) {
             throw new FlinkRuntimeException("Cannot initialize schema.", e);
         }
-        this.kafkaWriterState =
-                recoverAndInitializeState(checkNotNull(recoveredStates, 
"recoveredStates"));
         disableMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
                         && Boolean.parseBoolean(
                                 
kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
-        this.currentProducer = createProducer();
+        lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;

Review comment:
       I think I have seen different starting checkpoint ids. Does it start
with 0 or 1? Starting with 1 feels more natural to me because, AFAIK, Flink has
no checkpoint 0 either.
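
To make the question concrete, here is a minimal sketch of what the expression in the diff evaluates to. It assumes `getRestoredCheckpointId()` returns an `OptionalLong`; `StartCheckpointIdSketch` and `firstCheckpointId` are hypothetical names used only for illustration.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the initialization logic in KafkaWriter.
class StartCheckpointIdSketch {

    // Mirrors the expression from the diff: restoredCheckpointId.orElse(-1) + 1.
    static long firstCheckpointId(OptionalLong restoredCheckpointId) {
        return restoredCheckpointId.orElse(-1) + 1;
    }

    public static void main(String[] args) {
        // Fresh start, nothing restored: the counter begins at 0.
        System.out.println(firstCheckpointId(OptionalLong.empty())); // 0
        // Restored from checkpoint 7: the counter continues at 8.
        System.out.println(firstCheckpointId(OptionalLong.of(7)));   // 8
    }
}
```

So on a fresh start this expression yields 0, not 1, which is the case the question above is about.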

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -0,0 +1,117 @@
+
+package org.apache.flink.connector.kafka.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Aborts lingering transactions on restart.
+ *
+ * <p>Transactions are lingering if they are not tracked anywhere. For 
example, if a job is started
+ * transactions are opened. A restart without checkpoint would not allow Flink 
to abort old
+ * transactions. Since Kafka's transactions are sequential, newly produced 
data does not become
+ * visible for read_committed consumers. However, Kafka has no API for 
querying open transactions,
+ * so they become lingering.
+ *
+ * <p>Flink solves this by assuming consecutive transaction ids. On restart of 
checkpoint C on
+ * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until 
it finds the first
+ * unused transaction.
+ *
+ * <p>Additionally, to cover for weird downscaling cases without checkpoints, 
it also checks for
+ * transactions of subtask S+P where P is the current parallelism until it 
finds a subtask without
+ * transactions.
+ */
+class TransactionAborter {
+    private final int subtaskId;
+    private final int parallelism;
+    private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory;
+    @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+
+    public TransactionAborter(
+            int subtaskId,
+            int parallelism,
+            Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> 
producerFactory) {
+        this.subtaskId = subtaskId;
+        this.parallelism = parallelism;
+        this.producerFactory = producerFactory;

Review comment:
       `checkNotNull`
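
A minimal sketch of the suggested null check. To stay self-contained it uses `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull`; the class name and the `Function<String, String>` factory type are simplifications for illustration, not the PR's actual types.

```java
import java.util.Objects;
import java.util.function.Function;

// Simplified stand-in for TransactionAborter; the factory returns a String
// here instead of a FlinkKafkaInternalProducer.
class NullCheckedAborterSketch {
    private final int subtaskId;
    private final int parallelism;
    private final Function<String, String> producerFactory;

    NullCheckedAborterSketch(
            int subtaskId, int parallelism, Function<String, String> producerFactory) {
        this.subtaskId = subtaskId;
        this.parallelism = parallelism;
        // Fail fast in the constructor instead of on first use of the factory.
        this.producerFactory = Objects.requireNonNull(producerFactory, "producerFactory");
    }

    String createProducer(String transactionalId) {
        return producerFactory.apply(transactionalId);
    }
}
```

With the check in place, passing `null` fails at construction time with a message naming the argument, rather than later inside the abort loop.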

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -37,39 +37,41 @@
  */
 class KafkaCommitter implements Committer<KafkaCommittable> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaCommitter.class);
+
     private final Properties kafkaProducerConfig;
 
     KafkaCommitter(Properties kafkaProducerConfig) {
         this.kafkaProducerConfig = kafkaProducerConfig;
     }
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaCommitter.class);
-
     @Override
     public List<KafkaCommittable> commit(List<KafkaCommittable> committables) 
throws IOException {
-        committables.forEach(this::commitTransaction);
-        return Collections.emptyList();
+        List<KafkaCommittable> retryableCommittables = new ArrayList<>();
+        for (KafkaCommittable committable : committables) {
+            final String transactionalId = committable.getTransactionalId();
+            LOG.debug("Committing Kafka transaction {}", transactionalId);
+            try (FlinkKafkaInternalProducer<?, ?> producer =
+                         committable.getProducer().orElseGet(() -> 
createProducer(committable))) {
+                producer.commitTransaction();
+            } catch (ProducerFencedException | InvalidTxnStateException e) {
+                // That means we have committed this transaction before.
+                LOG.warn(
+                        "Encountered error {} while recovering transaction {}. 
"
+                                + "Presumably this transaction has been 
already committed before",
+                        e,
+                        committable);
+            } catch (Throwable e) {
+                LOG.info("Cannot commit Kafka transaction, retrying.", e);

Review comment:
       Nit: Should this be a warning?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -78,16 +79,16 @@
     private final Callback deliveryCallback;
     private final AtomicLong pendingRecords = new AtomicLong();
     private final KafkaRecordSerializationSchema.KafkaSinkContext 
kafkaSinkContext;
-    private final List<FlinkKafkaInternalProducer<byte[], byte[]>> producers = 
new ArrayList<>();
     private final Map<String, KafkaMetricMutableWrapper> 
previouslyCreatedMetrics = new HashMap<>();
     private final SinkWriterMetricGroup metricGroup;
     private final Counter numBytesOutCounter;
     private final Sink.ProcessingTimeService timeService;
 
     private transient Metric byteOutMetric;

Review comment:
       Nit: also remove the transient?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -138,15 +144,30 @@
         } catch (Exception e) {
             throw new FlinkRuntimeException("Cannot initialize schema.", e);
         }
-        disableMetrics =
+
+        this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
+        this.disableMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
                         && Boolean.parseBoolean(
                                 
kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
-        lastCheckpointId = 
sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;
-        abortLingeringTransactions(
-                checkNotNull(recoveredStates, "recoveredStates"), 
lastCheckpointId);
-        this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
-        this.currentProducer = createProducer(lastCheckpointId);
+
+        this.lastCheckpointId = 
sinkInitContext.getRestoredCheckpointId().orElse(-1);
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            abortLingeringTransactions(
+                    checkNotNull(recoveredStates, "recoveredStates"), 
lastCheckpointId + 1);
+        }
+        if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            this.currentProducer = getTransactionalProducer(lastCheckpointId + 
1);
+            this.currentProducer.beginTransaction();

Review comment:
       Merge the two condition branches?
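
A rough sketch of what the merged branch could look like. The calls are only recorded as strings so that the example stays self-contained; `MergedBranchSketch`, its `Guarantee` enum and `initialize` are hypothetical stand-ins, and whatever follows the second `if` in the real code (the hunk is cut off here) is intentionally left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the KafkaWriter constructor logic in the diff.
class MergedBranchSketch {
    enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

    // Returns the steps that would run, in order, for the given guarantee.
    static List<String> initialize(Guarantee guarantee, long lastCheckpointId) {
        List<String> steps = new ArrayList<>();
        // One EXACTLY_ONCE check instead of two consecutive identical ones.
        if (guarantee == Guarantee.EXACTLY_ONCE) {
            long nextCheckpointId = lastCheckpointId + 1;
            steps.add("abortLingeringTransactions(" + nextCheckpointId + ")");
            steps.add("getTransactionalProducer(" + nextCheckpointId + ")");
            steps.add("beginTransaction()");
        }
        return steps;
    }
}
```

The order of the three steps is the same as in the diff; only the duplicated condition goes away.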

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> 
recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new 
ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {

Review comment:
       Why do we only abort transactions if the prefix has changed?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -0,0 +1,84 @@
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all 
committables are eventually
+ * committed.
+ */
+public class CommitRetrier {

Review comment:
       Add `@Internal`?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -379,9 +379,9 @@ private static Properties getKafkaClientConfiguration() {
         return standardProps;
     }
 
-    private Consumer<byte[], byte[]> createTestConsumer(String topic) {
+    static Consumer<byte[], byte[]> createTestConsumer(String topic, Properties properties) {

Review comment:
       private?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -72,7 +73,7 @@ void abortLingeringTransactions(List<String> prefixesToAbort, 
long startCheckpoi
      * all subtasks j in [0; X), where j % Y = i. For example, if we downscale 
to 2, then subtask 0
      * is responsible for all even and subtask 1 for all odd subtasks.
      */
-    void abortTransactionsWithPrefix(String prefix, long startCheckpointId) {
+    private void abortTransactionsWithPrefix(String prefix, long startCheckpointId) {

Review comment:
       Nit: this class is introduced and touched in separate commits ;)

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkTestBase.java
##########
@@ -369,7 +369,7 @@ public String toString() {
 
     static StateInitializationContextImpl buildInitCtx(boolean restored) {
         return new StateInitializationContextImpl(
-                restored,
+                restored ? 0L : null,

Review comment:
       Nit: I know this is only test code, but shouldn't the id start with `1`?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -71,6 +70,18 @@ private void commitTransaction(KafkaCommittable committable) 
{
         }
     }
 
+    /**
+     * Creates a producer that con commit into the same transaction as the 
upstream producer that
+     * was serialized into {@link KafkaCommitter}.

Review comment:
       KafkaCommitter -> KafkaCommittable ?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -163,80 +162,54 @@ public void write(IN element, Context context) throws 
IOException {
     @Override
     public List<KafkaCommittable> prepareCommit(boolean flush) {
         flushRecords(flush);
-        List<KafkaCommittable> committables = precommit();
-        currentProducer = createProducer();
-        return committables;
+        return precommit();
     }
 
     @Override
     public List<KafkaWriterState> snapshotState(long checkpointId) throws 
IOException {
+        currentProducer = createProducer(checkpointId + 1);
         return ImmutableList.of(kafkaWriterState);
     }
 
     @Override
     public void close() throws Exception {
+
         if (currentProducer.isInTransaction()) {
             currentProducer.abortTransaction();
         }
-        currentProducer.flush();
         closed = true;
         closer.close();
+        checkState(currentProducer.isClosed());
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> 
recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new 
ArrayList<>()));
-            return state;
-        }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, 
Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous 
executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new 
ArrayList<>(taskOffsetMapping.values())));
-        if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has 
changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} 
has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        return new KafkaWriterState(
-                transactionalIdPrefix, subtaskId, 
lastState.getTransactionalIdOffset());
-    }
 
-    private void abortTransactions(List<String> transactionsToAbort) {
-        transactionsToAbort.forEach(
-                transaction -> {
-                    // don't mess with the original configuration or any other
-                    // properties of the
-                    // original object
-                    // -> create an internal kafka producer on our own and do 
not rely
-                    // on
-                    //    initTransactionalProducer().
-                    LOG.info("Aborting Kafka transaction {}.", transaction);
-                    FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = 
null;
-                    try {
-                        kafkaProducer =
-                                new 
FlinkKafkaInternalProducer<>(kafkaProducerConfig, transaction);
-                        // it suffices to call initTransactions - this will 
abort any
-                        // lingering transactions
-                        kafkaProducer.initTransactions();
-                    } finally {
-                        if (kafkaProducer != null) {
-                            kafkaProducer.close(Duration.ofSeconds(0));
-                        }
-                    }
-                });
+        TransactionAborter transactionAborter = null;

Review comment:
       Nit: Make the `TransactionAborter` `AutoCloseable` and use a
try-with-resources block.
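
A minimal sketch of the suggested shape. `AborterSketch` is a hypothetical stand-in that only records events; the real class would close its internal producer in `close()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransactionAborter that records what happens.
class AborterSketch implements AutoCloseable {
    private final List<String> events;

    AborterSketch(List<String> events) {
        this.events = events;
    }

    void abortLingeringTransactions(List<String> prefixesToAbort, long startCheckpointId) {
        for (String prefix : prefixesToAbort) {
            events.add("abort " + prefix + " from " + startCheckpointId);
        }
    }

    @Override
    public void close() {
        // The real implementation would release the producer here.
        events.add("close");
    }

    // Caller side: no null-initialized local and no finally block needed.
    static List<String> run(List<String> prefixesToAbort, long startCheckpointId) {
        List<String> events = new ArrayList<>();
        try (AborterSketch aborter = new AborterSketch(events)) {
            aborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId);
        }
        return events;
    }
}
```

The try-with-resources statement calls `close()` even if aborting throws, which replaces the `TransactionAborter transactionAborter = null;` pattern from the diff.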

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -71,6 +70,18 @@ private void commitTransaction(KafkaCommittable committable) 
{
         }
     }
 
+    /**
+     * Creates a producer that con commit into the same transaction as the 
upstream producer that

Review comment:
       `con` -> `can`

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -406,8 +406,14 @@ private void deleteTestTopic(String topic)
     }
 
     private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic) {
+        Properties properties = getKafkaClientConfiguration();
+        return drainAllRecordsFromTopic(topic, properties);
+    }
+
+    static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(

Review comment:
       Can this be `private`?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Aborts lingering transactions on restart.
+ *
+ * <p>Transactions are lingering if they are not tracked anywhere. For example, if a job is started
+ * transactions are opened. A restart without checkpoint would not allow Flink to abort old
+ * transactions. Since Kafka's transactions are sequential, newly produced data does not become
+ * visible for read_committed consumers. However, Kafka has no API for querying open transactions,
+ * so they become lingering.
+ *
+ * <p>Flink solves this by assuming consecutive transaction ids. On restart of checkpoint C on
+ * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until it finds the first
+ * unused transaction.
+ *
+ * <p>Additionally, to cover for weird downscaling cases without checkpoints, it also checks for
+ * transactions of subtask S+P where P is the current parallelism until it finds a subtask without
+ * transactions.
+ */
+class TransactionAborter {
+    private final int subtaskId;
+    private final int parallelism;
+    private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory;
+    @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+
+    public TransactionAborter(
+            int subtaskId,
+            int parallelism,
+            Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory) {
+        this.subtaskId = subtaskId;
+        this.parallelism = parallelism;
+        this.producerFactory = checkNotNull(producerFactory);
+    }
+
+    void abortLingeringTransactions(List<String> prefixesToAbort, long startCheckpointId) {
+        for (String prefix : prefixesToAbort) {
+            abortTransactionsWithPrefix(prefix, startCheckpointId);
+        }
+    }
+
+    /**
+     * Aborts all transactions that have been created by this subtask in a previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    void abortTransactionsWithPrefix(String prefix, long startCheckpointId) {
+        for (int subtaskId = this.subtaskId; ; subtaskId += parallelism) {
+            if (abortTransactionOfSubtask(prefix, startCheckpointId, subtaskId) == 0) {
+                // If Flink didn't abort any transaction for current subtask, then we assume that no
+                // such subtask existed and no subtask with a higher number as well.
+                break;
+            }
+        }
+    }
+
+    /**
+     * Aborts all transactions that have been created by a subtask in a previous run after the given
+     * checkpoint id.
+     *
+     * <p>We assume that transaction ids are consecutively used and thus Flink can stop aborting as
+     * soon as Flink notices that a particular transaction id was unused.
+     */
+    int abortTransactionOfSubtask(String prefix, long startCheckpointId, int subtaskId) {
+        int numTransactionAborted = 0;
+        for (long checkpointId = startCheckpointId; ; checkpointId++, numTransactionAborted++) {
+            // initTransactions fences all old transactions with the same id by bumping the epoch
+            String transactionalId =
+                    TransactionalIdFactory.buildTransactionalId(prefix, subtaskId, checkpointId);
+            if (producer == null) {
+                producer = producerFactory.apply(transactionalId);
+            } else {
+                producer.initTransactionId(transactionalId);
+            }
+            producer.flush();
+            // An epoch of 0 indicates that the id was unused before
+            if (producer.getEpoch() == 0) {
+                // Note that the check works beyond transaction log timeouts and just depends on the
+                // retention of the transaction topic (typically 7d). Any transaction that is not in
+                // the that topic anymore is also not lingering (i.e., it will not block downstream
+                // from reading)
+                // This method will only cease to work if transaction log timeout = topic retention
+                // and a user didn't restart the application for that period of time. Then the first
+                // transactions would vanish from the topic while later transactions are still
+                // lingering until they are cleaned up by Kafka. Then the user has to wait until the
+                // other transactions are timed out (which shouldn't take too long).
+                break;
+            }
+        }
+        return numTransactionAborted;
+    }
+
+    public void close() {

Review comment:
       Please implement some sort of closeable interface here, e.g. `AutoCloseable`.
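
For readers following the Javadoc in this file, the probing scheme it describes can be simulated without Kafka. This is a rough sketch under stated assumptions: an in-memory set of used ids replaces the broker, a set lookup replaces the epoch check from the diff, and the `id` format, class name, and method names are hypothetical rather than taken from the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AbortProbeSketch {
    // Hypothetical id format, for illustration only.
    static String id(String prefix, int subtask, long checkpoint) {
        return prefix + "-" + subtask + "-" + checkpoint;
    }

    /** Probes consecutive checkpoint ids of one subtask and stops at the first unused id. */
    static int abortSubtask(
            Set<String> used, List<String> aborted, String prefix, long start, int subtask) {
        int numAborted = 0;
        for (long checkpoint = start; ; checkpoint++, numAborted++) {
            String transactionalId = id(prefix, subtask, checkpoint);
            if (!used.contains(transactionalId)) {
                break; // stands in for the "epoch == 0" check in the diff
            }
            aborted.add(transactionalId);
        }
        return numAborted;
    }

    /** Walks subtasks subtaskId, subtaskId + parallelism, ... until one has nothing to abort. */
    static List<String> abortWithPrefix(
            Set<String> used, String prefix, long start, int subtaskId, int parallelism) {
        List<String> aborted = new ArrayList<>();
        for (int subtask = subtaskId; ; subtask += parallelism) {
            if (abortSubtask(used, aborted, prefix, start, subtask) == 0) {
                break;
            }
        }
        return aborted;
    }

    public static void main(String[] args) {
        // Previous run: 4 subtasks, each with open transactions for checkpoints 1 and 2.
        Set<String> used = new HashSet<>();
        for (int subtask = 0; subtask < 4; subtask++) {
            for (long checkpoint = 1; checkpoint <= 2; checkpoint++) {
                used.add(id("p", subtask, checkpoint));
            }
        }
        // After downscaling to 2, subtask 0 cleans up the old subtasks 0 and 2.
        System.out.println(abortWithPrefix(used, "p", 1, 0, 2));
    }
}
```

In this simulation, new subtask 1 would likewise pick up the old subtasks 1 and 3, matching the `j % Y = i` rule in the Javadoc.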





