This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new de46599  [FLINK-25941][streaming] Only emit committables with 
Long.MAX_VALUE as checkpoint id in batch mode
de46599 is described below

commit de465995dc7e28bca71f1610579133b99f064e52
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Feb 15 16:32:53 2022 +0100

    [FLINK-25941][streaming] Only emit committables with Long.MAX_VALUE as 
checkpoint id in batch mode
    
    Before this commit, the SinkWriter and Committer operators emitted
    committables on endInput. This was problematic because such an emission
    is not part of any checkpoint, so the committables were effectively
    assigned Long.MAX_VALUE as their checkpoint id. With the completion of
    FLIP-147, all jobs in streaming mode take a final checkpoint when they
    transition to FINISHED, so we can rely on the regular checkpoint
    mechanism and only need endInput for batch execution.
---
 .../connector/sink2/GlobalCommitterOperator.java   | 21 ++----
 .../runtime/operators/sink/CommitterOperator.java  | 31 +++++++--
 .../operators/sink/CommitterOperatorFactory.java   |  8 ++-
 .../runtime/operators/sink/SinkWriterOperator.java | 43 +++++++++---
 .../operators/sink/SinkWriterOperatorFactory.java  | 14 +++-
 .../CheckpointCommittableManagerImpl.java          | 15 +++--
 .../sink/committables/CommittableCollector.java    |  9 +--
 .../translators/SinkTransformationTranslator.java  | 27 ++++++--
 .../connector/sink2/CommittableSummaryAssert.java  | 12 ++++
 .../operators/sink/CommitterOperatorTest.java      | 57 +++++++++++++++-
 .../operators/sink/SinkWriterOperatorTest.java     | 76 ++++++++++++++++++----
 .../CheckpointCommittableManagerImplTest.java      |  6 +-
 .../committables/CommittableCollectorTest.java     | 16 ++++-
 13 files changed, 266 insertions(+), 69 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
index 84c7547..6ad3ec4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java
@@ -114,15 +114,6 @@ class GlobalCommitterOperator<CommT> extends 
AbstractStreamOperator<Void>
         commit(lastCompletedCheckpointId);
     }
 
-    private Collection<? extends CheckpointCommittableManager<CommT>> 
getCommittables() {
-        final Collection<? extends CheckpointCommittableManager<CommT>> 
committables =
-                committableCollector.getEndOfInputCommittables();
-        if (committables == null) {
-            return Collections.emptyList();
-        }
-        return committables;
-    }
-
     private Collection<? extends CheckpointCommittableManager<CommT>> 
getCommittables(
             long checkpointId) {
         final Collection<? extends CheckpointCommittableManager<CommT>> 
committables =
@@ -142,11 +133,13 @@ class GlobalCommitterOperator<CommT> extends 
AbstractStreamOperator<Void>
 
     @Override
     public void endInput() throws Exception {
-        do {
-            for (CommittableManager<CommT> committable : getCommittables()) {
-                committable.commit(false, committer);
-            }
-        } while (!committableCollector.isFinished());
+        final CommittableManager<CommT> endOfInputCommittable =
+                committableCollector.getEndOfInputCommittable();
+        if (endOfInputCommittable != null) {
+            do {
+                endOfInputCommittable.commit(false, committer);
+            } while (!committableCollector.isFinished());
+        }
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index e4a5db5..9cb5a4a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -64,9 +64,13 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
     private final SimpleVersionedSerializer<CommT> committableSerializer;
     private final Committer<CommT> committer;
     private final boolean emitDownstream;
+    private final boolean isCheckpointingOrBatchModeEnabled;
     private CommittableCollector<CommT> committableCollector;
     private long lastCompletedCheckpointId = -1;
 
+    private boolean endInput = false;
+    private boolean finalEmission = false;
+
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
             new ListStateDescriptor<>(
@@ -79,8 +83,10 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
             ProcessingTimeService processingTimeService,
             SimpleVersionedSerializer<CommT> committableSerializer,
             Committer<CommT> committer,
-            boolean emitDownstream) {
+            boolean emitDownstream,
+            boolean isCheckpointingOrBatchModeEnabled) {
         this.emitDownstream = emitDownstream;
+        this.isCheckpointingOrBatchModeEnabled = 
isCheckpointingOrBatchModeEnabled;
         this.processingTimeService = checkNotNull(processingTimeService);
         this.committableSerializer = checkNotNull(committableSerializer);
         this.committer = checkNotNull(committer);
@@ -123,20 +129,31 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
 
     @Override
     public void endInput() throws Exception {
-        Collection<? extends CommittableManager<CommT>> endOfInputCommittables 
=
-                committableCollector.getEndOfInputCommittables();
+        endInput = true;
+        final CommittableManager<CommT> endOfInputCommittable =
+                committableCollector.getEndOfInputCommittable();
         // indicates batch
-        if (endOfInputCommittables != null) {
+        if (endOfInputCommittable != null) {
             do {
-                for (CommittableManager<CommT> endOfInputCommittable : 
endOfInputCommittables) {
-                    commitAndEmit(endOfInputCommittable, false);
-                }
+                commitAndEmit(endOfInputCommittable, false);
             } while (!committableCollector.isFinished());
         }
+        if (!isCheckpointingOrBatchModeEnabled) {
+            notifyCheckpointComplete(
+                    lastCompletedCheckpointId == -1 ? 1 : 
lastCompletedCheckpointId + 1);
+        }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // If a streaming job finishes and a savepoint is triggered afterwards 
we do not want to
+        // flush again
+        if (finalEmission) {
+            return;
+        }
+        if (endInput) {
+            finalEmission = true;
+        }
         super.notifyCheckpointComplete(checkpointId);
         lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointId);
         commitAndEmitCheckpoints();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
index c0ed9e0..f2327c9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
@@ -42,9 +42,12 @@ public final class CommitterOperatorFactory<CommT>
                 CommittableMessage<CommT>, CommittableMessage<CommT>> {
 
     private final TwoPhaseCommittingSink<?, CommT> sink;
+    private final boolean isCheckpointingOrBatchModeEnabled;
 
-    public CommitterOperatorFactory(TwoPhaseCommittingSink<?, CommT> sink) {
+    public CommitterOperatorFactory(
+            TwoPhaseCommittingSink<?, CommT> sink, boolean 
isCheckpointingOrBatchModeEnabled) {
         this.sink = checkNotNull(sink);
+        this.isCheckpointingOrBatchModeEnabled = 
isCheckpointingOrBatchModeEnabled;
     }
 
     @Override
@@ -58,7 +61,8 @@ public final class CommitterOperatorFactory<CommT>
                             processingTimeService,
                             sink.getCommittableSerializer(),
                             sink.createCommitter(),
-                            sink instanceof WithPostCommitTopology);
+                            sink instanceof WithPostCommitTopology,
+                            isCheckpointingOrBatchModeEnabled);
             committerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index 82141e3..edef318 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -70,6 +70,8 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     private final Context<InputT> context;
 
     private final boolean emitDownstream;
+    private final boolean isBatchMode;
+    private final boolean isCheckpointingEnabled;
 
     // ------------------------------- runtime fields 
---------------------------------------
 
@@ -78,17 +80,23 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     private SinkWriter<InputT> sinkWriter;
 
+    private OptionalLong restoredCheckpointId = OptionalLong.empty();
+
     private final SinkWriterStateHandler<InputT> writerStateHandler;
 
     private final MailboxExecutor mailboxExecutor;
-    // record endOfInput state to avoid duplicate prepareCommit on final 
notifyCheckpointComplete
-    // once FLIP-147 is fully operational all endOfInput processing needs to 
be removed
+
     private boolean endOfInput = false;
+    private boolean finalEmission = false;
 
     SinkWriterOperator(
             Sink<InputT> sink,
             ProcessingTimeService processingTimeService,
-            MailboxExecutor mailboxExecutor) {
+            MailboxExecutor mailboxExecutor,
+            boolean isBatchMode,
+            boolean isCheckpointingEnabled) {
+        this.isBatchMode = isBatchMode;
+        this.isCheckpointingEnabled = isCheckpointingEnabled;
         this.processingTimeService = checkNotNull(processingTimeService);
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
         this.context = new Context<>();
@@ -106,6 +114,7 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
         OptionalLong checkpointId = context.getRestoredCheckpointId();
+        restoredCheckpointId = checkpointId;
         InitContext initContext =
                 createInitContext(checkpointId.isPresent() ? 
checkpointId.getAsLong() : null);
 
@@ -126,11 +135,17 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-        super.prepareSnapshotPreBarrier(checkpointId);
-        if (!endOfInput) {
-            sinkWriter.flush(false);
-            emitCommittables(checkpointId);
+        // If a streaming job finishes and a savepoint is triggered afterwards 
we do not want to
+        // flush again
+        if (finalEmission) {
+            return;
+        }
+        if (endOfInput) {
+            finalEmission = true;
         }
+        super.prepareSnapshotPreBarrier(checkpointId);
+        sinkWriter.flush(endOfInput);
+        emitCommittables(checkpointId);
     }
 
     @Override
@@ -144,8 +159,18 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Override
     public void endInput() throws Exception {
         endOfInput = true;
-        sinkWriter.flush(true);
-        emitCommittables(Long.MAX_VALUE);
+        // Only in batch mode we want to emit with the Long.MAX_VALUE 
checkpoint id. In streaming
+        // mode there will be a final checkpoint after endInput that flushes 
all pending
+        // committables.
+        if (isBatchMode) {
+            sinkWriter.flush(true);
+            emitCommittables(Long.MAX_VALUE);
+            return;
+        }
+        // There will be no final checkpoint but the job runs in streaming 
mode, so we try to commit
+        if (!isCheckpointingEnabled) {
+            prepareSnapshotPreBarrier(restoredCheckpointId.orElse(0) + 1);
+        }
     }
 
     private void emitCommittables(Long checkpointId) throws IOException, 
InterruptedException {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
index 4ad16c6..f36880d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
@@ -45,16 +45,26 @@ public final class SinkWriterOperatorFactory<InputT, CommT>
                 YieldingOperatorFactory<CommittableMessage<CommT>> {
 
     private final Sink<InputT> sink;
+    private final boolean isBatchMode;
+    private final boolean isCheckpointingEnabled;
 
-    public SinkWriterOperatorFactory(Sink<InputT> sink) {
+    public SinkWriterOperatorFactory(
+            Sink<InputT> sink, boolean isBatchMode, boolean 
isCheckpointingEnabled) {
         this.sink = checkNotNull(sink);
+        this.isBatchMode = isBatchMode;
+        this.isCheckpointingEnabled = isCheckpointingEnabled;
     }
 
     public <T extends StreamOperator<CommittableMessage<CommT>>> T 
createStreamOperator(
             StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
         try {
             final SinkWriterOperator<InputT, CommT> writerOperator =
-                    new SinkWriterOperator<>(sink, processingTimeService, 
getMailboxExecutor());
+                    new SinkWriterOperator<>(
+                            sink,
+                            processingTimeService,
+                            getMailboxExecutor(),
+                            isBatchMode,
+                            isCheckpointingEnabled);
             writerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 815870b..4fd22f2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -22,11 +22,14 @@ import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -36,11 +39,12 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     /** Mapping of subtask id to {@link SubtaskCommittableManager}. */
     private final Map<Integer, SubtaskCommittableManager<CommT>> 
subtasksCommittableManagers;
 
-    private final long checkpointId;
+    @Nullable private final Long checkpointId;
     private final int subtaskId;
     private final int numberOfSubtasks;
 
-    CheckpointCommittableManagerImpl(int subtaskId, int numberOfSubtasks, long 
checkpointId) {
+    CheckpointCommittableManagerImpl(
+            int subtaskId, int numberOfSubtasks, @Nullable Long checkpointId) {
         this.subtaskId = subtaskId;
         this.numberOfSubtasks = numberOfSubtasks;
         this.checkpointId = checkpointId;
@@ -49,7 +53,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
 
     CheckpointCommittableManagerImpl(
             Map<Integer, SubtaskCommittableManager<CommT>> 
subtasksCommittableManagers,
-            long checkpointId) {
+            @Nullable Long checkpointId) {
         this.subtasksCommittableManagers = 
checkNotNull(subtasksCommittableManagers);
         this.subtaskId = 0;
         this.numberOfSubtasks = 1;
@@ -58,6 +62,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
 
     @Override
     public long getCheckpointId() {
+        checkNotNull(checkpointId);
         return checkpointId;
     }
 
@@ -138,7 +143,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     }
 
     CheckpointCommittableManagerImpl<CommT> 
merge(CheckpointCommittableManagerImpl<CommT> other) {
-        checkArgument(other.checkpointId == checkpointId);
+        checkArgument(Objects.equals(other.checkpointId, checkpointId));
         for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry 
:
                 other.subtasksCommittableManagers.entrySet()) {
             subtasksCommittableManagers.merge(
@@ -150,7 +155,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     }
 
     CheckpointCommittableManagerImpl<CommT> copy() {
-        return new CheckpointCommittableManagerImpl<CommT>(
+        return new CheckpointCommittableManagerImpl<>(
                 subtasksCommittableManagers.entrySet().stream()
                         .collect(Collectors.toMap(Map.Entry::getKey, (e) -> 
e.getValue().copy())),
                 checkpointId);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index fca9a70..94c5654 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -144,13 +144,14 @@ public class CommittableCollector<CommT> {
     }
 
     /**
-     * Returns all {@link CheckpointCommittableManager} that are currently 
hold by the collector.
+     * Returns {@link CheckpointCommittableManager} that is currently held by 
the collector and
+     * associated with the {@link CommittableCollector#EOI} checkpoint id.
      *
-     * @return collection of {@link CheckpointCommittableManager}
+     * @return {@link CheckpointCommittableManager}
      */
     @Nullable
-    public Collection<? extends CheckpointCommittableManager<CommT>> 
getEndOfInputCommittables() {
-        return getCheckpointCommittablesUpTo(EOI);
+    public CommittableManager<CommT> getEndOfInputCommittable() {
+        return checkpointCommittables.get(EOI);
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 97c19d3..30d5316 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -64,19 +64,24 @@ public class SinkTransformationTranslator<Input, Output>
     @Override
     public Collection<Integer> translateForBatch(
             SinkTransformation<Input, Output> transformation, Context context) 
{
-        return translateForStreaming(transformation, context);
+        return translateInternal(transformation, context, true);
     }
 
     @Override
     public Collection<Integer> translateForStreaming(
             SinkTransformation<Input, Output> transformation, Context context) 
{
+        return translateInternal(transformation, context, false);
+    }
 
+    private Collection<Integer> translateInternal(
+            SinkTransformation<Input, Output> transformation, Context context, 
boolean batch) {
         SinkExpander<Input> expander =
                 new SinkExpander<>(
                         transformation.getInputStream(),
                         transformation.getSink(),
                         transformation,
-                        context);
+                        context,
+                        batch);
         expander.expand();
         return Collections.emptyList();
     }
@@ -94,18 +99,24 @@ public class SinkTransformationTranslator<Input, Output>
         private final DataStream<T> inputStream;
         private final StreamExecutionEnvironment executionEnvironment;
         private final int environmentParallelism;
+        private final boolean isBatchMode;
+        private final boolean isCheckpointingEnabled;
 
         public SinkExpander(
                 DataStream<T> inputStream,
                 Sink<T> sink,
                 SinkTransformation<T, ?> transformation,
-                Context context) {
+                Context context,
+                boolean isBatchMode) {
             this.inputStream = inputStream;
             this.executionEnvironment = inputStream.getExecutionEnvironment();
             this.environmentParallelism = 
executionEnvironment.getParallelism();
+            this.isCheckpointingEnabled =
+                    
executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
             this.transformation = transformation;
             this.sink = sink;
             this.context = context;
+            this.isBatchMode = isBatchMode;
         }
 
         private void expand() {
@@ -129,7 +140,8 @@ public class SinkTransformationTranslator<Input, Output>
                                 input.transform(
                                         WRITER_NAME,
                                         CommittableMessageTypeInfo.noOutput(),
-                                        new 
SinkWriterOperatorFactory<>(sink)));
+                                        new SinkWriterOperatorFactory<>(
+                                                sink, isBatchMode, 
isCheckpointingEnabled)));
             }
 
             final List<Transformation<?>> sinkTransformations =
@@ -160,7 +172,8 @@ public class SinkTransformationTranslator<Input, Output>
                                     input.transform(
                                             WRITER_NAME,
                                             typeInformation,
-                                            new 
SinkWriterOperatorFactory<>(sink)));
+                                            new SinkWriterOperatorFactory<>(
+                                                    sink, isBatchMode, 
isCheckpointingEnabled)));
 
             DataStream<CommittableMessage<CommT>> precommitted = 
addFailOverRegion(written);
 
@@ -178,7 +191,9 @@ public class SinkTransformationTranslator<Input, Output>
                                     pc.transform(
                                             COMMITTER_NAME,
                                             typeInformation,
-                                            new 
CommitterOperatorFactory<>(committingSink)));
+                                            new CommitterOperatorFactory<>(
+                                                    committingSink,
+                                                    isBatchMode || 
isCheckpointingEnabled)));
 
             if (sink instanceof WithPostCommitTopology) {
                 DataStream<CommittableMessage<CommT>> postcommitted = 
addFailOverRegion(committed);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
index 4672d93..bc84e31 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.connector.sink2;
 
 import org.assertj.core.api.AbstractAssert;
 
+import javax.annotation.Nullable;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Custom assertions for {@link CommittableSummary}. */
@@ -59,4 +61,14 @@ public class CommittableSummaryAssert
         
assertThat(actual.getNumberOfFailedCommittables()).isEqualTo(committableNumber);
         return this;
     }
+
+    public CommittableSummaryAssert hasCheckpointId(@Nullable Long 
checkpointId) {
+        isNotNull();
+        if (checkpointId == null) {
+            assertThat(actual.getCheckpointId()).isEmpty();
+        } else {
+            assertThat(actual.getCheckpointId()).hasValue(checkpointId);
+        }
+        return this;
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
index 05edfa5..06b44dc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
@@ -72,7 +72,7 @@ class CommitterOperatorTest {
                 testHarness =
                         new OneInputStreamOperatorTestHarness<>(
                                 new CommitterOperatorFactory<>(
-                                        (TwoPhaseCommittingSink<?, String>) 
sink));
+                                        (TwoPhaseCommittingSink<?, String>) 
sink, true));
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -254,6 +254,58 @@ class CommitterOperatorTest {
         restored.close();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
+        final Sink<Integer> sink =
+                TestSink.newBuilder()
+                        .setDefaultCommitter()
+                        .setDefaultGlobalCommitter()
+                        
.setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE)
+                        .build()
+                        .asV2();
+
+        final OneInputStreamOperatorTestHarness<
+                        CommittableMessage<String>, CommittableMessage<String>>
+                testHarness =
+                        new OneInputStreamOperatorTestHarness<>(
+                                new CommitterOperatorFactory<>(
+                                        (TwoPhaseCommittingSink<?, String>) 
sink,
+                                        isCheckpointingEnabled));
+        testHarness.open();
+
+        final CommittableSummary<String> committableSummary =
+                new CommittableSummary<>(1, 1, 1L, 1, 1, 0);
+        testHarness.processElement(new StreamRecord<>(committableSummary));
+        final CommittableWithLineage<String> committableWithLineage =
+                new CommittableWithLineage<>("1", 1L, 1);
+        testHarness.processElement(new StreamRecord<>(committableWithLineage));
+
+        testHarness.endInput();
+
+        // If checkpointing enabled endInput does not emit anything because a 
final checkpoint
+        // follows
+        if (isCheckpointingEnabled) {
+            testHarness.notifyOfCompletedCheckpoint(1);
+        }
+
+        final List<StreamElement> output = fromOutput(testHarness.getOutput());
+        assertThat(output).hasSize(2);
+        SinkV2Assertions.assertThat(toCommittableSummary(output.get(0)))
+                .hasCheckpointId(1L)
+                .hasPendingCommittables(0)
+                .hasOverallCommittables(1)
+                .hasFailedCommittables(0);
+        SinkV2Assertions.assertThat(toCommittableWithLinage(output.get(1)))
+                
.isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
+
+        // Future emission calls should not change the output
+        testHarness.notifyOfCompletedCheckpoint(2);
+        testHarness.endInput();
+
+        assertThat(testHarness.getOutput()).hasSize(2);
+    }
+
     CommittableWithLineage<?> copyCommittableWithDifferentOrigin(
             CommittableWithLineage<?> committable, int subtaskId) {
         return new CommittableWithLineage<>(
@@ -276,7 +328,8 @@ class CommitterOperatorTest {
                                         .setCommittableSerializer(
                                                 
TestSink.StringCommittableSerializer.INSTANCE)
                                         .build()
-                                        .asV2()));
+                                        .asV2(),
+                        true));
     }
 
     private static class ForwardingCommitter extends TestSink.DefaultCommitter 
{
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
index 7da923e..3a7b8a9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
@@ -57,13 +57,18 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class SinkWriterOperatorTest {
 
+    private static final boolean STREAMING_MODE = false;
+    private static final boolean CHECKPOINTING_ENABLED = true;
+
     @Test
     void testNotEmitCommittablesWithoutCommitter() throws Exception {
         final TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setWriter(sinkWriter).build().asV2()));
+                                
TestSink.newBuilder().setWriter(sinkWriter).build().asV2(),
+                                STREAMING_MODE,
+                                CHECKPOINTING_ENABLED));
         testHarness.open();
         testHarness.processElement(1, 1);
 
@@ -85,7 +90,9 @@ class SinkWriterOperatorTest {
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setWriter(writer).build().asV2()));
+                                
TestSink.newBuilder().setWriter(writer).build().asV2(),
+                                STREAMING_MODE,
+                                CHECKPOINTING_ENABLED));
         testHarness.open();
 
         testHarness.processWatermark(initialTime);
@@ -111,7 +118,9 @@ class SinkWriterOperatorTest {
                                         .setDefaultCommitter()
                                         .setWriter(new 
TimeBasedBufferingSinkWriter())
                                         .build()
-                                        .asV2()));
+                                        .asV2(),
+                                STREAMING_MODE,
+                                CHECKPOINTING_ENABLED));
 
         testHarness.open();
 
@@ -139,7 +148,9 @@ class SinkWriterOperatorTest {
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setDefaultCommitter().build().asV2()));
+                                
TestSink.newBuilder().setDefaultCommitter().build().asV2(),
+                                STREAMING_MODE,
+                                CHECKPOINTING_ENABLED));
 
         testHarness.open();
         assertThat(testHarness.getOutput()).isEmpty();
@@ -158,7 +169,7 @@ class SinkWriterOperatorTest {
     void testEmitOnEndOfInputInBatchMode() throws Exception {
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
                 new SinkWriterOperatorFactory<>(
-                        
TestSink.newBuilder().setDefaultCommitter().build().asV2());
+                        
TestSink.newBuilder().setDefaultCommitter().build().asV2(), true, false);
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
 
@@ -168,12 +179,6 @@ class SinkWriterOperatorTest {
         testHarness.processElement(1, 1);
         testHarness.endInput();
         assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE);
-
-        // Not flush new records during snapshot barrier
-        testHarness.processElement(2, 2);
-        testHarness.prepareSnapshotPreBarrier(1);
-        assertThat(testHarness.getOutput()).hasSize(2);
-        testHarness.close();
     }
 
     @ParameterizedTest
@@ -214,9 +219,11 @@ class SinkWriterOperatorTest {
 
         // this will flush out the committables that were restored
         restoredTestHarness.endInput();
+        final long checkpointId = 2;
+        restoredTestHarness.prepareSnapshotPreBarrier(checkpointId);
 
         if (stateful) {
-            assertBasicOutput(restoredTestHarness.getOutput(), 2, 
Long.MAX_VALUE);
+            assertBasicOutput(restoredTestHarness.getOutput(), 2, 
checkpointId);
         } else {
             
assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue())
                     .isInstanceOf(CommittableSummary.class)
@@ -263,6 +270,7 @@ class SinkWriterOperatorTest {
 
         // this will flush out the committables that were restored from 
previous sink
         compatibleWriterOperator.endInput();
+        compatibleWriterOperator.prepareSnapshotPreBarrier(1);
 
         OperatorSubtaskState operatorStateWithoutPreviousState =
                 compatibleWriterOperator.snapshot(1L, 1L);
@@ -288,11 +296,49 @@ class SinkWriterOperatorTest {
 
         // this will flush out the committables that were restored
         restoredSinkOperator.endInput();
+        restoredSinkOperator.prepareSnapshotPreBarrier(2);
 
         assertEmitted(expectedOutput2, restoredSinkOperator.getOutput());
         restoredSinkOperator.close();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) 
throws Exception {
+        final TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
+        final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(
+                        new SinkWriterOperatorFactory<>(
+                                TestSink.newBuilder()
+                                        .setWriter(sinkWriter)
+                                        .setDefaultCommitter()
+                                        .build()
+                                        .asV2(),
+                                STREAMING_MODE,
+                                isCheckpointingEnabled));
+        testHarness.open();
+        testHarness.processElement(1, 1);
+
+        assertThat(testHarness.getOutput()).isEmpty();
+        final String record = "(1,1," + Long.MIN_VALUE + ")";
+        assertThat(sinkWriter.elements).containsOnly(record);
+
+        testHarness.endInput();
+
+        if (isCheckpointingEnabled) {
+            testHarness.prepareSnapshotPreBarrier(1);
+        }
+
+        // Ensure after the final emission no emission is possible anymore to 
prevent empty updates
+        testHarness.prepareSnapshotPreBarrier(2);
+        testHarness.endInput();
+
+        assertEmitted(Collections.singletonList(record), 
testHarness.getOutput());
+        assertThat(sinkWriter.elements).isEmpty();
+
+        testHarness.close();
+    }
+
     @SuppressWarnings("unchecked")
     private static void assertEmitted(List<String> records, Queue<Object> 
output) {
 
@@ -327,7 +373,8 @@ class SinkWriterOperatorTest {
             builder.withWriterState();
         }
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(builder.build().asV2());
+                new SinkWriterOperatorFactory<>(
+                        builder.build().asV2(), STREAMING_MODE, 
CHECKPOINTING_ENABLED);
         return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
     }
 
@@ -344,7 +391,8 @@ class SinkWriterOperatorTest {
             builder.withWriterState();
         }
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(builder.build().asV2());
+                new SinkWriterOperatorFactory<>(
+                        builder.build().asV2(), STREAMING_MODE, 
CHECKPOINTING_ENABLED);
         return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
index 19304ed..e47c21d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.java
@@ -35,7 +35,7 @@ class CheckpointCommittableManagerImplTest {
     @Test
     void testAddSummary() {
         final CheckpointCommittableManagerImpl<Integer> checkpointCommittables 
=
-                new CheckpointCommittableManagerImpl<>(2, 1, 1);
+                new CheckpointCommittableManagerImpl<>(2, 1, 1L);
         
assertThat(checkpointCommittables.getSubtaskCommittableManagers()).isEmpty();
 
         final CommittableSummary<Integer> first = new CommittableSummary<>(1, 
1, 1L, 1, 0, 0);
@@ -59,7 +59,7 @@ class CheckpointCommittableManagerImplTest {
     @Test
     void testCommit() throws IOException, InterruptedException {
         final CheckpointCommittableManagerImpl<Integer> checkpointCommittables 
=
-                new CheckpointCommittableManagerImpl<Integer>(1, 1, 1);
+                new CheckpointCommittableManagerImpl<>(1, 1, 1L);
         checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 
1L, 1, 0, 0));
         checkpointCommittables.upsertSummary(new CommittableSummary<>(2, 1, 
1L, 2, 0, 0));
         checkpointCommittables.addCommittable(new CommittableWithLineage<>(3, 
1L, 1));
@@ -83,7 +83,7 @@ class CheckpointCommittableManagerImplTest {
     @Test
     void testUpdateCommittableSummary() {
         final CheckpointCommittableManagerImpl<Integer> checkpointCommittables 
=
-                new CheckpointCommittableManagerImpl<Integer>(1, 1, 1);
+                new CheckpointCommittableManagerImpl<>(1, 1, 1L);
         checkpointCommittables.upsertSummary(new CommittableSummary<>(1, 1, 
1L, 1, 0, 0));
         assertThatThrownBy(
                         () ->
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index 66b466f..f8a1e90 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.operators.sink.committables;
 
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 
 import org.junit.jupiter.api.Test;
 
@@ -37,6 +38,19 @@ class CommittableCollectorTest {
 
         
assertThat(committableCollector.getCheckpointCommittablesUpTo(2)).hasSize(2);
 
-        
assertThat(committableCollector.getEndOfInputCommittables()).hasSize(3);
+        assertThat(committableCollector.getEndOfInputCommittable()).isNull();
+    }
+
+    @Test
+    void testGetEndOfInputCommittable() {
+        final CommittableCollector<Integer> committableCollector = new 
CommittableCollector<>(1, 1);
+        CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 
null, 1, 0, 0);
+        committableCollector.addMessage(first);
+
+        CommittableManager<Integer> endOfInputCommittable =
+                committableCollector.getEndOfInputCommittable();
+        assertThat(endOfInputCommittable).isNotNull();
+        SinkV2Assertions.assertThat(endOfInputCommittable.getSummary())
+                .hasCheckpointId(Long.MAX_VALUE);
     }
 }

Reply via email to