This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7926668fca0d2d63528754458e63a405c228ef92 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Mon Nov 16 10:18:43 2020 +0100 [FLINK-20169] Move emitting MAX_WATERMARK out of the SourceOperator processing loop This commit reverts some of the changes introduced in fada6fb6ac9fd7f6510f1f2d77b6baa06563e222. Instead of checking for END_OF_INPUT in the SourceOperator, I emit the MAX_WATERMARK from SourceOperatorStreamTask#afterInvoke. I check if the Task was cancelled or not. If it was not, that means the Task finished successfully and we emit the MAX_WATERMARK. --- .../streaming/api/operators/SourceOperator.java | 13 ++------- .../runtime/tasks/SourceOperatorStreamTask.java | 8 ++++++ .../api/operators/SourceOperatorTest.java | 20 +------------- .../source/SourceOperatorEventTimeTest.java | 16 +++-------- .../tasks/SourceOperatorStreamTaskTest.java | 32 +++++++++++++++++++++- .../tasks/StreamTaskMailboxTestHarness.java | 13 ++++++--- 6 files changed, 55 insertions(+), 47 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 8ac54ea..c0e6e21 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -44,7 +44,6 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.CollectionUtil; @@ 
-213,21 +212,13 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> // short circuit the common case (every invocation except the first) if (currentMainOutput != null) { - return pollNextRecord(output); + return sourceReader.pollNext(currentMainOutput); } // this creates a batch or streaming output based on the runtime mode currentMainOutput = eventTimeLogic.createMainOutput(output); lastInvokedOutput = output; - return pollNextRecord(output); - } - - private InputStatus pollNextRecord(DataOutput<OUT> output) throws Exception { - InputStatus inputStatus = sourceReader.pollNext(currentMainOutput); - if (inputStatus == InputStatus.END_OF_INPUT) { - output.emitWatermark(Watermark.MAX_WATERMARK); - } - return inputStatus; + return sourceReader.pollNext(currentMainOutput); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index c461dca..f32f7e3f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -63,6 +63,14 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, output.emitWatermark(Watermark.MAX_WATERMARK); } + @Override + protected void afterInvoke() throws Exception { + if (!isCanceled()) { + advanceToEndOfEventTime(); + } + super.afterInvoke(); + } + /** * Implementation of {@link DataOutput} that wraps a specific {@link Output}. 
*/ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 65228f0..80b9646 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -43,8 +43,6 @@ import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.source.TestingSourceOperator; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.util.MockOutput; @@ -54,7 +52,6 @@ import org.apache.flink.util.CollectionUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; @@ -148,17 +145,6 @@ public class SourceOperatorTest { } @Test - public void testCloseWillSendMaxWatermark() throws Exception { - MockSourceSplit mockSplit = new MockSourceSplit(1, 0, 0); - operator.initializeState(getStateContext(mockSplit)); - PushingAsyncDataInput.DataOutput<Integer> dataOutput = - Mockito.mock(PushingAsyncDataInput.DataOutput.class); - operator.open(); - operator.emitNext(dataOutput); - Mockito.verify(dataOutput, Mockito.times(1)).emitWatermark(Watermark.MAX_WATERMARK); - } - - @Test public void testSnapshotState() throws Exception { StateInitializationContext stateContext = getStateContext(); operator.initializeState(stateContext); @@ -196,13 +182,9 @@ public class 
SourceOperatorTest { // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception { - return getStateContext(MOCK_SPLIT); - } - - private StateInitializationContext getStateContext(MockSourceSplit mockSplit) throws Exception { // Create a mock split. byte[] serializedSplitWithVersion = SimpleVersionedSerialization - .writeVersionAndSerialize(new MockSourceSplitSerializer(), mockSplit); + .writeVersionAndSerialize(new MockSourceSplitSerializer(), MOCK_SPLIT); // Crate the state context. OperatorStateStore operatorStateStore = createOperatorStateStore(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index d47d309..34e22eb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -67,9 +67,7 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), - new Watermark(120L), - Watermark.MAX_WATERMARK - )); + new Watermark(120L))); } @Test @@ -86,9 +84,7 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), - new Watermark(120L), - Watermark.MAX_WATERMARK - )); + new Watermark(120L))); } @Test @@ -112,9 +108,7 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), new Watermark(150L), - new Watermark(200L), - Watermark.MAX_WATERMARK - )); + new Watermark(200L))); } @Test @@ -138,9 +132,7 @@ public class SourceOperatorEventTimeTest { assertThat(result, contains( new Watermark(100L), new Watermark(150L), - new Watermark(200L), - Watermark.MAX_WATERMARK - )); + new Watermark(200L))); } // 
------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index 189bdff..8c4dc0a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -52,7 +52,10 @@ import java.util.function.Supplier; import java.util.stream.IntStream; import static org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; /** * Tests for verifying that the {@link SourceOperator} as a task input can be integrated @@ -99,6 +102,29 @@ public class SourceOperatorStreamTaskTest { } } + @Test + public void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception { + try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) { + testHarness.processWhileAvailable(); + testHarness.finishProcessing(); + + List<Object> expectedOutput = Collections.singletonList( + Watermark.MAX_WATERMARK + ); + assertThat(testHarness.getOutput().toArray(), equalTo(expectedOutput.toArray())); + } + } + + @Test + public void testNotEmittingMaxWatermarkAfterCancelling() throws Exception { + try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) { + testHarness.getStreamTask().cancel(); + testHarness.finishProcessing(); + + assertThat(testHarness.getOutput(), hasSize(0)); + } + } + private TaskStateSnapshot executeAndWaitForCheckpoint( long checkpointId, TaskStateSnapshot initialSnapshot, @@ -110,7 +136,7 @@ public class SourceOperatorStreamTaskTest { // 
Add records to the split and update expected output. addRecords(split, NUM_RECORDS); // Process all the records. - processUntil(testHarness, () -> !testHarness.getStreamTask().inputProcessor.getAvailableFuture().isDone()); + testHarness.processWhileAvailable(); CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(); triggerCheckpointWaitForFinish(testHarness, checkpointId, checkpointOptions); @@ -154,6 +180,10 @@ public class SourceOperatorStreamTaskTest { } while (!condition.get()); } + private StreamTaskMailboxTestHarness<Integer> createTestHarness() throws Exception { + return createTestHarness(0, null); + } + private StreamTaskMailboxTestHarness<Integer> createTestHarness( long checkpointId, TaskStateSnapshot snapshot) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java index a8761ff..a0eec21 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java @@ -150,12 +150,17 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable { } } - @Override - public void close() throws Exception { - streamTask.cancel(); - + public void finishProcessing() throws Exception { streamTask.afterInvoke(); streamTask.cleanUpInvoke(); + } + + @Override + public void close() throws Exception { + if (streamTask.isRunning()) { + streamTask.cancel(); + finishProcessing(); + } streamMockEnvironment.getIOManager().close(); MemoryManager memMan = this.streamMockEnvironment.getMemoryManager();
