This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7926668fca0d2d63528754458e63a405c228ef92
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Mon Nov 16 10:18:43 2020 +0100

    [FLINK-20169] Move emitting MAX_WATERMARK out of the SourceOperator processing loop
    
    This commit reverts some of the changes introduced in
    fada6fb6ac9fd7f6510f1f2d77b6baa06563e222.
    
    Instead of checking for END_OF_INPUT in the SourceOperator, I
    emit the MAX_WATERMARK from SourceOperatorStreamTask#afterInvoke. I check
    whether the Task was cancelled. If it was not, the Task finished
    successfully and we emit the MAX_WATERMARK.
---
 .../streaming/api/operators/SourceOperator.java    | 13 ++-------
 .../runtime/tasks/SourceOperatorStreamTask.java    |  8 ++++++
 .../api/operators/SourceOperatorTest.java          | 20 +-------------
 .../source/SourceOperatorEventTimeTest.java        | 16 +++--------
 .../tasks/SourceOperatorStreamTaskTest.java        | 32 +++++++++++++++++++++-
 .../tasks/StreamTaskMailboxTestHarness.java        | 13 ++++++---
 6 files changed, 55 insertions(+), 47 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 8ac54ea..c0e6e21 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CollectionUtil;
@@ -213,21 +212,13 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
 
                // short circuit the common case (every invocation except the 
first)
                if (currentMainOutput != null) {
-                       return pollNextRecord(output);
+                       return sourceReader.pollNext(currentMainOutput);
                }
 
                // this creates a batch or streaming output based on the 
runtime mode
                currentMainOutput = eventTimeLogic.createMainOutput(output);
                lastInvokedOutput = output;
-               return pollNextRecord(output);
-       }
-
-       private InputStatus pollNextRecord(DataOutput<OUT> output) throws 
Exception {
-               InputStatus inputStatus = 
sourceReader.pollNext(currentMainOutput);
-               if (inputStatus == InputStatus.END_OF_INPUT) {
-                       output.emitWatermark(Watermark.MAX_WATERMARK);
-               }
-               return inputStatus;
+               return sourceReader.pollNext(currentMainOutput);
        }
 
        @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index c461dca..f32f7e3f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -63,6 +63,14 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
                output.emitWatermark(Watermark.MAX_WATERMARK);
        }
 
+       @Override
+       protected void afterInvoke() throws Exception {
+               if (!isCanceled()) {
+                       advanceToEndOfEventTime();
+               }
+               super.afterInvoke();
+       }
+
        /**
         * Implementation of {@link DataOutput} that wraps a specific {@link 
Output}.
         */
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 65228f0..80b9646 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -43,8 +43,6 @@ import 
org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.util.MockOutput;
@@ -54,7 +52,6 @@ import org.apache.flink.util.CollectionUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -148,17 +145,6 @@ public class SourceOperatorTest {
        }
 
        @Test
-       public void testCloseWillSendMaxWatermark() throws Exception {
-               MockSourceSplit mockSplit = new MockSourceSplit(1, 0, 0);
-               operator.initializeState(getStateContext(mockSplit));
-               PushingAsyncDataInput.DataOutput<Integer> dataOutput =
-                       Mockito.mock(PushingAsyncDataInput.DataOutput.class);
-               operator.open();
-               operator.emitNext(dataOutput);
-               Mockito.verify(dataOutput, 
Mockito.times(1)).emitWatermark(Watermark.MAX_WATERMARK);
-       }
-
-       @Test
        public void testSnapshotState() throws Exception {
                StateInitializationContext stateContext = getStateContext();
                operator.initializeState(stateContext);
@@ -196,13 +182,9 @@ public class SourceOperatorTest {
        // ---------------- helper methods -------------------------
 
        private StateInitializationContext getStateContext() throws Exception {
-               return getStateContext(MOCK_SPLIT);
-       }
-
-       private StateInitializationContext getStateContext(MockSourceSplit 
mockSplit) throws Exception {
                // Create a mock split.
                byte[] serializedSplitWithVersion = SimpleVersionedSerialization
-                       .writeVersionAndSerialize(new 
MockSourceSplitSerializer(), mockSplit);
+                       .writeVersionAndSerialize(new 
MockSourceSplitSerializer(), MOCK_SPLIT);
 
                // Crate the state context.
                OperatorStateStore operatorStateStore = 
createOperatorStateStore();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index d47d309..34e22eb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -67,9 +67,7 @@ public class SourceOperatorEventTimeTest {
 
                assertThat(result, contains(
                        new Watermark(100L),
-                       new Watermark(120L),
-                       Watermark.MAX_WATERMARK
-               ));
+                       new Watermark(120L)));
        }
 
        @Test
@@ -86,9 +84,7 @@ public class SourceOperatorEventTimeTest {
 
                assertThat(result, contains(
                        new Watermark(100L),
-                       new Watermark(120L),
-                       Watermark.MAX_WATERMARK
-               ));
+                       new Watermark(120L)));
        }
 
        @Test
@@ -112,9 +108,7 @@ public class SourceOperatorEventTimeTest {
                assertThat(result, contains(
                        new Watermark(100L),
                        new Watermark(150L),
-                       new Watermark(200L),
-                       Watermark.MAX_WATERMARK
-               ));
+                       new Watermark(200L)));
        }
 
        @Test
@@ -138,9 +132,7 @@ public class SourceOperatorEventTimeTest {
                assertThat(result, contains(
                        new Watermark(100L),
                        new Watermark(150L),
-                       new Watermark(200L),
-                       Watermark.MAX_WATERMARK
-               ));
+                       new Watermark(200L)));
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 189bdff..8c4dc0a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -52,7 +52,10 @@ import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
 import static 
org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for verifying that the {@link SourceOperator} as a task input can be 
integrated
@@ -99,6 +102,29 @@ public class SourceOperatorStreamTaskTest {
                }
        }
 
+       @Test
+       public void testEmittingMaxWatermarkAfterReadingAllRecords() throws 
Exception {
+               try (StreamTaskMailboxTestHarness<Integer> testHarness = 
createTestHarness()) {
+                       testHarness.processWhileAvailable();
+                       testHarness.finishProcessing();
+
+                       List<Object> expectedOutput = Collections.singletonList(
+                               Watermark.MAX_WATERMARK
+                       );
+                       assertThat(testHarness.getOutput().toArray(), 
equalTo(expectedOutput.toArray()));
+               }
+       }
+
+       @Test
+       public void testNotEmittingMaxWatermarkAfterCancelling() throws 
Exception {
+               try (StreamTaskMailboxTestHarness<Integer> testHarness = 
createTestHarness()) {
+                       testHarness.getStreamTask().cancel();
+                       testHarness.finishProcessing();
+
+                       assertThat(testHarness.getOutput(), hasSize(0));
+               }
+       }
+
        private TaskStateSnapshot executeAndWaitForCheckpoint(
                        long checkpointId,
                        TaskStateSnapshot initialSnapshot,
@@ -110,7 +136,7 @@ public class SourceOperatorStreamTaskTest {
                        // Add records to the split and update expected output.
                        addRecords(split, NUM_RECORDS);
                        // Process all the records.
-                       processUntil(testHarness, () -> 
!testHarness.getStreamTask().inputProcessor.getAvailableFuture().isDone());
+                       testHarness.processWhileAvailable();
 
                        CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation();
                        triggerCheckpointWaitForFinish(testHarness, 
checkpointId, checkpointOptions);
@@ -154,6 +180,10 @@ public class SourceOperatorStreamTaskTest {
                } while (!condition.get());
        }
 
+       private StreamTaskMailboxTestHarness<Integer> createTestHarness() 
throws Exception {
+               return createTestHarness(0, null);
+       }
+
        private StreamTaskMailboxTestHarness<Integer> createTestHarness(
                        long checkpointId,
                        TaskStateSnapshot snapshot) throws Exception {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index a8761ff..a0eec21 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -150,12 +150,17 @@ public class StreamTaskMailboxTestHarness<OUT> implements 
AutoCloseable {
                }
        }
 
-       @Override
-       public void close() throws Exception {
-               streamTask.cancel();
-
+       public void finishProcessing() throws Exception {
                streamTask.afterInvoke();
                streamTask.cleanUpInvoke();
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (streamTask.isRunning()) {
+                       streamTask.cancel();
+                       finishProcessing();
+               }
 
                streamMockEnvironment.getIOManager().close();
                MemoryManager memMan = 
this.streamMockEnvironment.getMemoryManager();

Reply via email to