This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37e6724813bd92f8d323ddec80308241f693a5e1
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Sep 5 15:25:12 2024 +0200

    [FLINK-25920] Handle duplicate EOI in Sink
    
    In case of a failure after final checkpoint, EOI is called twice.
    
    SinkWriter should ignore the second call to avoid emitting more dummy 
committables — transactional objects containing no data — since no data can 
arrive when recovering from a final checkpoint. The commit uses a boolean list 
state to remember whether EOI has already been emitted. The cases are discussed 
in the code.
    
    Since rescaling may still result in these dummy committables, the committer 
needs to merge them into the CommittableCollector, as these committables still 
need to be committed because systems like Kafka don't provide transactional 
isolation.
---
 .../runtime/operators/sink/SinkWriterOperator.java | 58 ++++++++++++-
 .../CheckpointCommittableManagerImpl.java          | 27 +++---
 .../sink/committables/CommittableCollector.java    |  2 +-
 .../CheckpointCommittableManagerImplTest.java      | 14 ++--
 .../flink/test/streaming/runtime/SinkITCase.java   | 97 +++++++++++++++++++---
 5 files changed, 165 insertions(+), 33 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 93f5ba66198..35d6ca6f7ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema.Initializat
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -53,6 +54,8 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.UserCodeClassLoader;
 
+import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -67,7 +70,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An operator that processes records to be written into a {@link
- * org.apache.flink.api.connector.sink.Sink}. It also has a way to process 
committables with the
+ * org.apache.flink.api.connector.sink2.Sink}. It also has a way to process 
committables with the
  * same parallelism or send them downstream to a {@link CommitterOperator} 
with a different
  * parallelism.
  *
@@ -90,6 +93,13 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Nullable private final SimpleVersionedSerializer<CommT> 
committableSerializer;
     private final List<CommT> legacyCommittables = new ArrayList<>();
 
+    /**
+     * Used to remember that EOI has already happened so that we don't emit 
the last committables of
+     * the final checkpoints twice.
+     */
+    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
+            new ListStateDescriptor<>("end_of_input_state", 
BooleanSerializer.INSTANCE);
+
     /** The runtime information of the input element. */
     private final Context<InputT> context;
 
@@ -107,6 +117,10 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     private final MailboxExecutor mailboxExecutor;
 
     private boolean endOfInput = false;
+    /**
+     * Remembers the endOfInput state for (final) checkpoints iff the operator 
emits committables.
+     */
+    @Nullable private ListState<Boolean> endOfInputState;
 
     SinkWriterOperator(
             Sink<InputT> sink,
@@ -160,17 +174,48 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
                 legacyCommitterState.clear();
             }
         }
+
         sinkWriter = writerStateHandler.createWriter(initContext, context);
+
+        if (emitDownstream) {
+            // Figure out if we have seen end of input before and if we can 
suppress creating
+            // transactions and sending them downstream to the 
CommitterOperator. We have the
+            // following
+            // cases:
+            // 1. state is empty:
+            //   - First time initialization
+            //   - Restoring from a previous version of Flink that didn't 
handle EOI
+            //   - Upscaled from a final or regular checkpoint
+            // In all cases, we regularly handle EOI, potentially resulting in 
duplicate summaries
+            // that the CommitterOperator needs to handle.
+            // 2. state is not empty:
+            //   - This implies Flink restores from a version that handles EOI.
+            //   - If there is one entry, no rescaling happened (for this 
subtask), so if it's true,
+            //     we recover from a final checkpoint (for this subtask) and 
can ignore another EOI
+            //     else we have a regular checkpoint.
+            //   - If there are multiple entries, Flink downscaled, and we 
need to check if all are
+            //     true and do the same as above. As soon as one entry is 
false, we regularly start
+            //     the writer and potentially emit duplicate summaries if we 
indeed recovered from a
+            //     final checkpoint.
+            endOfInputState = 
context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
+            ArrayList<Boolean> previousState = 
Lists.newArrayList(endOfInputState.get());
+            endOfInput = !previousState.isEmpty() && 
!previousState.contains(false);
+        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         writerStateHandler.snapshotState(context.getCheckpointId());
+        if (endOfInputState != null) {
+            endOfInputState.clear();
+            endOfInputState.add(this.endOfInput);
+        }
     }
 
     @Override
     public void processElement(StreamRecord<InputT> element) throws Exception {
+        checkState(!endOfInput, "Received element after endOfInput: %s", 
element);
         context.element = element;
         sinkWriter.write(element.getValue(), context);
     }
@@ -195,9 +240,14 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     @Override
     public void endInput() throws Exception {
-        endOfInput = true;
-        sinkWriter.flush(true);
-        emitCommittables(CommittableMessage.EOI);
+        if (!endOfInput) {
+            endOfInput = true;
+            if (endOfInputState != null) {
+                endOfInputState.add(true);
+            }
+            sinkWriter.flush(true);
+            emitCommittables(CommittableMessage.EOI);
+        }
     }
 
     private void emitCommittables(long checkpointId) throws IOException, 
InterruptedException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index d98ec256e50..a217116055c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.runtime.operators.sink.committables;
 
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
@@ -72,18 +73,24 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
         return subtasksCommittableManagers.values();
     }
 
-    void upsertSummary(CommittableSummary<CommT> summary) {
+    void addSummary(CommittableSummary<CommT> summary) {
+        long checkpointId = summary.getCheckpointIdOrEOI();
         SubtaskCommittableManager<CommT> manager =
                 new SubtaskCommittableManager<>(
-                        summary.getNumberOfCommittables(),
-                        subtaskId,
-                        summary.getCheckpointIdOrEOI(),
-                        metricGroup);
-        SubtaskCommittableManager<CommT> existing =
-                
subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
-        if (existing != null) {
-            throw new UnsupportedOperationException(
-                    "Currently it is not supported to update the 
CommittableSummary for a checkpoint coming from the same subtask. Please check 
the status of FLINK-25920");
+                        summary.getNumberOfCommittables(), subtaskId, 
checkpointId, metricGroup);
+        if (checkpointId == CommittableMessage.EOI) {
+            SubtaskCommittableManager<CommT> merged =
+                    subtasksCommittableManagers.merge(
+                            summary.getSubtaskId(), manager, 
SubtaskCommittableManager::merge);
+        } else {
+            SubtaskCommittableManager<CommT> existing =
+                    
subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
+            if (existing != null) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Received duplicate committable summary for 
checkpoint %s + subtask %s (new=%s, old=%s). Please check the status of 
FLINK-25920",
+                                checkpointId, summary.getSubtaskId(), manager, 
existing));
+            }
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 801c8446850..2dac78c71ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -241,7 +241,7 @@ public class CommittableCollector<CommT> {
                                         numberOfSubtasks,
                                         summary.getCheckpointIdOrEOI(),
                                         metricGroup))
-                .upsertSummary(summary);
+                .addSummary(summary);
     }
 
     private void addCommittable(CommittableWithLineage<CommT> committable) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
index 5bcebda497f..6c8687b63c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
@@ -47,7 +47,7 @@ class CheckpointCommittableManagerImplTest {
         
assertThat(checkpointCommittables.getSubtaskCommittableManagers()).isEmpty();
 
         final CommittableSummary<Integer> first = new CommittableSummary<>(1, 
1, 1L, 1, 0, 0);
-        checkpointCommittables.upsertSummary(first);
+        checkpointCommittables.addSummary(first);
         assertThat(checkpointCommittables.getSubtaskCommittableManagers())
                 .hasSize(1)
                 .satisfiesExactly(
@@ -60,7 +60,7 @@ class CheckpointCommittableManagerImplTest {
 
         // Add different subtask id
         final CommittableSummary<Integer> third = new CommittableSummary<>(2, 
1, 2L, 2, 1, 1);
-        checkpointCommittables.upsertSummary(third);
+        checkpointCommittables.addSummary(third);
         
assertThat(checkpointCommittables.getSubtaskCommittableManagers()).hasSize(2);
     }
 
@@ -68,8 +68,8 @@ class CheckpointCommittableManagerImplTest {
     void testCommit() throws IOException, InterruptedException {
         final CheckpointCommittableManagerImpl<Integer> checkpointCommittables 
=
                 new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
-        checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 
1L, 1, 0, 0));
-        checkpointCommittables.upsertSummary(new CommittableSummary<>(2, 1, 
1L, 2, 0, 0));
+        checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 
1, 0, 0));
+        checkpointCommittables.addSummary(new CommittableSummary<>(2, 1, 1L, 
2, 0, 0));
         checkpointCommittables.addCommittable(new CommittableWithLineage<>(3, 
1L, 1));
         checkpointCommittables.addCommittable(new CommittableWithLineage<>(4, 
1L, 2));
 
@@ -96,10 +96,10 @@ class CheckpointCommittableManagerImplTest {
     void testUpdateCommittableSummary() {
         final CheckpointCommittableManagerImpl<Integer> checkpointCommittables 
=
                 new CheckpointCommittableManagerImpl<>(1, 1, 1L, METRIC_GROUP);
-        checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 
1L, 1, 0, 0));
+        checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 
1, 0, 0));
         assertThatThrownBy(
                         () ->
-                                checkpointCommittables.upsertSummary(
+                                checkpointCommittables.addSummary(
                                         new CommittableSummary<>(1, 1, 1L, 2, 
0, 0)))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("FLINK-25920");
@@ -114,7 +114,7 @@ class CheckpointCommittableManagerImplTest {
         final CheckpointCommittableManagerImpl<Integer> original =
                 new CheckpointCommittableManagerImpl<>(
                         subtaskId, numberOfSubtasks, checkpointId, 
METRIC_GROUP);
-        original.upsertSummary(
+        original.addSummary(
                 new CommittableSummary<>(subtaskId, numberOfSubtasks, 
checkpointId, 1, 0, 0));
 
         CheckpointCommittableManagerImpl<Integer> copy = original.copy();
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
index 0a3de995c9a..383ea9cfd33 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
@@ -17,25 +17,38 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.datagen.source.TestDataGenerators;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.runtime.operators.sink.TestSink;
-import org.apache.flink.test.util.AbstractTestBaseJUnit4;
+import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjectsExtension;
+import org.apache.flink.testutils.junit.SharedReference;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -48,7 +61,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 /**
  * Integration test for {@link org.apache.flink.api.connector.sink.Sink} run 
time implementation.
  */
-public class SinkITCase extends AbstractTestBaseJUnit4 {
+class SinkITCase extends AbstractTestBase {
     static final List<Integer> SOURCE_DATA =
             Arrays.asList(
                     895, 127, 148, 161, 148, 662, 822, 491, 275, 122, 850, 
630, 682, 765, 434, 970,
@@ -109,14 +122,17 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
                             COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean()
                                     && 
GLOBAL_COMMIT_QUEUE_RECEIVE_ALL_DATA.getAsBoolean();
 
-    @Before
+    @RegisterExtension
+    private final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();
+
+    @BeforeEach
     public void init() {
         COMMIT_QUEUE.clear();
         GLOBAL_COMMIT_QUEUE.clear();
     }
 
     @Test
-    public void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() 
throws Exception {
+    void writerAndCommitterAndGlobalCommitterExecuteInStreamingMode() throws 
Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
 
         final DataStream<Integer> stream =
@@ -153,7 +169,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() 
throws Exception {
+    void writerAndCommitterAndGlobalCommitterExecuteInBatchMode() throws 
Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
@@ -177,7 +193,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void writerAndCommitterExecuteInStreamingMode() throws Exception {
+    void writerAndCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
 
         final DataStream<Integer> stream =
@@ -198,8 +214,45 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
                 
containsInAnyOrder(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE.toArray()));
     }
 
+    /**
+     * Creates a bounded stream with a failing committer. The test verifies 
that the Sink correctly
+     * recovers and handles multiple endInput().
+     */
+    @Test
+    void duplicateEndInput() throws Exception {
+        // we need at least 2 attempts but add a bit of a safety margin for 
unexpected retries
+        int maxAttempts = 10;
+        final Configuration conf = new Configuration();
+        conf.set(CheckpointingOptions.TOLERABLE_FAILURE_NUMBER, maxAttempts);
+        conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
maxAttempts);
+
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(conf);
+        env.enableCheckpointing(100);
+
+        AtomicBoolean failedOnce = new AtomicBoolean(false);
+        List<String> committed = new ArrayList<>();
+        FailingOnceCommitter committer =
+                new FailingOnceCommitter(
+                        sharedObjects.add(failedOnce), 
sharedObjects.add(committed));
+        env.<Object>fromData("bounded")
+                
.sinkTo(TestSinkV2.newBuilder().setCommitter(committer).build());
+
+        JobClient jobClient = env.executeAsync();
+        // wait for job to finish including restarts
+        jobClient.getJobExecutionResult().get();
+        // Did we successfully finish?
+        
Assertions.assertThat(jobClient.getJobStatus().get()).isEqualTo(JobStatus.FINISHED);
+
+        // check that we error'ed once as expected
+        Assertions.assertThat(failedOnce).isTrue();
+        // but also eventually succeed to commit (size > 1 in case of 
unexpected retries)
+        Assertions.assertThat(committed).isNotEmpty();
+    }
+
     @Test
-    public void writerAndCommitterExecuteInBatchMode() throws Exception {
+    void writerAndCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
@@ -214,7 +267,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void writerAndGlobalCommitterExecuteInStreamingMode() throws 
Exception {
+    void writerAndGlobalCommitterExecuteInStreamingMode() throws Exception {
         final StreamExecutionEnvironment env = buildStreamEnv();
 
         final DataStream<Integer> stream =
@@ -245,7 +298,7 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
+    void writerAndGlobalCommitterExecuteInBatchMode() throws Exception {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
@@ -280,4 +333,26 @@ public class SinkITCase extends AbstractTestBaseJUnit4 {
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }
+
+    static class FailingOnceCommitter extends TestSinkV2.DefaultCommitter {
+        private final SharedReference<AtomicBoolean> failedOnce;
+        private final SharedReference<List<String>> committed;
+
+        public FailingOnceCommitter(
+                SharedReference<AtomicBoolean> failedOnce,
+                SharedReference<List<String>> committed) {
+            this.failedOnce = failedOnce;
+            this.committed = committed;
+        }
+
+        @Override
+        public void commit(Collection<CommitRequest<String>> committables) {
+            if (failedOnce.get().compareAndSet(false, true)) {
+                throw new RuntimeException("Fail to commit");
+            }
+            for (CommitRequest<String> committable : committables) {
+                this.committed.get().add(committable.getCommittable());
+            }
+        }
+    }
 }

Reply via email to