This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3ea6da49582af66a6182c27cfcfde772dc67dde
Author: sxnan <[email protected]>
AuthorDate: Wed Aug 5 13:00:36 2020 +0800

    [FLINK-18820] Emit MAX_WATERMARK at the end in SourceOperator
    
    Starting from this commit, SourceOperator emits a MAX_WATERMARK when all
    records have been produced or when a stop-with-savepoint is triggered.
---
 .../streaming/api/operators/SourceOperator.java    | 13 ++++-
 .../runtime/tasks/SourceOperatorStreamTask.java    |  8 ++-
 .../api/operators/SourceOperatorTest.java          | 20 +++++++-
 .../source/SourceOperatorEventTimeTest.java        | 12 +++--
 .../tasks/SourceOperatorStreamTaskTest.java        | 57 ++++++++++++++++------
 5 files changed, 87 insertions(+), 23 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 742d7ce..eaf5481 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CollectionUtil;
@@ -201,13 +202,21 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
 
                // short circuit the common case (every invocation except the 
first)
                if (currentMainOutput != null) {
-                       return sourceReader.pollNext(currentMainOutput);
+                       return pollNextRecord(output);
                }
 
                // this creates a batch or streaming output based on the 
runtime mode
                currentMainOutput = eventTimeLogic.createMainOutput(output);
                lastInvokedOutput = output;
-               return sourceReader.pollNext(currentMainOutput);
+               return pollNextRecord(output);
+       }
+
+       private InputStatus pollNextRecord(DataOutput<OUT> output) throws 
Exception {
+               InputStatus inputStatus = 
sourceReader.pollNext(currentMainOutput);
+               if (inputStatus == InputStatus.END_OF_INPUT) {
+                       output.emitWatermark(Watermark.MAX_WATERMARK);
+               }
+               return inputStatus;
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 47a1d73..c461dca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -39,6 +39,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 @Internal
 public class SourceOperatorStreamTask<T> extends StreamTask<T, 
SourceOperator<T, ?>> {
+       private AsyncDataOutputToOutput<T> output;
 
        public SourceOperatorStreamTask(Environment env) throws Exception {
                super(env);
@@ -47,7 +48,7 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
        @Override
        public void init() {
                StreamTaskInput<T> input = new 
StreamTaskSourceInput<>(headOperator);
-               DataOutput<T> output = new AsyncDataOutputToOutput<>(
+               output = new AsyncDataOutputToOutput<>(
                        operatorChain.getChainEntryPoint(),
                        getStreamStatusMaintainer());
 
@@ -57,6 +58,11 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
                        operatorChain);
        }
 
+       @Override
+       protected void advanceToEndOfEventTime() {
+               output.emitWatermark(Watermark.MAX_WATERMARK);
+       }
+
        /**
         * Implementation of {@link DataOutput} that wraps a specific {@link 
Output}.
         */
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 44a5d44..8fbcf19 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -38,10 +38,13 @@ import 
org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -137,6 +140,17 @@ public class SourceOperatorTest {
        }
 
        @Test
+       public void testCloseWillSendMaxWatermark() throws Exception {
+               MockSourceSplit mockSplit = new MockSourceSplit(1, 0, 0);
+               operator.initializeState(getStateContext(mockSplit));
+               PushingAsyncDataInput.DataOutput<Integer> dataOutput =
+                       Mockito.mock(PushingAsyncDataInput.DataOutput.class);
+               operator.open();
+               operator.emitNext(dataOutput);
+               Mockito.verify(dataOutput, 
Mockito.times(1)).emitWatermark(Watermark.MAX_WATERMARK);
+       }
+
+       @Test
        public void testSnapshotState() throws Exception {
                StateInitializationContext stateContext = getStateContext();
                operator.initializeState(stateContext);
@@ -159,9 +173,13 @@ public class SourceOperatorTest {
        // ---------------- helper methods -------------------------
 
        private StateInitializationContext getStateContext() throws Exception {
+               return getStateContext(MOCK_SPLIT);
+       }
+
+       private StateInitializationContext getStateContext(MockSourceSplit 
mockSplit) throws Exception {
                // Create a mock split.
                byte[] serializedSplitWithVersion = SimpleVersionedSerialization
-                       .writeVersionAndSerialize(new 
MockSourceSplitSerializer(), MOCK_SPLIT);
+                       .writeVersionAndSerialize(new 
MockSourceSplitSerializer(), mockSplit);
 
                // Crate the state context.
                OperatorStateStore operatorStateStore = 
createOperatorStateStore();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index 8d05336..c189001 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -68,7 +68,8 @@ public class SourceOperatorEventTimeTest {
 
                assertThat(result, contains(
                        new Watermark(100L),
-                       new Watermark(120L)
+                       new Watermark(120L),
+                       Watermark.MAX_WATERMARK
                ));
        }
 
@@ -86,7 +87,8 @@ public class SourceOperatorEventTimeTest {
 
                assertThat(result, contains(
                        new Watermark(100L),
-                       new Watermark(120L)
+                       new Watermark(120L),
+                       Watermark.MAX_WATERMARK
                ));
        }
 
@@ -111,7 +113,8 @@ public class SourceOperatorEventTimeTest {
                assertThat(result, contains(
                        new Watermark(100L),
                        new Watermark(150L),
-                       new Watermark(200L)
+                       new Watermark(200L),
+                       Watermark.MAX_WATERMARK
                ));
        }
 
@@ -136,7 +139,8 @@ public class SourceOperatorEventTimeTest {
                assertThat(result, contains(
                        new Watermark(100L),
                        new Watermark(150L),
-                       new Watermark(200L)
+                       new Watermark(200L),
+                       Watermark.MAX_WATERMARK
                ));
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 56475ed..189bdff 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -29,12 +29,15 @@ import 
org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.SerializedValue;
 
@@ -78,6 +81,24 @@ public class SourceOperatorStreamTaskTest {
                                IntStream.range(NUM_RECORDS, NUM_RECORDS * 2));
        }
 
+       @Test
+       public void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
+               final int checkpointId = 1;
+               try (StreamTaskMailboxTestHarness<Integer> testHarness = 
createTestHarness(checkpointId, null)) {
+                       getAndMaybeAssignSplit(testHarness);
+
+                       final CheckpointOptions checkpointOptions = new 
CheckpointOptions(CheckpointType.SYNC_SAVEPOINT,
+                               
CheckpointStorageLocationReference.getDefault());
+                       triggerCheckpointWaitForFinish(testHarness, 
checkpointId, checkpointOptions);
+
+                       Queue<Object> expectedOutput = new LinkedList<>();
+                       expectedOutput.add(Watermark.MAX_WATERMARK);
+                       expectedOutput.add(new CheckpointBarrier(checkpointId, 
checkpointId, checkpointOptions));
+
+                       assertOutputEquals("Output was not correct.", 
expectedOutput, testHarness.getOutput());
+               }
+       }
+
        private TaskStateSnapshot executeAndWaitForCheckpoint(
                        long checkpointId,
                        TaskStateSnapshot initialSnapshot,
@@ -91,22 +112,8 @@ public class SourceOperatorStreamTaskTest {
                        // Process all the records.
                        processUntil(testHarness, () -> 
!testHarness.getStreamTask().inputProcessor.getAvailableFuture().isDone());
 
-                       // Trigger a checkpoint.
                        CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation();
-                       OneShotLatch waitForAcknowledgeLatch = new 
OneShotLatch();
-                       
testHarness.taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);
-                       CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, checkpointId);
-                       Future<Boolean> checkpointFuture =
-                                       testHarness
-                                                       .getStreamTask()
-                                                       
.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false);
-
-                       // Wait until the checkpoint finishes.
-                       // We have to mark the source reader as available here, 
otherwise the runMailboxStep() call after
-                       // checkpiont is completed will block.
-                       getSourceReaderFromTask(testHarness).markAvailable();
-                       processUntil(testHarness, checkpointFuture::isDone);
-                       waitForAcknowledgeLatch.await();
+                       triggerCheckpointWaitForFinish(testHarness, 
checkpointId, checkpointOptions);
 
                        // Build expected output to verify the results
                        Queue<Object> expectedOutput = new LinkedList<>();
@@ -121,6 +128,26 @@ public class SourceOperatorStreamTaskTest {
                }
        }
 
+       private void 
triggerCheckpointWaitForFinish(StreamTaskMailboxTestHarness<Integer> 
testHarness,
+                                                                               
                long checkpointId,
+                                                                               
                CheckpointOptions checkpointOptions) throws Exception {
+               // Trigger a checkpoint.
+               OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();
+               
testHarness.taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);
+               CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, checkpointId);
+               Future<Boolean> checkpointFuture =
+                       testHarness
+                               .getStreamTask()
+                               .triggerCheckpointAsync(checkpointMetaData, 
checkpointOptions, true);
+
+               // Wait until the checkpoint finishes.
+               // We have to mark the source reader as available here, 
otherwise the runMailboxStep() call after
+               // checkpoint is completed will block.
+               getSourceReaderFromTask(testHarness).markAvailable();
+               processUntil(testHarness, checkpointFuture::isDone);
+               waitForAcknowledgeLatch.await();
+       }
+
        private void processUntil(StreamTaskMailboxTestHarness testHarness, 
Supplier<Boolean> condition) throws Exception {
                do {
                        testHarness.getStreamTask().runMailboxStep();

Reply via email to