AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r698508026



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
##########
@@ -44,16 +45,20 @@ public void throwExceptionWithoutCommitter() throws Exception {
         testHarness.initializeEmptyState();
     }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void doNotSupportRetry() throws Exception {
+    @Test
+    public void supportRetry() throws Exception {

Review comment:
       For the global committer, the timer is not really used. For the streaming committer handler, we can use `TestProcessingTimeService` and manually advance the time, which triggers all respective callbacks. I have also added a more fine-grained unit test with `ManualClock`.
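       To illustrate the approach (a sketch only; the class and test names below are made up and the actual tests in the PR differ): advancing a `TestProcessingTimeService` fires every timer that has become due, and a `ManualClock` drives purely time-based logic deterministically.

```java
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.clock.ManualClock;

import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

public class RetryTimingSketchTest {

    @Test
    public void advancingTimeTriggersRegisteredCallbacks() throws Exception {
        TestProcessingTimeService timeService = new TestProcessingTimeService();
        timeService.setCurrentTime(0);

        AtomicInteger firedRetries = new AtomicInteger();
        timeService.registerTimer(100, timestamp -> firedRetries.incrementAndGet());

        // Advancing the time fires every timer whose timestamp is now due.
        timeService.setCurrentTime(100);
        assertEquals(1, firedRetries.get());
    }

    @Test
    public void manualClockAdvancesDeterministically() {
        ManualClock clock = new ManualClock();
        long before = clock.relativeTimeMillis();
        clock.advanceTime(50, TimeUnit.MILLISECONDS);
        assertEquals(50, clock.relativeTimeMillis() - before);
    }
}
```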

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -47,6 +48,7 @@
 
     private final Properties kafkaProducerConfig;
     @Nullable private final String transactionalId;
+    private boolean inTransaction;

Review comment:
       Good point. I was assuming that all accesses use the task thread, which probably holds until we reach cancellation (where aborting really matters). So I will probably switch to `volatile` here - it's not on the hot path and adds safety.
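       A sketch of what that looks like (not the final diff): the flag becomes volatile so an abort running on a different thread during cancellation observes the latest value written by the task thread.

```java
// Sketch only, not the final diff: volatile gives visibility of the flag across
// the task thread (which writes it) and a cancellation thread (which reads it).
class TransactionalProducerSketch {
    private volatile boolean inTransaction;

    void beginTransaction() {
        inTransaction = true; // written by the task thread
    }

    void abortIfInTransaction() {
        if (inTransaction) { // may be read from the cancellation thread
            // abortTransaction() would go here
            inTransaction = false;
        }
    }
}
```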

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -174,6 +174,9 @@ public void write(IN element, Context context) throws IOException {
 
     @Override
     public void close() throws Exception {
+        if (currentProducer.isInTransaction()) {

Review comment:
       I added a test. 👍 

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -86,6 +87,11 @@ public boolean isInTransaction() {
         return inTransaction;
     }
 
+    @Override
+    public void close() {
+        super.close(Duration.ZERO);
+    }

Review comment:
       I added a `flush` in `KafkaWriter#close` to be safe. All other paths are 
pretty much secured through `commitTransaction`.
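       Roughly the shape of that safety net (a sketch, not the exact PR code; `WriterCloseSketch` is a made-up helper): flush pending records, abort a still-open transaction, then close without waiting.

```java
import java.time.Duration;

import org.apache.kafka.clients.producer.KafkaProducer;

// Sketch of the close-path safety net (not the exact PR code).
class WriterCloseSketch {
    private final KafkaProducer<byte[], byte[]> producer;
    private boolean inTransaction; // mirrors FlinkKafkaInternalProducer#isInTransaction

    WriterCloseSketch(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void close() {
        producer.flush(); // make sure buffered records are sent before shutting down
        if (inTransaction) {
            producer.abortTransaction(); // nothing should stay in an open transaction
            inTransaction = false;
        }
        producer.close(Duration.ZERO);
    }
}
```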

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -36,25 +34,28 @@
 import java.time.Duration;
 import java.util.Properties;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
 * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
  */
-class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {

Review comment:
       I think this is an oversight on my end.

##########
File path: flink-connectors/flink-connector-kafka/pom.xml
##########
@@ -124,6 +124,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>junit-jupiter</artifactId>
+                       <scope>test</scope>
+               </dependency>
+

Review comment:
       You are right, that belongs in the commit with the JUnit5 Testcontainers test.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -164,13 +163,12 @@ public void write(IN element, Context context) throws IOException {
     @Override
     public List<KafkaCommittable> prepareCommit(boolean flush) {
         flushRecords(flush);
-        List<KafkaCommittable> committables = precommit();
-        currentProducer = createProducer();
-        return committables;
+        return precommit();
     }
 
     @Override
-    public List<KafkaWriterState> snapshotState() throws IOException {
+    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
+        currentProducer = createProducer(checkpointId);

Review comment:
       I have now explicitly nulled the `currentProducer`. That still leaves room for errors in the future, but at least no correctness issues.
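       The hand-over then looks roughly like this (hypothetical helper, not the PR code): `prepareCommit` gives the producer away and nulls the field, `snapshotState` creates the one for the next checkpoint, so accidental use in between fails fast.

```java
import java.util.function.LongFunction;

// Hypothetical sketch of the producer hand-over, not the PR code.
class ProducerHandOverSketch<P> {
    private P currentProducer;
    private final LongFunction<P> producerFactory;

    ProducerHandOverSketch(P initial, LongFunction<P> factory) {
        this.currentProducer = initial;
        this.producerFactory = factory;
    }

    P prepareCommit() {
        P committed = currentProducer;
        currentProducer = null; // fail fast (NPE) if used before the next snapshotState
        return committed;
    }

    void snapshotState(long checkpointId) {
        currentProducer = producerFactory.apply(checkpointId);
    }
}
```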

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {

Review comment:
       Isn't this exactly what's done here? 
   We usually only try to abort transactions with the current prefix. When we 
detect a change, we additionally check for transactions with the old prefix.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
-        if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+        final Properties properties = new Properties();
+        properties.putAll(kafkaProducerConfig);
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");

Review comment:
       Yes, but we can solve it in a better way where this is not needed.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+                prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+                LOG.warn(
+                        "Transactional id prefix from previous execution {} has changed to {}.",
+                        lastState.getTransactionalIdPrefix(),
+                        transactionalIdPrefix);
+            }
         }
-        final Map<Integer, KafkaWriterState> taskOffsetMapping =
-                recoveredStates.stream()
-                        .collect(
-                                Collectors.toMap(
-                                        KafkaWriterState::getSubtaskId, Function.identity()));
-        checkState(
-                taskOffsetMapping.containsKey(subtaskId),
-                "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
-        final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
-        taskOffsetMapping.remove(subtaskId);
-        abortTransactions(
-                getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
-        if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
-            LOG.warn(
-                    "Transactional id prefix from previous execution {} has changed to {}.",
-                    lastState.getTransactionalIdPrefix(),
-                    transactionalIdPrefix);
-            return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+        final Properties properties = new Properties();
+        properties.putAll(kafkaProducerConfig);
+        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+        try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+                new FlinkKafkaInternalProducer<>(properties)) {
+            for (String prefix : prefixesToAbort) {
+                abortTransactionsWithPrefix(producer, prefix, startCheckpointId);
+            }
         }
-        return new KafkaWriterState(
-                transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset());
     }
 
-    private void abortTransactions(List<String> transactionsToAbort) {
-        transactionsToAbort.forEach(
-                transaction -> {
-                    // don't mess with the original configuration or any other
-                    // properties of the
-                    // original object
-                    // -> create an internal kafka producer on our own and do not rely
-                    // on
-                    //    initTransactionalProducer().
-                    final Properties myConfig = new Properties();
-                    myConfig.putAll(kafkaProducerConfig);
-                    myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
-                    LOG.info("Aborting Kafka transaction {}.", transaction);
-                    FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
-                    try {
-                        kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
-                        // it suffices to call initTransactions - this will abort any
-                        // lingering transactions
-                        kafkaProducer.initTransactions();
-                    } finally {
-                        if (kafkaProducer != null) {
-                            kafkaProducer.close(Duration.ofSeconds(0));
-                        }
-                    }
-                });
+    /**
+     * Aborts all transactions that have been created by this subtask in a previous run.
+     *
+     * <p>It also aborts transactions from subtasks that may have been removed because of
+     * downscaling.
+     *
+     * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+     * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+     * is responsible for all even and subtask 1 for all odd subtasks.
+     */
+    private void abortTransactionsWithPrefix(

Review comment:
       But this is exactly the same thing.
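       To make the responsibility rule from the Javadoc concrete (standalone illustration, not PR code): after downscaling from X to Y subtasks, new subtask i cleans every old subtask j with j % Y = i.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the downscaling rule quoted above (not PR code).
class DownscaleCleanupSketch {

    static List<Integer> oldSubtasksToClean(int newSubtaskId, int newParallelism, int oldParallelism) {
        List<Integer> result = new ArrayList<>();
        for (int oldSubtask = newSubtaskId; oldSubtask < oldParallelism; oldSubtask += newParallelism) {
            result.add(oldSubtask); // oldSubtask % newParallelism == newSubtaskId
        }
        return result;
    }

    public static void main(String[] args) {
        // Downscaling 5 -> 2: subtask 0 cleans the even old subtasks, subtask 1 the odd ones.
        System.out.println(oldSubtasksToClean(0, 2, 5)); // [0, 2, 4]
        System.out.println(oldSubtasksToClean(1, 2, 5)); // [1, 3]
    }
}
```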

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -327,41 +346,43 @@ private void flushRecords(boolean finalFlush) {
      * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions
      * will not clash with transactions created during previous checkpoints ({@code
      * producer.initTransactions()} assures that we obtain new producerId and epoch counters).
+     *
+     * <p>Ensures that all transaction ids in between lastCheckpointId and checkpointId are
+     * initialized.
      */
-    private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() {
-        final long transactionalIdOffset = kafkaWriterState.getTransactionalIdOffset() + 1;
+    private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer(
+            long checkpointId) {
+        checkState(
+                checkpointId > lastCheckpointId,
+                "Expected %s > %s",
+                checkpointId,
+                lastCheckpointId);
         final Properties copiedProducerConfig = new Properties();
         copiedProducerConfig.putAll(kafkaProducerConfig);
-        initTransactionalProducerConfig(
-                copiedProducerConfig,
-                transactionalIdOffset,
-                transactionalIdPrefix,
-                kafkaSinkContext.getParallelInstanceId());
+        copiedProducerConfig.put(
+                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+                TransactionalIdFactory.buildTransactionalId(
+                        transactionalIdPrefix,
+                        kafkaSinkContext.getParallelInstanceId(),
+                        lastCheckpointId + 1));
         final FlinkKafkaInternalProducer<byte[], byte[]> producer =
                 new FlinkKafkaInternalProducer<>(copiedProducerConfig);
         producer.initTransactions();
-        kafkaWriterState =
-                new KafkaWriterState(
-                        transactionalIdPrefix,
-                        kafkaSinkContext.getParallelInstanceId(),
-                        transactionalIdOffset);
-        LOG.info(
-                "Created new transactional producer {}",
-                copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+        // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
+        // this loop ensures that all gaps are filled with initialized (empty) transactions
+        for (long id = lastCheckpointId + 2; id <= checkpointId; id++) {
+            producer.setTransactionalId(
+                    TransactionalIdFactory.buildTransactionalId(
+                            transactionalIdPrefix,
+                            kafkaSinkContext.getParallelInstanceId(),
+                            lastCheckpointId + 1));
+            producer.initTransactions();
+        }

Review comment:
       I refactored the code to make it more obvious. I'm also adding an 
example in the comments.
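       The gap-filling idea in isolation (hypothetical helper, not the refactored PR code): because checkpoints can be aborted, the next completed checkpoint id is not necessarily lastCheckpointId + 1, and every id in between needs an initialized (empty) transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing which transactional ids need initialization (not PR code).
class TransactionIdGapSketch {

    static List<Long> idsToInitialize(long lastCheckpointId, long checkpointId) {
        List<Long> ids = new ArrayList<>();
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            ids.add(id); // all but the last one end up as empty, initialized transactions
        }
        return ids;
    }

    public static void main(String[] args) {
        // lastCheckpointId = 3, checkpoints 4 and 5 were aborted, 6 completes:
        System.out.println(idsToInitialize(3, 6)); // [4, 5, 6]
    }
}
```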

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -17,7 +17,8 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord;

Review comment:
       Yes, if it's non-trivial.

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -17,7 +17,8 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord;

Review comment:
       Yes, if it's non-trivial and meant to be used as a util for other tests.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+public class CommitRetrier {

Review comment:
       Made non-public instead.
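       The retry loop itself boils down to something like this (assumed shape and interval, not the actual `CommitRetrier`): as long as committables are pending, re-register a processing-time timer and try again.

```java
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

import java.util.function.BooleanSupplier;

// Assumed shape of the retry loop, not the actual CommitRetrier.
class RetrySketch {
    private final ProcessingTimeService timeService;
    private final BooleanSupplier retryAndCheckPending; // retries committables, true if some remain
    private final long retryIntervalMs;

    RetrySketch(ProcessingTimeService timeService, BooleanSupplier retryAndCheckPending, long retryIntervalMs) {
        this.timeService = timeService;
        this.retryAndCheckPending = retryAndCheckPending;
        this.retryIntervalMs = retryIntervalMs;
    }

    void retryAt(long time) {
        timeService.registerTimer(time, timestamp -> {
            if (retryAndCheckPending.getAsBoolean()) {
                retryAt(timestamp + retryIntervalMs); // still pending: schedule another attempt
            }
        });
    }
}
```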

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -226,11 +236,12 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix");
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
-                        .boxed()
-                        .collect(Collectors.toList()));
+                containsInAnyOrder(

Review comment:
       Yes, you are right. I was playing around with MiniCluster at some intermediate state, and things were executed in parallel. I will switch to `contains` but leave the change in, since expected and actual were swapped originally.
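       For reference, the difference in a nutshell (standalone snippet, not the ITCase code): `contains` also verifies order, `containsInAnyOrder` only the elements.

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;

import java.util.Arrays;
import java.util.List;

// Standalone illustration, not the ITCase code.
class MatcherSketch {
    public static void main(String[] args) {
        List<Long> actual = Arrays.asList(1L, 2L, 3L);
        assertThat(actual, contains(1L, 2L, 3L));           // passes: same elements, same order
        assertThat(actual, containsInAnyOrder(3L, 1L, 2L)); // passes: order is ignored
        // assertThat(actual, contains(3L, 1L, 2L));        // would fail: order differs
    }
}
```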

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -141,13 +138,15 @@
         } catch (Exception e) {
             throw new FlinkRuntimeException("Cannot initialize schema.", e);
         }
-        this.kafkaWriterState =
-                recoverAndInitializeState(checkNotNull(recoveredStates, "recoveredStates"));
         disableMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
                         && Boolean.parseBoolean(
                                 kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
-        this.currentProducer = createProducer();
+        lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;

Review comment:
       Good catch!

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {

Review comment:
       The list is initialized with the new prefix; the old prefix is only added when it has changed. So we always abort with one prefix and sometimes with two.
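       In other words (hypothetical helper, not PR code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the prefix handling described above (not PR code).
class PrefixAbortSketch {

    static List<String> prefixesToAbort(String currentPrefix, String recoveredPrefixOrNull) {
        List<String> prefixes = new ArrayList<>();
        prefixes.add(currentPrefix); // always abort with the current prefix
        if (recoveredPrefixOrNull != null && !recoveredPrefixOrNull.equals(currentPrefix)) {
            prefixes.add(recoveredPrefixOrNull); // additionally cover the old prefix on change
        }
        return prefixes;
    }

    public static void main(String[] args) {
        System.out.println(prefixesToAbort("newPrefix", "newPrefix")); // [newPrefix]
        System.out.println(prefixesToAbort("newPrefix", "oldPrefix")); // [newPrefix, oldPrefix]
        System.out.println(prefixesToAbort("newPrefix", null));        // [newPrefix] (no recovered state)
    }
}
```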




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
