This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 63183d8a2a1267f31fdfafebb628bf61409a094e Author: Dawid Wysakowicz <[email protected]> AuthorDate: Fri Jun 25 14:11:54 2021 +0200 [FLINK-22972][datastream] Forbid emitting from closed StreamSourceContexts --- .../api/operators/StreamSourceContexts.java | 26 ++++++++++- .../StreamSourceOperatorWatermarksTest.java | 52 ++-------------------- 2 files changed, 29 insertions(+), 49 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index 990585f..5dcf566 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; import java.util.concurrent.ScheduledFuture; @@ -172,6 +173,7 @@ public class StreamSourceContexts { @Override protected void processAndCollect(T element) { + checkNotClosed(); lastRecordTime = this.timeService.getCurrentProcessingTime(); output.collect(reuse.replace(element, lastRecordTime)); @@ -191,6 +193,7 @@ public class StreamSourceContexts { @Override protected void processAndCollectWithTimestamp(T element, long timestamp) { + checkNotClosed(); processAndCollect(element); } @@ -204,6 +207,7 @@ public class StreamSourceContexts { /** This will only be called if allowWatermark returned {@code true}. */ @Override protected void processAndEmitWatermark(Watermark mark) { + checkNotClosed(); nextWatermarkTime = Long.MAX_VALUE; output.emitWatermark(mark); @@ -218,6 +222,7 @@ public class StreamSourceContexts { @Override protected void processAndEmitStreamStatus(StreamStatus streamStatus) { + checkNotClosed(); if (idle != streamStatus.isIdle()) { output.emitStreamStatus(streamStatus); } @@ -227,7 +232,6 @@ public class StreamSourceContexts { @Override public void close() { super.close(); - final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer; if (nextWatermarkTimer != null) { nextWatermarkTimer.cancel(true); @@ -316,21 +320,25 @@ public class StreamSourceContexts { @Override protected void processAndCollect(T element) { + checkNotClosed(); output.collect(reuse.replace(element)); } @Override protected void processAndCollectWithTimestamp(T element, long timestamp) { + checkNotClosed(); output.collect(reuse.replace(element, timestamp)); } @Override protected void processAndEmitWatermark(Watermark mark) { + checkNotClosed(); output.emitWatermark(mark); } @Override protected void processAndEmitStreamStatus(StreamStatus streamStatus) { + checkNotClosed(); if (idle != streamStatus.isIdle()) { output.emitStreamStatus(streamStatus); } @@ -364,6 +372,7 @@ public class StreamSourceContexts { protected final long idleTimeout; private ScheduledFuture<?> nextCheck; + private boolean closed = false; /** * This flag will be reset to {@code true} every time the next check is scheduled. Whenever @@ -402,6 +411,8 @@ public class StreamSourceContexts { @Override public void collect(T element) { + checkNotClosed(); + synchronized (checkpointLock) { processAndEmitStreamStatus(StreamStatus.ACTIVE); @@ -415,8 +426,16 @@ public class StreamSourceContexts { } } + protected void checkNotClosed() { + if (closed) { + throw new FlinkRuntimeException("The Source Context has been closed already."); + } + } + @Override public void collectWithTimestamp(T element, long timestamp) { + checkNotClosed(); + synchronized (checkpointLock) { processAndEmitStreamStatus(StreamStatus.ACTIVE); @@ -432,6 +451,8 @@ public class StreamSourceContexts { @Override public void emitWatermark(Watermark mark) { + checkNotClosed(); + if (allowWatermark(mark)) { synchronized (checkpointLock) { processAndEmitStreamStatus(StreamStatus.ACTIVE); @@ -449,6 +470,8 @@ public class StreamSourceContexts { @Override public void markAsTemporarilyIdle() { + checkNotClosed(); + synchronized (checkpointLock) { processAndEmitStreamStatus(StreamStatus.IDLE); } @@ -461,6 +484,7 @@ public class StreamSourceContexts { @Override public void close() { + this.closed = true; cancelNextIdleDetectionTask(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java index fcdaa4d..6f7377c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java @@ -44,14 +44,12 @@ import org.apache.flink.streaming.runtime.tasks.TimerService; import org.apache.flink.streaming.util.CollectorOutput; import org.apache.flink.streaming.util.MockStreamTask; import org.apache.flink.streaming.util.MockStreamTaskBuilder; -import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.ExceptionUtils; import org.junit.Test; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -64,7 +62,7 @@ public class StreamSourceOperatorWatermarksTest { @Test public void testEmitMaxWatermarkForFiniteSource() throws Exception { - StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource()); + StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource<>()); StreamTaskTestHarness<String> testHarness = setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO); @@ -76,23 +74,6 @@ public class StreamSourceOperatorWatermarksTest { } @Test - public void testMaxWatermarkIsForwardedLastForFiniteSource() throws Exception { - StreamSource<String, ?> sourceOperator = new StreamSource<>(new FiniteSource(true)); - StreamTaskTestHarness<String> testHarness = - setupSourceStreamTask(sourceOperator, BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.invoke(); - testHarness.waitForTaskCompletion(); - - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); - expectedOutput.add(new StreamRecord<>("Hello")); - expectedOutput.add(Watermark.MAX_WATERMARK); - - TestHarnessUtil.assertOutputEquals( - "Output was not correct.", expectedOutput, testHarness.getOutput()); - } - - @Test public void testNoMaxWatermarkOnImmediateCancel() throws Exception { StreamSource<String, ?> sourceOperator = new StreamSource<>(new InfiniteSource<>()); StreamTaskTestHarness<String> testHarness = @@ -245,38 +226,13 @@ public class StreamSourceOperatorWatermarksTest { // ------------------------------------------------------------------------ - private static final class FiniteSource extends RichSourceFunction<String> { - - private transient volatile boolean canceled = false; - - private transient SourceContext<String> context; - - private final boolean outputingARecordWhenClosing; - - public FiniteSource() { - this(false); - } - - public FiniteSource(boolean outputingARecordWhenClosing) { - this.outputingARecordWhenClosing = outputingARecordWhenClosing; - } - - @Override - public void run(SourceContext<String> ctx) { - context = ctx; - } + private static final class FiniteSource<T> extends RichSourceFunction<T> { @Override - public void close() { - if (!canceled && outputingARecordWhenClosing) { - context.collect("Hello"); - } - } + public void run(SourceContext<T> ctx) {} @Override - public void cancel() { - canceled = true; - } + public void cancel() {} } private static final class InfiniteSource<T> implements SourceFunction<T> {
