This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63183d8a2a1267f31fdfafebb628bf61409a094e
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jun 25 14:11:54 2021 +0200

    [FLINK-22972][datastream] Forbid emitting from closed StreamSourceContexts
---
 .../api/operators/StreamSourceContexts.java        | 26 ++++++++++-
 .../StreamSourceOperatorWatermarksTest.java        | 52 ++--------------------
 2 files changed, 29 insertions(+), 49 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 990585f..5dcf566 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.ScheduledFuture;
@@ -172,6 +173,7 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndCollect(T element) {
+            checkNotClosed();
             lastRecordTime = this.timeService.getCurrentProcessingTime();
             output.collect(reuse.replace(element, lastRecordTime));
 
@@ -191,6 +193,7 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndCollectWithTimestamp(T element, long timestamp) {
+            checkNotClosed();
             processAndCollect(element);
         }
 
@@ -204,6 +207,7 @@ public class StreamSourceContexts {
         /** This will only be called if allowWatermark returned {@code true}. */
         @Override
         protected void processAndEmitWatermark(Watermark mark) {
+            checkNotClosed();
             nextWatermarkTime = Long.MAX_VALUE;
             output.emitWatermark(mark);
 
@@ -218,6 +222,7 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
+            checkNotClosed();
             if (idle != streamStatus.isIdle()) {
                 output.emitStreamStatus(streamStatus);
             }
@@ -227,7 +232,6 @@ public class StreamSourceContexts {
         @Override
         public void close() {
             super.close();
-
             final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer;
             if (nextWatermarkTimer != null) {
                 nextWatermarkTimer.cancel(true);
@@ -316,21 +320,25 @@ public class StreamSourceContexts {
 
         @Override
         protected void processAndCollect(T element) {
+            checkNotClosed();
             output.collect(reuse.replace(element));
         }
 
         @Override
         protected void processAndCollectWithTimestamp(T element, long timestamp) {
+            checkNotClosed();
             output.collect(reuse.replace(element, timestamp));
         }
 
         @Override
         protected void processAndEmitWatermark(Watermark mark) {
+            checkNotClosed();
             output.emitWatermark(mark);
         }
 
         @Override
         protected void processAndEmitStreamStatus(StreamStatus streamStatus) {
+            checkNotClosed();
             if (idle != streamStatus.isIdle()) {
                 output.emitStreamStatus(streamStatus);
             }
@@ -364,6 +372,7 @@ public class StreamSourceContexts {
         protected final long idleTimeout;
 
         private ScheduledFuture<?> nextCheck;
+        private boolean closed = false;
 
         /**
          * This flag will be reset to {@code true} every time the next check is scheduled. Whenever
@@ -402,6 +411,8 @@ public class StreamSourceContexts {
 
         @Override
         public void collect(T element) {
+            checkNotClosed();
+
             synchronized (checkpointLock) {
                 processAndEmitStreamStatus(StreamStatus.ACTIVE);
 
@@ -415,8 +426,16 @@ public class StreamSourceContexts {
             }
         }
 
+        protected void checkNotClosed() {
+            if (closed) {
+                throw new FlinkRuntimeException("The Source Context has been closed already.");
+            }
+        }
+
         @Override
         public void collectWithTimestamp(T element, long timestamp) {
+            checkNotClosed();
+
             synchronized (checkpointLock) {
                 processAndEmitStreamStatus(StreamStatus.ACTIVE);
 
@@ -432,6 +451,8 @@ public class StreamSourceContexts {
 
         @Override
         public void emitWatermark(Watermark mark) {
+            checkNotClosed();
+
             if (allowWatermark(mark)) {
                 synchronized (checkpointLock) {
                     processAndEmitStreamStatus(StreamStatus.ACTIVE);
@@ -449,6 +470,8 @@ public class StreamSourceContexts {
 
         @Override
         public void markAsTemporarilyIdle() {
+            checkNotClosed();
+
             synchronized (checkpointLock) {
                 processAndEmitStreamStatus(StreamStatus.IDLE);
             }
@@ -461,6 +484,7 @@ public class StreamSourceContexts {
 
         @Override
         public void close() {
+            this.closed = true;
             cancelNextIdleDetectionTask();
         }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
index fcdaa4d..6f7377c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
@@ -44,14 +44,12 @@ import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.streaming.util.CollectorOutput;
 import org.apache.flink.streaming.util.MockStreamTask;
 import org.apache.flink.streaming.util.MockStreamTaskBuilder;
-import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -64,7 +62,7 @@ public class StreamSourceOperatorWatermarksTest {
 
     @Test
     public void testEmitMaxWatermarkForFiniteSource() throws Exception {
-        StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource());
+        StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource<>());
         StreamTaskTestHarness<String> testHarness =
                 setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);
 
@@ -76,23 +74,6 @@ public class StreamSourceOperatorWatermarksTest {
     }
 
     @Test
-    public void testMaxWatermarkIsForwardedLastForFiniteSource() throws Exception {
-        StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource(true));
-        StreamTaskTestHarness<String> testHarness =
-                setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO);
-
-        testHarness.invoke();
-        testHarness.waitForTaskCompletion();
-
-        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-        expectedOutput.add(new StreamRecord<>("Hello"));
-        expectedOutput.add(Watermark.MAX_WATERMARK);
-
-        TestHarnessUtil.assertOutputEquals(
-                "Output was not correct.", expectedOutput, testHarness.getOutput());
-    }
-
-    @Test
     public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
         StreamSource<String, ?> sourceOperator = new StreamSource<>(new InfiniteSource<>());
         StreamTaskTestHarness<String> testHarness =
@@ -245,38 +226,13 @@ public class StreamSourceOperatorWatermarksTest {
 
     // ------------------------------------------------------------------------
 
-    private static final class FiniteSource extends RichSourceFunction<String> {
-
-        private transient volatile boolean canceled = false;
-
-        private transient SourceContext<String> context;
-
-        private final boolean outputingARecordWhenClosing;
-
-        public FiniteSource() {
-            this(false);
-        }
-
-        public FiniteSource(boolean outputingARecordWhenClosing) {
-            this.outputingARecordWhenClosing = outputingARecordWhenClosing;
-        }
-
-        @Override
-        public void run(SourceContext<String> ctx) {
-            context = ctx;
-        }
+    private static final class FiniteSource<T> extends RichSourceFunction<T> {
 
         @Override
-        public void close() {
-            if (!canceled && outputingARecordWhenClosing) {
-                context.collect("Hello");
-            }
-        }
+        public void run(SourceContext<T> ctx) {}
 
         @Override
-        public void cancel() {
-            canceled = true;
-        }
+        public void cancel() {}
     }
 
     private static final class InfiniteSource<T> implements SourceFunction<T> {

Reply via email to