This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 37ccdbe67ccb4e9d568f06214314a9afd9dae379 Author: sxnan <[email protected]> AuthorDate: Wed Aug 5 13:00:36 2020 +0800 [FLINK-18820] Emit MAX_WATERMARK at the end in SourceOperator Starting from this commit we emit a MAX_WATERMARK when all records are produced in SourceOperator or a stop with savepoint was triggered. --- .../streaming/api/operators/SourceOperator.java | 13 ++++- .../runtime/tasks/SourceOperatorStreamTask.java | 8 ++- .../api/operators/SourceOperatorTest.java | 20 +++++++- .../source/SourceOperatorEventTimeTest.java | 12 +++-- .../tasks/SourceOperatorStreamTaskTest.java | 57 ++++++++++++++++------ 5 files changed, 87 insertions(+), 23 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 742d7ce..eaf5481 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.CollectionUtil; @@ -201,13 +202,21 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> // short circuit the common case (every invocation except the first) if (currentMainOutput != null) { - return sourceReader.pollNext(currentMainOutput); + return pollNextRecord(output); } // this creates a batch or streaming output based on the 
runtime mode currentMainOutput = eventTimeLogic.createMainOutput(output); lastInvokedOutput = output; - return sourceReader.pollNext(currentMainOutput); + return pollNextRecord(output); + } + + private InputStatus pollNextRecord(DataOutput<OUT> output) throws Exception { + InputStatus inputStatus = sourceReader.pollNext(currentMainOutput); + if (inputStatus == InputStatus.END_OF_INPUT) { + output.emitWatermark(Watermark.MAX_WATERMARK); + } + return inputStatus; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index 47a1d73..c461dca 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -39,6 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ @Internal public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>> { + private AsyncDataOutputToOutput<T> output; public SourceOperatorStreamTask(Environment env) throws Exception { super(env); @@ -47,7 +48,7 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, @Override public void init() { StreamTaskInput<T> input = new StreamTaskSourceInput<>(headOperator); - DataOutput<T> output = new AsyncDataOutputToOutput<>( + output = new AsyncDataOutputToOutput<>( operatorChain.getChainEntryPoint(), getStreamStatusMaintainer()); @@ -57,6 +58,11 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, operatorChain); } + @Override + protected void advanceToEndOfEventTime() { + output.emitWatermark(Watermark.MAX_WATERMARK); + } + /** * Implementation of {@link DataOutput} that wraps a specific {@link Output}. 
*/ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 44a5d44..8fbcf19 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.util.CollectionUtil; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.util.Arrays; import java.util.Collections; @@ -137,6 +140,17 @@ public class SourceOperatorTest { } @Test + public void testCloseWillSendMaxWatermark() throws Exception { + MockSourceSplit mockSplit = new MockSourceSplit(1, 0, 0); + operator.initializeState(getStateContext(mockSplit)); + PushingAsyncDataInput.DataOutput<Integer> dataOutput = + Mockito.mock(PushingAsyncDataInput.DataOutput.class); + operator.open(); + operator.emitNext(dataOutput); + Mockito.verify(dataOutput, Mockito.times(1)).emitWatermark(Watermark.MAX_WATERMARK); + } + + @Test public void testSnapshotState() throws Exception { StateInitializationContext stateContext = getStateContext(); operator.initializeState(stateContext); @@ -159,9 +173,13 @@ public class SourceOperatorTest { // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception { + return getStateContext(MOCK_SPLIT); + } + + private 
StateInitializationContext getStateContext(MockSourceSplit mockSplit) throws Exception { // Create a mock split. byte[] serializedSplitWithVersion = SimpleVersionedSerialization - .writeVersionAndSerialize(new MockSourceSplitSerializer(), MOCK_SPLIT); + .writeVersionAndSerialize(new MockSourceSplitSerializer(), mockSplit); // Crate the state context. OperatorStateStore operatorStateStore = createOperatorStateStore(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index 8d05336..c189001 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -68,7 +68,8 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), - new Watermark(120L) + new Watermark(120L), + Watermark.MAX_WATERMARK )); } @@ -86,7 +87,8 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), - new Watermark(120L) + new Watermark(120L), + Watermark.MAX_WATERMARK )); } @@ -111,7 +113,8 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), new Watermark(150L), - new Watermark(200L) + new Watermark(200L), + Watermark.MAX_WATERMARK )); } @@ -136,7 +139,8 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), new Watermark(150L), - new Watermark(200L) + new Watermark(200L), + Watermark.MAX_WATERMARK )); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index 56475ed..189bdff 
100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -29,12 +29,15 @@ import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.streaming.api.operators.SourceOperator; import org.apache.flink.streaming.api.operators.SourceOperatorFactory; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.SerializedValue; @@ -78,6 +81,24 @@ public class SourceOperatorStreamTaskTest { IntStream.range(NUM_RECORDS, NUM_RECORDS * 2)); } + @Test + public void testSnapshotAndAdvanceToEndOfEventTime() throws Exception { + final int checkpointId = 1; + try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness(checkpointId, null)) { + getAndMaybeAssignSplit(testHarness); + + final CheckpointOptions checkpointOptions = new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, + CheckpointStorageLocationReference.getDefault()); + triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions); + + Queue<Object> expectedOutput = new LinkedList<>(); + expectedOutput.add(Watermark.MAX_WATERMARK); + expectedOutput.add(new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions)); + + 
assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + } + } + private TaskStateSnapshot executeAndWaitForCheckpoint( long checkpointId, TaskStateSnapshot initialSnapshot, @@ -91,22 +112,8 @@ public class SourceOperatorStreamTaskTest { // Process all the records. processUntil(testHarness, () -> !testHarness.getStreamTask().inputProcessor.getAvailableFuture().isDone()); - // Trigger a checkpoint. CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); - OneShotLatch waitForAcknowledgeLatch = new OneShotLatch(); - testHarness.taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch); - CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointId); - Future<Boolean> checkpointFuture = - testHarness - .getStreamTask() - .triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false); - - // Wait until the checkpoint finishes. - // We have to mark the source reader as available here, otherwise the runMailboxStep() call after - // checkpiont is completed will block. - getSourceReaderFromTask(testHarness).markAvailable(); - processUntil(testHarness, checkpointFuture::isDone); - waitForAcknowledgeLatch.await(); + triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions); // Build expected output to verify the results Queue<Object> expectedOutput = new LinkedList<>(); @@ -121,6 +128,26 @@ public class SourceOperatorStreamTaskTest { } } + private void triggerCheckpointWaitForFinish(StreamTaskMailboxTestHarness<Integer> testHarness, + long checkpointId, + CheckpointOptions checkpointOptions) throws Exception { + // Trigger a checkpoint. 
+ OneShotLatch waitForAcknowledgeLatch = new OneShotLatch(); + testHarness.taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, checkpointId); + Future<Boolean> checkpointFuture = + testHarness + .getStreamTask() + .triggerCheckpointAsync(checkpointMetaData, checkpointOptions, true); + + // Wait until the checkpoint finishes. + // We have to mark the source reader as available here, otherwise the runMailboxStep() call after + // the checkpoint is completed will block. + getSourceReaderFromTask(testHarness).markAvailable(); + processUntil(testHarness, checkpointFuture::isDone); + waitForAcknowledgeLatch.await(); + } + private void processUntil(StreamTaskMailboxTestHarness testHarness, Supplier<Boolean> condition) throws Exception { do { testHarness.getStreamTask().runMailboxStep();
