This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c9d64e34cfd3025c87c1d3bbd2d1b596df11691
Author: Gen Luo <[email protected]>
AuthorDate: Tue Mar 1 17:48:16 2022 +0800

    [FLINK-26403][datastream] Fixes the endOfInput logic of sink writer and committer.
    
    SinkWriterOperator should emit all the pending committables on endOfInput,
    and CommitterOperator should commit all committables when the final checkpoint
    is completed or on endOfInput if there's no final checkpoint.
    
    This closes #18938.
---
 .../runtime/operators/sink/CommitterOperator.java  | 57 +++++++++++-----------
 .../operators/sink/CommitterOperatorFactory.java   | 13 +++--
 .../runtime/operators/sink/SinkWriterOperator.java | 41 +++-------------
 .../operators/sink/SinkWriterOperatorFactory.java  | 14 +-----
 .../translators/SinkTransformationTranslator.java  |  9 ++--
 .../operators/sink/CommitterOperatorTest.java      | 32 ++++++++----
 .../operators/sink/SinkWriterOperatorTest.java     | 37 ++++----------
 7 files changed, 82 insertions(+), 121 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 9cb5a4a..ed74daa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -64,12 +64,12 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
     private final SimpleVersionedSerializer<CommT> committableSerializer;
     private final Committer<CommT> committer;
     private final boolean emitDownstream;
-    private final boolean isCheckpointingOrBatchModeEnabled;
+    private final boolean isBatchMode;
+    private final boolean isCheckpointingEnabled;
     private CommittableCollector<CommT> committableCollector;
     private long lastCompletedCheckpointId = -1;
 
     private boolean endInput = false;
-    private boolean finalEmission = false;
 
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> 
STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -84,9 +84,11 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
             SimpleVersionedSerializer<CommT> committableSerializer,
             Committer<CommT> committer,
             boolean emitDownstream,
-            boolean isCheckpointingOrBatchModeEnabled) {
+            boolean isBatchMode,
+            boolean isCheckpointingEnabled) {
         this.emitDownstream = emitDownstream;
-        this.isCheckpointingOrBatchModeEnabled = 
isCheckpointingOrBatchModeEnabled;
+        this.isBatchMode = isBatchMode;
+        this.isCheckpointingEnabled = isCheckpointingEnabled;
         this.processingTimeService = checkNotNull(processingTimeService);
         this.committableSerializer = checkNotNull(committableSerializer);
         this.committer = checkNotNull(committer);
@@ -130,43 +132,40 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
     @Override
     public void endInput() throws Exception {
         endInput = true;
-        final CommittableManager<CommT> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        // indicates batch
-        if (endOfInputCommittable != null) {
-            do {
-                commitAndEmit(endOfInputCommittable, false);
-            } while (!committableCollector.isFinished());
-        }
-        if (!isCheckpointingOrBatchModeEnabled) {
-            notifyCheckpointComplete(
-                    lastCompletedCheckpointId == -1 ? 1 : 
lastCompletedCheckpointId + 1);
+        if (!isCheckpointingEnabled || isBatchMode) {
+            // There will be no final checkpoint, all committables should be 
committed here
+            notifyCheckpointComplete(Long.MAX_VALUE);
         }
     }
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        // If a streaming job finishes and a savepoint is triggered afterwards 
we do not want to
-        // flush again
-        if (finalEmission) {
-            return;
-        }
+        super.notifyCheckpointComplete(checkpointId);
         if (endInput) {
-            finalEmission = true;
+            // This is the final checkpoint, all committables should be 
committed
+            lastCompletedCheckpointId = Long.MAX_VALUE;
+        } else {
+            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointId);
         }
-        super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, 
checkpointId);
         commitAndEmitCheckpoints();
     }
 
     private void commitAndEmitCheckpoints() throws IOException, 
InterruptedException {
-        for (CheckpointCommittableManager<CommT> manager :
-                
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
-            // wait for all committables of the current manager before 
submission
-            boolean fullyReceived = manager.getCheckpointId() == 
lastCompletedCheckpointId;
-            commitAndEmit(manager, fullyReceived);
-        }
+        do {
+            for (CheckpointCommittableManager<CommT> manager :
+                    
committableCollector.getCheckpointCommittablesUpTo(lastCompletedCheckpointId)) {
+                // wait for all committables of the current manager before 
submission
+                boolean fullyReceived =
+                        !endInput && manager.getCheckpointId() == 
lastCompletedCheckpointId;
+                commitAndEmit(manager, fullyReceived);
+            }
+            // !committableCollector.isFinished() indicates that we should 
retry
+            // Retry should be done here if this is a final checkpoint 
(indicated by endInput)
+            // WARN: this is an endless retry, may make the job stuck while 
finishing
+        } while (!committableCollector.isFinished() && endInput);
+
         if (!committableCollector.isFinished()) {
+            // if not endInput, we can schedule retrying later
             retryWithDelay();
         }
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
index f2327c9..8f2056f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.java
@@ -42,12 +42,16 @@ public final class CommitterOperatorFactory<CommT>
                 CommittableMessage<CommT>, CommittableMessage<CommT>> {
 
     private final TwoPhaseCommittingSink<?, CommT> sink;
-    private final boolean isCheckpointingOrBatchModeEnabled;
+    private final boolean isBatchMode;
+    private final boolean isCheckpointingEnabled;
 
     public CommitterOperatorFactory(
-            TwoPhaseCommittingSink<?, CommT> sink, boolean 
isCheckpointingOrBatchModeEnabled) {
+            TwoPhaseCommittingSink<?, CommT> sink,
+            boolean isBatchMode,
+            boolean isCheckpointingEnabled) {
         this.sink = checkNotNull(sink);
-        this.isCheckpointingOrBatchModeEnabled = 
isCheckpointingOrBatchModeEnabled;
+        this.isBatchMode = isBatchMode;
+        this.isCheckpointingEnabled = isCheckpointingEnabled;
     }
 
     @Override
@@ -62,7 +66,8 @@ public final class CommitterOperatorFactory<CommT>
                             sink.getCommittableSerializer(),
                             sink.createCommitter(),
                             sink instanceof WithPostCommitTopology,
-                            isCheckpointingOrBatchModeEnabled);
+                            isBatchMode,
+                            isCheckpointingEnabled);
             committerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index edef318..564f0aa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -70,8 +70,6 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     private final Context<InputT> context;
 
     private final boolean emitDownstream;
-    private final boolean isBatchMode;
-    private final boolean isCheckpointingEnabled;
 
     // ------------------------------- runtime fields 
---------------------------------------
 
@@ -80,23 +78,16 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     private SinkWriter<InputT> sinkWriter;
 
-    private OptionalLong restoredCheckpointId = OptionalLong.empty();
-
     private final SinkWriterStateHandler<InputT> writerStateHandler;
 
     private final MailboxExecutor mailboxExecutor;
 
     private boolean endOfInput = false;
-    private boolean finalEmission = false;
 
     SinkWriterOperator(
             Sink<InputT> sink,
             ProcessingTimeService processingTimeService,
-            MailboxExecutor mailboxExecutor,
-            boolean isBatchMode,
-            boolean isCheckpointingEnabled) {
-        this.isBatchMode = isBatchMode;
-        this.isCheckpointingEnabled = isCheckpointingEnabled;
+            MailboxExecutor mailboxExecutor) {
         this.processingTimeService = checkNotNull(processingTimeService);
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
         this.context = new Context<>();
@@ -114,7 +105,6 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
         OptionalLong checkpointId = context.getRestoredCheckpointId();
-        restoredCheckpointId = checkpointId;
         InitContext initContext =
                 createInitContext(checkpointId.isPresent() ? 
checkpointId.getAsLong() : null);
 
@@ -135,17 +125,12 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
 
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-        // If a streaming job finishes and a savepoint is triggered afterwards 
we do not want to
-        // flush again
-        if (finalEmission) {
-            return;
-        }
-        if (endOfInput) {
-            finalEmission = true;
-        }
         super.prepareSnapshotPreBarrier(checkpointId);
-        sinkWriter.flush(endOfInput);
-        emitCommittables(checkpointId);
+        if (!endOfInput) {
+            sinkWriter.flush(false);
+            emitCommittables(checkpointId);
+        }
+        // no records are expected to emit after endOfInput
     }
 
     @Override
@@ -159,18 +144,8 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     @Override
     public void endInput() throws Exception {
         endOfInput = true;
-        // Only in batch mode we want to emit with the Long.MAX_VALUE 
checkpoint id. In streaming
-        // mode there will be a final checkpoint after endInput that flushes 
all pending
-        // committables.
-        if (isBatchMode) {
-            sinkWriter.flush(true);
-            emitCommittables(Long.MAX_VALUE);
-            return;
-        }
-        // There will be no final checkpoint but the job runs in streaming 
mode, so we try to commit
-        if (!isCheckpointingEnabled) {
-            prepareSnapshotPreBarrier(restoredCheckpointId.orElse(0) + 1);
-        }
+        sinkWriter.flush(true);
+        emitCommittables(Long.MAX_VALUE);
     }
 
     private void emitCommittables(Long checkpointId) throws IOException, 
InterruptedException {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
index f36880d..4ad16c6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorFactory.java
@@ -45,26 +45,16 @@ public final class SinkWriterOperatorFactory<InputT, CommT>
                 YieldingOperatorFactory<CommittableMessage<CommT>> {
 
     private final Sink<InputT> sink;
-    private final boolean isBatchMode;
-    private final boolean isCheckpointingEnabled;
 
-    public SinkWriterOperatorFactory(
-            Sink<InputT> sink, boolean isBatchMode, boolean 
isCheckpointingEnabled) {
+    public SinkWriterOperatorFactory(Sink<InputT> sink) {
         this.sink = checkNotNull(sink);
-        this.isBatchMode = isBatchMode;
-        this.isCheckpointingEnabled = isCheckpointingEnabled;
     }
 
     public <T extends StreamOperator<CommittableMessage<CommT>>> T 
createStreamOperator(
             StreamOperatorParameters<CommittableMessage<CommT>> parameters) {
         try {
             final SinkWriterOperator<InputT, CommT> writerOperator =
-                    new SinkWriterOperator<>(
-                            sink,
-                            processingTimeService,
-                            getMailboxExecutor(),
-                            isBatchMode,
-                            isCheckpointingEnabled);
+                    new SinkWriterOperator<>(sink, processingTimeService, 
getMailboxExecutor());
             writerOperator.setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 30d5316..09240c0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -140,8 +140,7 @@ public class SinkTransformationTranslator<Input, Output>
                                 input.transform(
                                         WRITER_NAME,
                                         CommittableMessageTypeInfo.noOutput(),
-                                        new SinkWriterOperatorFactory<>(
-                                                sink, isBatchMode, 
isCheckpointingEnabled)));
+                                        new 
SinkWriterOperatorFactory<>(sink)));
             }
 
             final List<Transformation<?>> sinkTransformations =
@@ -172,8 +171,7 @@ public class SinkTransformationTranslator<Input, Output>
                                     input.transform(
                                             WRITER_NAME,
                                             typeInformation,
-                                            new SinkWriterOperatorFactory<>(
-                                                    sink, isBatchMode, 
isCheckpointingEnabled)));
+                                            new 
SinkWriterOperatorFactory<>(sink)));
 
             DataStream<CommittableMessage<CommT>> precommitted = 
addFailOverRegion(written);
 
@@ -193,7 +191,8 @@ public class SinkTransformationTranslator<Input, Output>
                                             typeInformation,
                                             new CommitterOperatorFactory<>(
                                                     committingSink,
-                                                    isBatchMode || 
isCheckpointingEnabled)));
+                                                    isBatchMode,
+                                                    isCheckpointingEnabled)));
 
             if (sink instanceof WithPostCommitTopology) {
                 DataStream<CommittableMessage<CommT>> postcommitted = 
addFailOverRegion(committed);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
index 06b44dc..204d664 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTest.java
@@ -72,7 +72,7 @@ class CommitterOperatorTest {
                 testHarness =
                         new OneInputStreamOperatorTestHarness<>(
                                 new CommitterOperatorFactory<>(
-                                        (TwoPhaseCommittingSink<?, String>) 
sink, true));
+                                        (TwoPhaseCommittingSink<?, String>) 
sink, false, true));
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -105,7 +105,7 @@ class CommitterOperatorTest {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
+                testHarness = createTestHarness(committer, false, true);
         testHarness.open();
         testHarness.setProcessingTime(0);
 
@@ -146,7 +146,7 @@ class CommitterOperatorTest {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
+                testHarness = createTestHarness(committer, false, true);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -176,12 +176,13 @@ class CommitterOperatorTest {
         testHarness.close();
     }
 
-    @Test
-    void testEmitAllCommittablesOnEndOfInput() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws 
Exception {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
+                testHarness = createTestHarness(committer, isBatchMode, 
!isBatchMode);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -197,6 +198,11 @@ class CommitterOperatorTest {
         testHarness.processElement(new StreamRecord<>(second));
 
         testHarness.endInput();
+        if (!isBatchMode) {
+            assertThat(testHarness.getOutput()).hasSize(0);
+            // notify final checkpoint complete
+            testHarness.notifyOfCompletedCheckpoint(1);
+        }
 
         final List<StreamElement> output = fromOutput(testHarness.getOutput());
         assertThat(output).hasSize(3);
@@ -215,7 +221,7 @@ class CommitterOperatorTest {
     void testStateRestore() throws Exception {
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(new 
TestSink.RetryOnceCommitter());
+                testHarness = createTestHarness(new 
TestSink.RetryOnceCommitter(), false, true);
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
@@ -235,7 +241,7 @@ class CommitterOperatorTest {
         final ForwardingCommitter committer = new ForwardingCommitter();
         final OneInputStreamOperatorTestHarness<
                         CommittableMessage<String>, CommittableMessage<String>>
-                restored = createTestHarness(committer);
+                restored = createTestHarness(committer, false, true);
 
         restored.initializeState(snapshot);
         restored.open();
@@ -271,6 +277,7 @@ class CommitterOperatorTest {
                         new OneInputStreamOperatorTestHarness<>(
                                 new CommitterOperatorFactory<>(
                                         (TwoPhaseCommittingSink<?, String>) 
sink,
+                                        false,
                                         isCheckpointingEnabled));
         testHarness.open();
 
@@ -318,7 +325,11 @@ class CommitterOperatorTest {
 
     private OneInputStreamOperatorTestHarness<
                     CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(Committer<String> committer) throws Exception {
+            createTestHarness(
+                    Committer<String> committer,
+                    boolean isBatchMode,
+                    boolean isCheckpointingEnabled)
+                    throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
                 new CommitterOperatorFactory<>(
                         (TwoPhaseCommittingSink<?, String>)
@@ -329,7 +340,8 @@ class CommitterOperatorTest {
                                                 
TestSink.StringCommittableSerializer.INSTANCE)
                                         .build()
                                         .asV2(),
-                        true));
+                        isBatchMode,
+                        isCheckpointingEnabled));
     }
 
     private static class ForwardingCommitter extends TestSink.DefaultCommitter 
{
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
index 3a7b8a9..d71becf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTest.java
@@ -57,18 +57,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class SinkWriterOperatorTest {
 
-    private static final boolean STREAMING_MODE = false;
-    private static final boolean CHECKPOINTING_ENABLED = true;
-
     @Test
     void testNotEmitCommittablesWithoutCommitter() throws Exception {
         final TestSink.DefaultSinkWriter<Integer> sinkWriter = new 
TestSink.DefaultSinkWriter<>();
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setWriter(sinkWriter).build().asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                
TestSink.newBuilder().setWriter(sinkWriter).build().asV2()));
         testHarness.open();
         testHarness.processElement(1, 1);
 
@@ -90,9 +85,7 @@ class SinkWriterOperatorTest {
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setWriter(writer).build().asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                
TestSink.newBuilder().setWriter(writer).build().asV2()));
         testHarness.open();
 
         testHarness.processWatermark(initialTime);
@@ -118,9 +111,7 @@ class SinkWriterOperatorTest {
                                         .setDefaultCommitter()
                                         .setWriter(new 
TimeBasedBufferingSinkWriter())
                                         .build()
-                                        .asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                        .asV2()));
 
         testHarness.open();
 
@@ -148,9 +139,7 @@ class SinkWriterOperatorTest {
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(
                         new SinkWriterOperatorFactory<>(
-                                
TestSink.newBuilder().setDefaultCommitter().build().asV2(),
-                                STREAMING_MODE,
-                                CHECKPOINTING_ENABLED));
+                                
TestSink.newBuilder().setDefaultCommitter().build().asV2()));
 
         testHarness.open();
         assertThat(testHarness.getOutput()).isEmpty();
@@ -169,7 +158,7 @@ class SinkWriterOperatorTest {
     void testEmitOnEndOfInputInBatchMode() throws Exception {
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
                 new SinkWriterOperatorFactory<>(
-                        
TestSink.newBuilder().setDefaultCommitter().build().asV2(), true, false);
+                        
TestSink.newBuilder().setDefaultCommitter().build().asV2());
         final OneInputStreamOperatorTestHarness<Integer, 
CommittableMessage<Integer>> testHarness =
                 new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
 
@@ -223,7 +212,7 @@ class SinkWriterOperatorTest {
         restoredTestHarness.prepareSnapshotPreBarrier(checkpointId);
 
         if (stateful) {
-            assertBasicOutput(restoredTestHarness.getOutput(), 2, 
checkpointId);
+            assertBasicOutput(restoredTestHarness.getOutput(), 2, 
Long.MAX_VALUE);
         } else {
             
assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue())
                     .isInstanceOf(CommittableSummary.class)
@@ -313,9 +302,7 @@ class SinkWriterOperatorTest {
                                         .setWriter(sinkWriter)
                                         .setDefaultCommitter()
                                         .build()
-                                        .asV2(),
-                                STREAMING_MODE,
-                                isCheckpointingEnabled));
+                                        .asV2()));
         testHarness.open();
         testHarness.processElement(1, 1);
 
@@ -329,10 +316,6 @@ class SinkWriterOperatorTest {
             testHarness.prepareSnapshotPreBarrier(1);
         }
 
-        // Ensure after the final emission no emission is possible anymore to 
prevent empty updates
-        testHarness.prepareSnapshotPreBarrier(2);
-        testHarness.endInput();
-
         assertEmitted(Collections.singletonList(record), 
testHarness.getOutput());
         assertThat(sinkWriter.elements).isEmpty();
 
@@ -373,8 +356,7 @@ class SinkWriterOperatorTest {
             builder.withWriterState();
         }
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(
-                        builder.build().asV2(), STREAMING_MODE, 
CHECKPOINTING_ENABLED);
+                new SinkWriterOperatorFactory<>(builder.build().asV2());
         return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
     }
 
@@ -391,8 +373,7 @@ class SinkWriterOperatorTest {
             builder.withWriterState();
         }
         final SinkWriterOperatorFactory<Integer, Integer> 
writerOperatorFactory =
-                new SinkWriterOperatorFactory<>(
-                        builder.build().asV2(), STREAMING_MODE, 
CHECKPOINTING_ENABLED);
+                new SinkWriterOperatorFactory<>(builder.build().asV2());
         return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
     }
 

Reply via email to