This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 645a0734b33b159a6169cb12c49194aefb9b017f
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 3 09:32:00 2025 +0200

    [FLINK-37605][runtime] Infer checkpoint id on endInput in sink
    
    So far, we have used a special value (EOI) as the checkpoint id for the final checkpoint on endInput. 
However, as shown in the description of this ticket, the "final" checkpoint is not necessarily the last one. 
Hence, multiple committables with EOI could be created at different times.
    
    With this commit, we stop using a special value for such committables and 
instead infer the checkpoint id of the next checkpoint. There are 
various factors that influence the checkpoint id but we can mostly ignore them 
all because we just need to pick a checkpoint id that is
    - higher than all checkpoint ids of the previous, successful checkpoints of 
this attempt
    - higher than the checkpoint id of the restored checkpoint
    - lower than any future checkpoint id.
    
    Hence, we just remember the last observed checkpoint id (initialized with 
max(0, restored id)), and use last id + 1 for endInput. Naturally, multiple 
endInput calls happening through restarts will result in unique checkpoint ids. 
Note that aborted checkpoints before endInput may result in diverged checkpoint 
ids across subtasks. However, each of these ids satisfies the above requirements and 
any id of endInput1 will be smaller than any id of endInput2. Thus, diverged 
checkpoint ids will not  [...]
    
    (cherry picked from commit 93025452714570a4d461519510375dd72af3a2c0)
---
 .../sink/compactor/operator/CompactorOperator.java | 13 +++--
 .../api/connector/sink2/CommittableMessage.java    | 12 ++++-
 .../api/connector/sink2/CommittableSummary.java    |  4 +-
 .../connector/sink2/CommittableWithLineage.java    |  2 +-
 .../runtime/operators/sink/CommitterOperator.java  | 23 ++++----
 .../runtime/operators/sink/SinkWriterOperator.java | 58 ++++----------------
 .../CheckpointCommittableManagerImpl.java          |  1 +
 .../sink/committables/CommittableCollector.java    | 12 -----
 .../sink/GlobalCommitterOperatorTest.java          | 33 ------------
 .../committables/CommittableCollectorTest.java     | 20 -------
 .../util/AbstractStreamOperatorTestHarness.java    | 12 +++--
 .../sink/SinkV2CommitterOperatorTest.java          | 39 --------------
 .../sink/SinkV2SinkWriterOperatorTest.java         | 62 +++++++++++++++++++---
 13 files changed, 106 insertions(+), 185 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
index fa4e2b3d54b..ae2ffe7793a 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -107,6 +108,8 @@ public class CompactorOperator
     // submitted again while restoring
     private ListState<Map<Long, List<CompactorRequest>>> 
remainingRequestsState;
 
+    private long lastKnownCheckpointId = 
CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
+
     public CompactorOperator(
             StreamOperatorParameters<CommittableMessage<FileSinkCommittable>> 
parameters,
             FileCompactStrategy strategy,
@@ -139,15 +142,16 @@ public class CompactorOperator
     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
+        long checkpointId = lastKnownCheckpointId + 1;
+        checkpointRequests.put(checkpointId, collectingRequests);
         collectingRequests = new ArrayList<>();
 
         // submit all requests and wait until they are done
-        submitUntil(CommittableMessage.EOI);
+        submitUntil(checkpointId);
         assert checkpointRequests.isEmpty();
 
         getAllTasksFuture().join();
-        emitCompacted(CommittableMessage.EOI);
+        emitCompacted(checkpointId);
         assert compactingRequests.isEmpty();
     }
 
@@ -225,6 +229,8 @@ public class CompactorOperator
     }
 
     private void emitCompacted(long checkpointId) throws Exception {
+        lastKnownCheckpointId = checkpointId;
+
         List<FileSinkCommittable> compacted = new ArrayList<>();
         Iterator<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
                 compactingRequests.iterator();
@@ -252,7 +258,6 @@ public class CompactorOperator
                         
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
                         checkpointId,
                         compacted.size(),
-                        compacted.size(),
                         0);
         output.collect(new StreamRecord<>(summary));
         for (FileSinkCommittable c : compacted) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
index df77f658780..e0978e0ffcf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
@@ -26,7 +26,10 @@ public interface CommittableMessage<CommT> {
     /**
      * Special value for checkpointId for the end of input in case of batch 
commit or final
      * checkpoint.
+     *
+     * @deprecated the special value is not used anymore at all (remove with 
Flink 2.2)
      */
+    @Deprecated(forRemoval = true)
     long EOI = Long.MAX_VALUE;
 
     /** The subtask that created this committable. */
@@ -35,6 +38,13 @@ public interface CommittableMessage<CommT> {
     /**
      * Returns the checkpoint id or EOI if this message belong to the final 
checkpoint or the batch
      * commit.
+     *
+     * @deprecated the special value EOI is not used anymore
      */
-    long getCheckpointIdOrEOI();
+    @Deprecated(forRemoval = true)
+    default long getCheckpointIdOrEOI() {
+        return getCheckpointId();
+    }
+
+    long getCheckpointId();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
index 252b10fadf4..7496013b046 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
@@ -42,7 +42,7 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
     /** The number of committables coming from the given subtask in the 
particular checkpoint. */
     private final int numberOfCommittables;
 
-    @Deprecated
+    @Deprecated(forRemoval = true)
     /** The number of committables that have not been successfully committed. 
*/
     private final int numberOfPendingCommittables;
 
@@ -88,7 +88,7 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
         return numberOfSubtasks;
     }
 
-    public long getCheckpointIdOrEOI() {
+    public long getCheckpointId() {
         return checkpointId;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index 819b4fdfc4f..6641a352885 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -50,7 +50,7 @@ public class CommittableWithLineage<CommT> implements 
CommittableMessage<CommT>
         return subtaskId;
     }
 
-    public long getCheckpointIdOrEOI() {
+    public long getCheckpointId() {
         return checkpointId;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 9cd85c4001a..2f1cffcbcce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import 
org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -51,7 +52,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.OptionalLong;
 
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
     private SinkCommitterMetricGroup metricGroup;
     private Committer<CommT> committer;
     private CommittableCollector<CommT> committableCollector;
-    private long lastCompletedCheckpointId = -1;
+    private long lastCompletedCheckpointId = 
CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
     private int maxRetries;
 
-    private boolean endInput = false;
-
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
             new ListStateDescriptor<>(
@@ -134,11 +132,11 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
                                 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                                 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
                                 metricGroup));
-        if (context.isRestored()) {
+        if (checkpointId.isPresent()) {
             committableCollectorState.get().forEach(cc -> 
committableCollector.merge(cc));
             lastCompletedCheckpointId = checkpointId.getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId);
         }
     }
 
@@ -151,24 +149,23 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
 
     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be 
committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointId);
-        commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, 
checkpointId));
     }
 
-    private void commitAndEmitCheckpoints() throws IOException, 
InterruptedException {
-        long completedCheckpointId = endInput ? EOI : 
lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;
         for (CheckpointCommittableManager<CommT> checkpointManager :
-                
committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+                
committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
             // ensure that all committables of the first checkpoint are fully 
committed before
             // attempting the next committable
             commitAndEmit(checkpointManager);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 6407c0360e2..712d6541eec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema.Initializat
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -54,8 +53,6 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -64,6 +61,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.OptionalLong;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -93,13 +91,6 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Nullable private final SimpleVersionedSerializer<CommT> 
committableSerializer;
     private final List<CommT> legacyCommittables = new ArrayList<>();
 
-    /**
-     * Used to remember that EOI has already happened so that we don't emit 
the last committables of
-     * the final checkpoints twice.
-     */
-    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
-            new ListStateDescriptor<>("end_of_input_state", 
BooleanSerializer.INSTANCE);
-
     /** The runtime information of the input element. */
     private final Context<InputT> context;
 
@@ -118,10 +109,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     private boolean endOfInput = false;
 
-    /**
-     * Remembers the endOfInput state for (final) checkpoints iff the operator 
emits committables.
-     */
-    @Nullable private ListState<Boolean> endOfInputState;
+    private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;
 
     SinkWriterOperator(
             StreamOperatorParameters<CommittableMessage<CommT>> parameters,
@@ -164,8 +152,10 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        WriterInitContext initContext = 
createInitContext(context.getRestoredCheckpointId());
-        if (context.isRestored()) {
+        OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
+        WriterInitContext initContext = 
createInitContext(restoredCheckpointId);
+        if (restoredCheckpointId.isPresent()) {
+            lastKnownCheckpointId = restoredCheckpointId.getAsLong();
             if (committableSerializer != null) {
                 final ListState<List<CommT>> legacyCommitterState =
                         new SimpleVersionedListState<>(
@@ -179,41 +169,12 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         }
 
         sinkWriter = writerStateHandler.createWriter(initContext, context);
-
-        if (emitDownstream) {
-            // Figure out if we have seen end of input before and if we can 
suppress creating
-            // transactions and sending them downstream to the 
CommitterOperator. We have the
-            // following
-            // cases:
-            // 1. state is empty:
-            //   - First time initialization
-            //   - Restoring from a previous version of Flink that didn't 
handle EOI
-            //   - Upscaled from a final or regular checkpoint
-            // In all cases, we regularly handle EOI, potentially resulting in 
duplicate summaries
-            // that the CommitterOperator needs to handle.
-            // 2. state is not empty:
-            //   - This implies Flink restores from a version that handles EOI.
-            //   - If there is one entry, no rescaling happened (for this 
subtask), so if it's true,
-            //     we recover from a final checkpoint (for this subtask) and 
can ignore another EOI
-            //     else we have a regular checkpoint.
-            //   - If there are multiple entries, Flink downscaled, and we 
need to check if all are
-            //     true and do the same as above. As soon as one entry is 
false, we regularly start
-            //     the writer and potentially emit duplicate summaries if we 
indeed recovered from a
-            //     final checkpoint.
-            endOfInputState = 
context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
-            ArrayList<Boolean> previousState = 
Lists.newArrayList(endOfInputState.get());
-            endOfInput = !previousState.isEmpty() && 
!previousState.contains(false);
-        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         writerStateHandler.snapshotState(context.getCheckpointId());
-        if (endOfInputState != null) {
-            endOfInputState.clear();
-            endOfInputState.add(this.endOfInput);
-        }
     }
 
     @Override
@@ -243,17 +204,16 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     @Override
     public void endInput() throws Exception {
+        LOG.info("Received endInput");
         if (!endOfInput) {
             endOfInput = true;
-            if (endOfInputState != null) {
-                endOfInputState.add(true);
-            }
             sinkWriter.flush(true);
-            emitCommittables(CommittableMessage.EOI);
+            emitCommittables(lastKnownCheckpointId + 1);
         }
     }
 
     private void emitCommittables(long checkpointId) throws IOException, 
InterruptedException {
+        lastKnownCheckpointId = checkpointId;
         if (!emitDownstream) {
             // To support SinkV1 topologies with only a writer we have to call 
prepareCommit
             // although no committables are forwarded
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 6aa7401a00a..2118874d777 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -95,6 +95,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
                         summary.getSubtaskId(),
                         checkpointId,
                         metricGroup);
+        // Remove branch once CommittableMessage.EOI has been removed 
(earliest 2.2)
         if (checkpointId == CommittableMessage.EOI) {
             SubtaskCommittableManager<CommT> merged =
                     subtasksCommittableManagers.merge(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index be832152ee7..96585a632d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -33,7 +33,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
@@ -49,8 +48,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class CommittableCollector<CommT> {
-    private static final long EOI = Long.MAX_VALUE;
-
     /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. 
*/
     private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
             checkpointCommittables;
@@ -144,15 +141,6 @@ public class CommittableCollector<CommT> {
         return new ArrayList<>(checkpointCommittables.headMap(checkpointId, 
true).values());
     }
 
-    /**
-     * Returns {@link CheckpointCommittableManager} belonging to the last 
input.
-     *
-     * @return {@link CheckpointCommittableManager}
-     */
-    public Optional<CheckpointCommittableManager<CommT>> 
getEndOfInputCommittable() {
-        return Optional.ofNullable(checkpointCommittables.get(EOI));
-    }
-
     /**
      * Returns whether all {@link CheckpointCommittableManager} currently hold 
by the collector are
      * either committed or failed.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java
index a73dcc24d01..bf8da806b7b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class GlobalCommitterOperatorTest {
@@ -140,38 +139,6 @@ class GlobalCommitterOperatorTest {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) 
throws Exception {
-        final MockCommitter committer = new MockCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, 
Void> testHarness =
-                createTestHarness(committer, commitOnInput);
-        testHarness.open();
-
-        final CommittableSummary<Integer> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<Integer> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<Integer> first = new 
CommittableWithLineage<>(1, EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<Integer> second = new 
CommittableWithLineage<>(2, EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        // commitOnInput implies that the global committer is not using 
notifyCheckpointComplete
-        if (commitOnInput) {
-            assertThat(committer.committed).containsExactly(1, 2);
-        } else {
-            assertThat(committer.committed).isEmpty();
-            testHarness.notifyOfCompletedCheckpoint(EOI);
-            assertThat(committer.committed).containsExactly(1, 2);
-        }
-
-        assertThat(testHarness.getOutput()).isEmpty();
-    }
-
     private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, 
Void> createTestHarness(
             Committer<Integer> committer, boolean commitOnInput) throws 
Exception {
         return new OneInputStreamOperatorTestHarness<>(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index 6d311170d47..892b3785e25 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -24,9 +24,6 @@ import 
org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Optional;
-
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class CommittableCollectorTest {
@@ -44,22 +41,5 @@ class CommittableCollectorTest {
         committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 
0));
 
         
assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2);
-
-        
assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent();
-    }
-
-    @Test
-    void testGetEndOfInputCommittable() {
-        final CommittableCollector<Integer> committableCollector =
-                new CommittableCollector<>(METRIC_GROUP);
-        CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 
EOI, 1, 0);
-        committableCollector.addMessage(first);
-
-        Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        assertThat(endOfInputCommittable).isPresent();
-        assertThat(endOfInputCommittable)
-                .get()
-                .returns(EOI, CheckpointCommittableManager::getCheckpointId);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index e600f666204..561ed1fc4dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -185,6 +185,8 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
 
     private volatile boolean wasFailedExternally = false;
 
+    private long restoredCheckpointId = 0;
+
     public AbstractStreamOperatorTestHarness(
             StreamOperator<OUT> operator, int maxParallelism, int parallelism, 
int subtaskIndex)
             throws Exception {
@@ -397,6 +399,10 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         return config;
     }
 
+    public void setRestoredCheckpointId(long restoredCheckpointId) {
+        this.restoredCheckpointId = restoredCheckpointId;
+    }
+
     /** Get all the output from the task. This contains StreamRecords and 
Events interleaved. */
     public ConcurrentLinkedQueue<Object> getOutput() {
         return outputList;
@@ -614,16 +620,16 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
             jmTaskStateSnapshot.putSubtaskStateByOperatorID(
                     operator.getOperatorID(), jmOperatorStateHandles);
 
-            taskStateManager.setReportedCheckpointId(0);
+            taskStateManager.setReportedCheckpointId(restoredCheckpointId);
             taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
-                    Collections.singletonMap(0L, jmTaskStateSnapshot));
+                    Collections.singletonMap(restoredCheckpointId, 
jmTaskStateSnapshot));
 
             if (tmOperatorStateHandles != null) {
                 TaskStateSnapshot tmTaskStateSnapshot = new 
TaskStateSnapshot();
                 tmTaskStateSnapshot.putSubtaskStateByOperatorID(
                         operator.getOperatorID(), tmOperatorStateHandles);
                 
taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(
-                        Collections.singletonMap(0L, tmTaskStateSnapshot));
+                        Collections.singletonMap(restoredCheckpointId, 
tmTaskStateSnapshot));
             }
         }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
index 950f76b75d5..94a22fb68bf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2CommitterOperatorTest.java
@@ -41,7 +41,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.Collection;
 import java.util.function.IntSupplier;
 
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -165,44 +164,6 @@ class SinkV2CommitterOperatorTest {
         testHarness.close();
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws 
Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, 
isBatchMode, !isBatchMode);
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("1", EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        testHarness.endInput();
-        if (!isBatchMode) {
-            assertThat(testHarness.getOutput()).isEmpty();
-            // notify final checkpoint complete
-            testHarness.notifyOfCompletedCheckpoint(1);
-        }
-
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                .hasFailedCommittables(0)
-                .hasOverallCommittables(2);
-        records.element(1, 
as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, 
as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
     @Test
     void testStateRestore() throws Exception {
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
index 55fe2cbfae9..ba3f80d66fa 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import 
org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
 import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
@@ -69,7 +68,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -264,7 +262,7 @@ class SinkV2SinkWriterOperatorTest {
 
             testHarness.processElement(1, 1);
             testHarness.endInput();
-            assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+            assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
         }
     }
 
@@ -411,6 +409,54 @@ class SinkV2SinkWriterOperatorTest {
         }
     }
 
+    @Test
+    void testDoubleEndOfInput() throws Exception { // endInput in two attempts (before/after restore) must use distinct, increasing checkpoint ids
+        TestSinkV2<Integer> sink =
+                TestSinkV2.newBuilder()
+                        .setWriter(new DefaultCommittingSinkWriter<Integer>())
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .setWriterState(true)
+                        .build();
+
+        OperatorSubtaskState snapshot; // state of the first attempt, used to restore the second one
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Record<Integer>>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new SinkWriterOperatorFactory<>(sink))) {
+            testHarness.open();
+            testHarness.processElement(1, 1);
+
+            testHarness.endInput(); // no checkpoint seen yet and no restore: guessed id is 0 + 1 = 1
+            testHarness.prepareSnapshotPreBarrier(1);
+            snapshot = testHarness.snapshot(1, 1);
+
+            assertBasicOutput(testHarness.extractOutputValues(), 1, 1L); // one committable (plus summary) tagged with checkpoint id 1
+        }
+
+        final TestSinkV2<Integer> restoredSink =
+                TestSinkV2.newBuilder()
+                        .setCommitter(new DefaultCommitter<>(), 
RecordSerializer::new)
+                        .setWriter(new DefaultStatefulSinkWriter<Integer>())
+                        .setWriterState(true)
+                        .build();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+                restoredTestHarness = // NOTE(review): first harness uses CommittableMessage<Record<Integer>> with the same committer; confirm intended element type
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
SinkWriterOperatorFactory<>(restoredSink))) {
+            restoredTestHarness.setRestoredCheckpointId(1L); // simulate recovery from checkpoint 1 taken above
+            restoredTestHarness.initializeState(snapshot);
+            restoredTestHarness.open();
+            restoredTestHarness.processElement(2, 2);
+
+            restoredTestHarness.endInput(); // second endInput: guessed id is restored id 1 + 1 = 2
+            restoredTestHarness.prepareSnapshotPreBarrier(3);
+            restoredTestHarness.snapshot(3, 1);
+
+            // guessed id after restoring checkpoint 1 must be 2, i.e. larger than the first attempt's id 1
+            assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 
2L);
+        }
+    }
+
     @Test
     void testInitContext() throws Exception {
         final AtomicReference<WriterInitContext> initContext = new 
AtomicReference<>();
@@ -459,12 +505,12 @@ class SinkV2SinkWriterOperatorTest {
     }
 
     private static void assertBasicOutput(
-            List<CommittableMessage<Integer>> output, int 
numberOfCommittables, long checkpointId) {
-        ListAssert<CommittableMessage<Integer>> records =
+            List<? extends CommittableMessage<?>> output,
+            int numberOfCommittables,
+            long checkpointId) {
+        ListAssert<? extends CommittableMessage<?>> records =
                 assertThat(output).hasSize(numberOfCommittables + 1);
-        CommittableSummaryAssert<Object> objectCommittableSummaryAssert =
-                records.element(0, as(committableSummary()))
-                        .hasOverallCommittables(numberOfCommittables);
+        records.element(0, 
as(committableSummary())).hasOverallCommittables(numberOfCommittables);
         records.filteredOn(r -> r instanceof CommittableWithLineage)
                 .allSatisfy(
                         cl ->


Reply via email to