This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 044ad25150143bdaf6e8070ccbb5f655ad0564fd
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Apr 3 09:32:00 2025 +0200

    [FLINK-37605][runtime] Infer checkpoint id on endInput in sink
    
    So far, we used a special value (EOI) for the final checkpoint on endInput.
    However, as shown in the ticket description, the "final" checkpoint is not
    necessarily final: committables tagged with EOI could be created at
    different points in time.

    With this commit, we stop using the special value for such committables and
    instead infer the id of the next checkpoint. Various factors influence the
    actual checkpoint id, but we can mostly ignore them because we only need to
    pick a checkpoint id that is
    - higher than the ids of all previous, successful checkpoints of this attempt,
    - higher than the id of the restored checkpoint, and
    - lower than any future checkpoint id.

    Hence, we simply remember the last observed checkpoint id (initialized with
    max(0, restored id)) and use that id + 1 on endInput. Multiple endInput calls
    across restarts therefore yield unique checkpoint ids. Note that checkpoints
    aborted before endInput may cause the inferred ids to diverge across subtasks.
    However, each of these ids satisfies the requirements above, and any id of
    endInput1 is smaller than any id of endInput2. Thus, diverged checkpoint ids
    will not [...]
    
    (cherry picked from commit 93025452714570a4d461519510375dd72af3a2c0)
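
    For illustration only (not part of this patch): a minimal sketch of the
    inference rule described above; the class and method names are invented.

        // Sketch: track the last checkpoint id this attempt has observed and
        // use the next id for committables emitted on endInput.
        class CheckpointIdInference {
            // CheckpointIDCounter.INITIAL_CHECKPOINT_ID is 1 in Flink, so the
            // initial value is 0, matching max(0, restored id) before restore.
            private static final long INITIAL_CHECKPOINT_ID = 1;

            private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;

            // Called when the operator is restored from a checkpoint.
            void onRestored(long restoredCheckpointId) {
                lastKnownCheckpointId = restoredCheckpointId;
            }

            // Called whenever committables are emitted for a real checkpoint;
            // the ids observed by a subtask only ever increase.
            void onCheckpoint(long checkpointId) {
                lastKnownCheckpointId = checkpointId;
            }

            // Id used for committables emitted on endInput: strictly higher than
            // every id this attempt has completed or restored from.
            long checkpointIdForEndOfInput() {
                return lastKnownCheckpointId + 1;
            }
        }

    The diff below applies this pattern via lastKnownCheckpointId in
    CompactorOperator and SinkWriterOperator, and via lastCompletedCheckpointId
    in CommitterOperator.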
---
 .../sink/compactor/operator/CompactorOperator.java | 12 +++--
 .../api/connector/sink2/CommittableMessage.java    |  6 ++-
 .../runtime/operators/sink/CommitterOperator.java  | 23 ++++-----
 .../runtime/operators/sink/SinkWriterOperator.java | 58 ++++------------------
 .../CheckpointCommittableManagerImpl.java          |  1 +
 .../sink/committables/CommittableCollector.java    | 11 ----
 .../sink2/GlobalCommitterOperatorTest.java         | 33 ------------
 .../operators/sink/CommitterOperatorTestBase.java  | 40 ---------------
 .../operators/sink/SinkWriterOperatorTestBase.java | 40 ++++++++++++++-
 .../committables/CommittableCollectorTest.java     | 20 --------
 .../util/AbstractStreamOperatorTestHarness.java    | 12 +++--
 11 files changed, 81 insertions(+), 175 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
index 0cb573d466f..69ee4b07914 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -106,6 +107,8 @@ public class CompactorOperator
     // submitted again while restoring
     private ListState<Map<Long, List<CompactorRequest>>> remainingRequestsState;
 
+    private long lastKnownCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
+
     public CompactorOperator(
             FileCompactStrategy strategy,
             SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer,
@@ -136,15 +139,16 @@ public class CompactorOperator
     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
+        long checkpointId = lastKnownCheckpointId + 1;
+        checkpointRequests.put(checkpointId, collectingRequests);
         collectingRequests = new ArrayList<>();
 
         // submit all requests and wait until they are done
-        submitUntil(CommittableMessage.EOI);
+        submitUntil(checkpointId);
         assert checkpointRequests.isEmpty();
 
         getAllTasksFuture().join();
-        emitCompacted(CommittableMessage.EOI);
+        emitCompacted(checkpointId);
         assert compactingRequests.isEmpty();
     }
 
@@ -222,6 +226,8 @@ public class CompactorOperator
     }
 
     private void emitCompacted(long checkpointId) throws Exception {
+        lastKnownCheckpointId = checkpointId;
+
         List<FileSinkCommittable> compacted = new ArrayList<>();
         Iterator<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
                 compactingRequests.iterator();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
index 7db0c29ecc6..4a2049dbce8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
@@ -28,8 +28,10 @@ public interface CommittableMessage<CommT> {
     /**
      * Special value for checkpointId for the end of input in case of batch 
commit or final
      * checkpoint.
+     *
+     * @deprecated the special value is not used anymore at all (remove with 
Flink 2.2)
      */
-    long EOI = Long.MAX_VALUE;
+    @Deprecated long EOI = Long.MAX_VALUE;
 
     /** The subtask that created this committable. */
     int getSubtaskId();
@@ -49,6 +51,8 @@ public interface CommittableMessage<CommT> {
     /**
      * Returns the checkpoint id or EOI if this message belong to the final 
checkpoint or the batch
      * commit.
+     *
+     * @deprecated the special value EOI is not used anymore
      */
     long getCheckpointIdOrEOI();
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 10ae86cf10d..6954ad24e36 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.configuration.SinkOptions;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import 
org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -51,7 +52,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.OptionalLong;
 
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -76,11 +76,9 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
     private SinkCommitterMetricGroup metricGroup;
     private Committer<CommT> committer;
     private CommittableCollector<CommT> committableCollector;
-    private long lastCompletedCheckpointId = -1;
+    private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
     private int maxRetries;
 
-    private boolean endInput = false;
-
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
             new ListStateDescriptor<>(
@@ -131,11 +129,11 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
                                 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                                 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(),
                                 metricGroup));
-        if (context.isRestored()) {
+        if (checkpointId.isPresent()) {
             committableCollectorState.get().forEach(cc -> 
committableCollector.merge(cc));
             lastCompletedCheckpointId = checkpointId.getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId);
         }
     }
 
@@ -148,24 +146,23 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
 
     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be 
committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
     }
 
-    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;
         for (CheckpointCommittableManager<CommT> checkpointManager :
-                committableCollector.getCheckpointCommittablesUpTo(completedCheckpointId)) {
+                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
             // ensure that all committables of the first checkpoint are fully committed before
             // attempting the next committable
             commitAndEmit(checkpointManager);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 7fb78f37c0d..31397f48b37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema.Initializat
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
 import org.apache.flink.api.connector.sink2.Sink;
@@ -52,8 +51,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.UserCodeClassLoader;
 
-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -62,6 +59,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.OptionalLong;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -91,13 +89,6 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Nullable private final SimpleVersionedSerializer<CommT> 
committableSerializer;
     private final List<CommT> legacyCommittables = new ArrayList<>();
 
-    /**
-     * Used to remember that EOI has already happened so that we don't emit 
the last committables of
-     * the final checkpoints twice.
-     */
-    private static final ListStateDescriptor<Boolean> END_OF_INPUT_STATE_DESC =
-            new ListStateDescriptor<>("end_of_input_state", 
BooleanSerializer.INSTANCE);
-
     /** The runtime information of the input element. */
     private final Context<InputT> context;
 
@@ -115,10 +106,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     private final MailboxExecutor mailboxExecutor;
 
     private boolean endOfInput = false;
-    /**
-     * Remembers the endOfInput state for (final) checkpoints iff the operator 
emits committables.
-     */
-    @Nullable private ListState<Boolean> endOfInputState;
+    private long lastKnownCheckpointId = INITIAL_CHECKPOINT_ID - 1;
 
     SinkWriterOperator(
             Sink<InputT> sink,
@@ -146,8 +134,10 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        WriterInitContext initContext = createInitContext(context.getRestoredCheckpointId());
-        if (context.isRestored()) {
+        OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
+        WriterInitContext initContext = createInitContext(restoredCheckpointId);
+        if (restoredCheckpointId.isPresent()) {
+            lastKnownCheckpointId = restoredCheckpointId.getAsLong();
             if (committableSerializer != null) {
                 final ListState<List<CommT>> legacyCommitterState =
                         new SimpleVersionedListState<>(
@@ -161,41 +151,12 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
         }
 
         sinkWriter = writerStateHandler.createWriter(initContext, context);
-
-        if (emitDownstream) {
-            // Figure out if we have seen end of input before and if we can 
suppress creating
-            // transactions and sending them downstream to the 
CommitterOperator. We have the
-            // following
-            // cases:
-            // 1. state is empty:
-            //   - First time initialization
-            //   - Restoring from a previous version of Flink that didn't 
handle EOI
-            //   - Upscaled from a final or regular checkpoint
-            // In all cases, we regularly handle EOI, potentially resulting in 
duplicate summaries
-            // that the CommitterOperator needs to handle.
-            // 2. state is not empty:
-            //   - This implies Flink restores from a version that handles EOI.
-            //   - If there is one entry, no rescaling happened (for this 
subtask), so if it's true,
-            //     we recover from a final checkpoint (for this subtask) and 
can ignore another EOI
-            //     else we have a regular checkpoint.
-            //   - If there are multiple entries, Flink downscaled, and we 
need to check if all are
-            //     true and do the same as above. As soon as one entry is 
false, we regularly start
-            //     the writer and potentially emit duplicate summaries if we 
indeed recovered from a
-            //     final checkpoint.
-            endOfInputState = 
context.getOperatorStateStore().getListState(END_OF_INPUT_STATE_DESC);
-            ArrayList<Boolean> previousState = 
Lists.newArrayList(endOfInputState.get());
-            endOfInput = !previousState.isEmpty() && 
!previousState.contains(false);
-        }
     }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
         writerStateHandler.snapshotState(context.getCheckpointId());
-        if (endOfInputState != null) {
-            endOfInputState.clear();
-            endOfInputState.add(this.endOfInput);
-        }
     }
 
     @Override
@@ -225,17 +186,16 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     @Override
     public void endInput() throws Exception {
+        LOG.info("Received endInput");
         if (!endOfInput) {
             endOfInput = true;
-            if (endOfInputState != null) {
-                endOfInputState.add(true);
-            }
             sinkWriter.flush(true);
-            emitCommittables(CommittableMessage.EOI);
+            emitCommittables(lastKnownCheckpointId + 1);
         }
     }
 
     private void emitCommittables(long checkpointId) throws IOException, 
InterruptedException {
+        lastKnownCheckpointId = checkpointId;
         if (!emitDownstream) {
             // To support SinkV1 topologies with only a writer we have to call 
prepareCommit
             // although no committables are forwarded
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index da4491cda61..816bd55543e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -95,6 +95,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
                         summary.getSubtaskId(),
                         checkpointId,
                         metricGroup);
+        // Remove branch once CommittableMessage.EOI has been removed (earliest 2.2)
         if (checkpointId == CommittableMessage.EOI) {
             SubtaskCommittableManager<CommT> merged =
                     subtasksCommittableManagers.merge(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 4e49d73279e..96585a632d1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -33,7 +33,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
@@ -49,7 +48,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class CommittableCollector<CommT> {
-    private static final long EOI = Long.MAX_VALUE;
     /** Mapping of checkpoint id to {@link CheckpointCommittableManagerImpl}. 
*/
     private final NavigableMap<Long, CheckpointCommittableManagerImpl<CommT>>
             checkpointCommittables;
@@ -143,15 +141,6 @@ public class CommittableCollector<CommT> {
         return new ArrayList<>(checkpointCommittables.headMap(checkpointId, 
true).values());
     }
 
-    /**
-     * Returns {@link CheckpointCommittableManager} belonging to the last 
input.
-     *
-     * @return {@link CheckpointCommittableManager}
-     */
-    public Optional<CheckpointCommittableManager<CommT>> 
getEndOfInputCommittable() {
-        return Optional.ofNullable(checkpointCommittables.get(EOI));
-    }
-
     /**
      * Returns whether all {@link CheckpointCommittableManager} currently hold 
by the collector are
      * either committed or failed.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
index 641a651e2e4..24f9422d30b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class GlobalCommitterOperatorTest {
@@ -138,38 +137,6 @@ class GlobalCommitterOperatorTest {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testCommitAllCommittablesOnFinalCheckpoint(boolean commitOnInput) 
throws Exception {
-        final MockCommitter committer = new MockCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, 
Void> testHarness =
-                createTestHarness(committer, commitOnInput);
-        testHarness.open();
-
-        final CommittableSummary<Integer> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<Integer> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<Integer> first = new 
CommittableWithLineage<>(1, EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<Integer> second = new 
CommittableWithLineage<>(2, EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        // commitOnInput implies that the global committer is not using 
notifyCheckpointComplete
-        if (commitOnInput) {
-            assertThat(committer.committed).containsExactly(1, 2);
-        } else {
-            assertThat(committer.committed).isEmpty();
-            testHarness.notifyOfCompletedCheckpoint(EOI);
-            assertThat(committer.committed).containsExactly(1, 2);
-        }
-
-        assertThat(testHarness.getOutput()).isEmpty();
-    }
-
     private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, 
Void> createTestHarness(
             Committer<Integer> committer, boolean commitOnInput) throws 
Exception {
         return new OneInputStreamOperatorTestHarness<>(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index 756ea0c8022..c8b37943846 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -35,7 +35,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.function.IntSupplier;
 
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -126,45 +125,6 @@ abstract class CommitterOperatorTestBase {
         testHarness.close();
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws 
Exception {
-        SinkAndCounters sinkAndCounters = sinkWithPostCommit();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(sinkAndCounters.sink, 
isBatchMode, !isBatchMode);
-        testHarness.open();
-
-        final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary));
-        final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
-        testHarness.processElement(new StreamRecord<>(committableSummary2));
-
-        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", EOI, 1);
-        testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("1", EOI, 2);
-        testHarness.processElement(new StreamRecord<>(second));
-
-        testHarness.endInput();
-        if (!isBatchMode) {
-            assertThat(testHarness.getOutput()).isEmpty();
-            // notify final checkpoint complete
-            testHarness.notifyOfCompletedCheckpoint(1);
-        }
-
-        ListAssert<CommittableMessage<String>> records =
-                assertThat(testHarness.extractOutputValues()).hasSize(3);
-        records.element(0, as(committableSummary()))
-                .hasFailedCommittables(0)
-                .hasOverallCommittables(2)
-                .hasPendingCommittables(0);
-        records.element(1, 
as(committableWithLineage())).isEqualTo(first.withSubtaskId(0));
-        records.element(2, 
as(committableWithLineage())).isEqualTo(second.withSubtaskId(0));
-        testHarness.close();
-    }
-
     @Test
     void testStateRestore() throws Exception {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
index 46914441c13..57c03df3220 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
@@ -66,7 +66,6 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.api.connector.sink2.InitContext.INITIAL_CHECKPOINT_ID;
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableSummary;
 import static 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions.committableWithLineage;
 import static org.assertj.core.api.Assertions.as;
@@ -178,7 +177,7 @@ abstract class SinkWriterOperatorTestBase {
 
         testHarness.processElement(1, 1);
         testHarness.endInput();
-        assertBasicOutput(testHarness.extractOutputValues(), 1, EOI);
+        assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
     }
 
     @ParameterizedTest
@@ -467,6 +466,43 @@ abstract class SinkWriterOperatorTestBase {
         testHarness.close();
     }
 
+    @Test
+    void testDoubleEndOfInput() throws Exception {
+        InspectableSink sink = sinkWithCommitter();
+
+        OperatorSubtaskState snapshot;
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(sink.getSink()))) {
+            testHarness.open();
+            testHarness.processElement(1, 1);
+
+            testHarness.endInput();
+            testHarness.prepareSnapshotPreBarrier(1);
+            snapshot = testHarness.snapshot(1, 1);
+
+            assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
+        }
+
+        final InspectableSink restoredSink = sinkWithCommitter();
+        try (OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>>
+                restoredTestHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new 
SinkWriterOperatorFactory<>(restoredSink.getSink()))) {
+            restoredTestHarness.setRestoredCheckpointId(1L);
+            restoredTestHarness.initializeState(snapshot);
+            restoredTestHarness.open();
+            restoredTestHarness.processElement(2, 2);
+
+            restoredTestHarness.endInput();
+            restoredTestHarness.prepareSnapshotPreBarrier(3);
+            restoredTestHarness.snapshot(3, 1);
+
+            // asserts the guessed checkpoint id, which needs to be the restored id + 1
+            assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L);
+        }
+    }
+
     private static void assertContextsEqual(
             Sink.InitContext initContext, WriterInitContext original) {
         assertThat(initContext.getUserCodeClassLoader().asClassLoader())
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index 6e55adcc0c5..3181c21361a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -24,9 +24,6 @@ import 
org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Optional;
-
-import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class CommittableCollectorTest {
@@ -44,22 +41,5 @@ class CommittableCollectorTest {
         committableCollector.addMessage(new CommittableSummary<>(1, 1, 3L, 1, 
0, 0));
 
         
assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2);
-
-        
assertThat(committableCollector.getEndOfInputCommittable()).isNotPresent();
-    }
-
-    @Test
-    void testGetEndOfInputCommittable() {
-        final CommittableCollector<Integer> committableCollector =
-                new CommittableCollector<>(METRIC_GROUP);
-        CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 
EOI, 1, 0, 0);
-        committableCollector.addMessage(first);
-
-        Optional<CheckpointCommittableManager<Integer>> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        assertThat(endOfInputCommittable).isPresent();
-        assertThat(endOfInputCommittable)
-                .get()
-                .returns(EOI, CheckpointCommittableManager::getCheckpointId);
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index d689f009842..f484f0e3ebe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -182,6 +182,8 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
 
     private volatile boolean wasFailedExternally = false;
 
+    private long restoredCheckpointId = 0;
+
     public AbstractStreamOperatorTestHarness(
             StreamOperator<OUT> operator, int maxParallelism, int parallelism, 
int subtaskIndex)
             throws Exception {
@@ -402,6 +404,10 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         return config;
     }
 
+    public void setRestoredCheckpointId(long restoredCheckpointId) {
+        this.restoredCheckpointId = restoredCheckpointId;
+    }
+
     /** Get all the output from the task. This contains StreamRecords and 
Events interleaved. */
     public ConcurrentLinkedQueue<Object> getOutput() {
         return outputList;
@@ -610,16 +616,16 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
             jmTaskStateSnapshot.putSubtaskStateByOperatorID(
                     operator.getOperatorID(), jmOperatorStateHandles);
 
-            taskStateManager.setReportedCheckpointId(0);
+            taskStateManager.setReportedCheckpointId(restoredCheckpointId);
             taskStateManager.setJobManagerTaskStateSnapshotsByCheckpointId(
-                    Collections.singletonMap(0L, jmTaskStateSnapshot));
+                    Collections.singletonMap(restoredCheckpointId, 
jmTaskStateSnapshot));
 
             if (tmOperatorStateHandles != null) {
                 TaskStateSnapshot tmTaskStateSnapshot = new 
TaskStateSnapshot();
                 tmTaskStateSnapshot.putSubtaskStateByOperatorID(
                         operator.getOperatorID(), tmOperatorStateHandles);
                 
taskStateManager.setTaskManagerTaskStateSnapshotsByCheckpointId(
-                        Collections.singletonMap(0L, tmTaskStateSnapshot));
+                        Collections.singletonMap(restoredCheckpointId, 
tmTaskStateSnapshot));
             }
         }
 
