[FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness
This closes #3347. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0f0f372 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0f0f372 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0f0f372 Branch: refs/heads/master Commit: b0f0f3722fac4726fba879736c7ee85993b392db Parents: 646490c Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Fri Feb 17 02:43:44 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Feb 23 01:19:58 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/Kafka010FetcherTest.java | 7 +- .../connectors/kafka/Kafka09FetcherTest.java | 5 + .../AbstractFetcherTimestampsTest.java | 6 + .../connectors/rabbitmq/RMQSourceTest.java | 6 + .../flink/storm/wrappers/TestContext.java | 7 +- .../hdfstests/ContinuousFileProcessingTest.java | 4 + .../source/ContinuousFileReaderOperator.java | 8 +- .../api/functions/source/SourceFunction.java | 14 + .../streaming/api/operators/StreamSource.java | 22 +- .../api/operators/StreamSourceContexts.java | 350 +++++++++++++++---- .../runtime/io/StreamInputProcessor.java | 10 +- .../runtime/io/StreamTwoInputProcessor.java | 20 +- .../streamstatus/StreamStatusMaintainer.java | 36 ++ .../runtime/tasks/OneInputStreamTask.java | 6 +- .../streaming/runtime/tasks/OperatorChain.java | 6 +- .../runtime/tasks/SourceStreamTask.java | 2 +- .../streaming/runtime/tasks/StreamTask.java | 5 + .../runtime/tasks/TwoInputStreamTask.java | 6 +- .../api/functions/ListSourceContext.java | 7 +- .../functions/StatefulSequenceSourceTest.java | 6 + .../source/FileMonitoringFunctionTest.java | 5 +- .../source/InputFormatSourceFunctionTest.java | 5 + .../source/SocketTextStreamFunctionTest.java | 11 +- .../AbstractUdfStreamOperatorLifecycleTest.java | 7 +- .../StreamSourceContextIdleDetectionTests.java | 325 +++++++++++++++++ .../operators/StreamSourceOperatorTest.java | 25 +- 
.../streaming/runtime/tasks/StreamTaskTest.java | 5 +- .../util/AbstractStreamOperatorTestHarness.java | 19 + .../streaming/util/CollectingSourceContext.java | 5 + 29 files changed, 814 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java index 3bc154e..5718986 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java @@ -447,7 +447,12 @@ public class Kafka010FetcherTest { block(); } - @Override + @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); + } + + @Override public Object getCheckpointLock() { return new Object(); } http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java index 4526aa0..abd75cc 100644 --- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -447,6 +447,11 @@ public class Kafka09FetcherTest { } @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); + } + + @Override public Object getCheckpointLock() { return new Object(); } http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index f2091f0..6887518 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -271,6 +271,12 @@ public class AbstractFetcherTimestampsTest { } } + + @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); + } + @Override public Object getCheckpointLock() { return checkpointLock; http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index 8474f8a..26434ed 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -443,6 +443,12 @@ public class RMQSourceTest { @Override public void emitWatermark(Watermark mark) { + throw new UnsupportedOperationException(); + } + + @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java index 4c4749a..58aad7b 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java @@ -41,7 +41,12 @@ class TestContext implements SourceContext<Tuple1<Integer>> { @Override public void emitWatermark(Watermark mark) { - // ignore it + throw new UnsupportedOperationException(); + } + + @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java ---------------------------------------------------------------------- diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java index cc5cb8e..f579345 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java @@ -1001,6 +1001,10 @@ public class ContinuousFileProcessingTest { } @Override + public void markAsTemporarilyIdle() { + } + + @Override public Object getCheckpointLock() { return lock; } http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index ab1ad1d..b86d97c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -136,7 +136,13 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); this.readerContext = StreamSourceContexts.getSourceContext( - timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval); + timeCharacteristic, + getProcessingTimeService(), + checkpointLock, + getContainingTask().getStreamStatusMaintainer(), + output, + watermarkInterval, + -1); // and initialize the split 
reading thread this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState); http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index f1619b2..fc7f793 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -216,6 +216,20 @@ public interface SourceFunction<T> extends Function, Serializable { @PublicEvolving void emitWatermark(Watermark mark); + /** + * Marks the source to be temporarily idle. This tells the system that this source will + * temporarily stop emitting records and watermarks for an indefinite amount of time. This + * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and + * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their + * watermarks without the need to wait for watermarks from this source while it is idle. + * + * <p>Source functions should make a best effort to call this method as soon as they + * acknowledge themselves to be idle. The system will consider the source to resume activity + * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)}, + * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source. + */ + @PublicEvolving + void markAsTemporarilyIdle(); /** * Returns the checkpoint lock. 
Please refer to the class-level comment in http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 84330b6..36f7c6a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -51,12 +52,15 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> this.chainingStrategy = ChainingStrategy.HEAD; } - public void run(final Object lockingObject) throws Exception { - run(lockingObject, output); + public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception { + run(lockingObject, streamStatusMaintainer, output); } - public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception { + public void run(final Object lockingObject, + final StreamStatusMaintainer streamStatusMaintainer, + final Output<StreamRecord<OUT>> collector) throws Exception { + final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); LatencyMarksEmitter 
latencyEmitter = null; @@ -68,11 +72,17 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask()); } - + final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(); this.ctx = StreamSourceContexts.getSourceContext( - timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval); + timeCharacteristic, + getProcessingTimeService(), + lockingObject, + streamStatusMaintainer, + collector, + watermarkInterval, + -1); try { userFunction.run(ctx); @@ -108,7 +118,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> /** * Marks this source as canceled or stopped. * - * <p>This indicates that any exit of the {@link #run(Object, Output)} method + * <p>This indicates that any exit of the {@link #run(Object, StreamStatusMaintainer, Output)} method * cannot be interpreted as the result of a finite source. */ protected void markCanceledOrStopped() { http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index a6a273f..98281c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; +import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -42,16 +44,34 @@ public class StreamSourceContexts { * </ul> * */ public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext( - TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, - Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) { + TimeCharacteristic timeCharacteristic, + ProcessingTimeService processingTimeService, + Object checkpointLock, + StreamStatusMaintainer streamStatusMaintainer, + Output<StreamRecord<OUT>> output, + long watermarkInterval, + long idleTimeout) { final SourceFunction.SourceContext<OUT> ctx; switch (timeCharacteristic) { case EventTime: - ctx = new ManualWatermarkContext<>(checkpointLock, output); + ctx = new ManualWatermarkContext<>( + output, + processingTimeService, + checkpointLock, + streamStatusMaintainer, + idleTimeout); + break; case IngestionTime: - ctx = new AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, watermarkInterval); + ctx = new AutomaticWatermarkContext<>( + output, + watermarkInterval, + processingTimeService, + checkpointLock, + streamStatusMaintainer, + idleTimeout); + break; case ProcessingTime: ctx = new NonTimestampContext<>(checkpointLock, output); @@ -97,6 +117,11 @@ public class StreamSourceContexts { } @Override + public void markAsTemporarilyIdle() { + // do nothing + } + + @Override public Object getCheckpointLock() { return lock; } @@ -109,10 +134,8 @@ public class StreamSourceContexts { * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps * and watermark emission. 
*/ - private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> { + private static class AutomaticWatermarkContext<T> extends WatermarkContext<T> { - private final ProcessingTimeService timeService; - private final Object lock; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; @@ -121,14 +144,18 @@ public class StreamSourceContexts { private volatile ScheduledFuture<?> nextWatermarkTimer; private volatile long nextWatermarkTime; + private long lastRecordTime; + private AutomaticWatermarkContext( - final ProcessingTimeService timeService, - final Object checkpointLock, - final Output<StreamRecord<T>> output, - final long watermarkInterval) { + final Output<StreamRecord<T>> output, + final long watermarkInterval, + final ProcessingTimeService timeService, + final Object checkpointLock, + final StreamStatusMaintainer streamStatusMaintainer, + final long idleTimeout) { + + super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout); - this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null."); - this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null."); this.output = Preconditions.checkNotNull(output, "The output cannot be null."); Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms."); @@ -136,63 +163,62 @@ public class StreamSourceContexts { this.reuse = new StreamRecord<>(null); + this.lastRecordTime = Long.MIN_VALUE; + long now = this.timeService.getCurrentProcessingTime(); this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval, - new WatermarkEmittingTask(this.timeService, lock, output)); + new WatermarkEmittingTask(this.timeService, checkpointLock, output)); } @Override - public void collect(T element) { - synchronized (lock) { - final long currentTime = this.timeService.getCurrentProcessingTime(); - output.collect(reuse.replace(element, 
currentTime)); - - // this is to avoid lock contention in the lockingObject by - // sending the watermark before the firing of the watermark - // emission task. - - if (currentTime > nextWatermarkTime) { - // in case we jumped some watermarks, recompute the next watermark time - final long watermarkTime = currentTime - (currentTime % watermarkInterval); - nextWatermarkTime = watermarkTime + watermarkInterval; - output.emitWatermark(new Watermark(watermarkTime)); - - // we do not need to register another timer here - // because the emitting task will do so. - } + protected void processAndCollect(T element) { + lastRecordTime = this.timeService.getCurrentProcessingTime(); + output.collect(reuse.replace(element, lastRecordTime)); + + // this is to avoid lock contention in the lockingObject by + // sending the watermark before the firing of the watermark + // emission task. + if (lastRecordTime > nextWatermarkTime) { + // in case we jumped some watermarks, recompute the next watermark time + final long watermarkTime = lastRecordTime - (lastRecordTime % watermarkInterval); + nextWatermarkTime = watermarkTime + watermarkInterval; + output.emitWatermark(new Watermark(watermarkTime)); + + // we do not need to register another timer here + // because the emitting task will do so. 
} } @Override - public void collectWithTimestamp(T element, long timestamp) { - collect(element); + protected void processAndCollectWithTimestamp(T element, long timestamp) { + processAndCollect(element); } @Override - public void emitWatermark(Watermark mark) { - - if (mark.getTimestamp() == Long.MAX_VALUE) { - // allow it since this is the special end-watermark that for example the Kafka source emits - synchronized (lock) { - nextWatermarkTime = Long.MAX_VALUE; - output.emitWatermark(mark); - } - - // we can shutdown the timer now, no watermarks will be needed any more - final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer; - if (nextWatermarkTimer != null) { - nextWatermarkTimer.cancel(true); - } - } + protected boolean allowWatermark(Watermark mark) { + // allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits + return mark.getTimestamp() == Long.MAX_VALUE && nextWatermarkTime != Long.MAX_VALUE; } + /** This will only be called if allowWatermark returned {@code true} */ @Override - public Object getCheckpointLock() { - return lock; + protected void processAndEmitWatermark(Watermark mark) { + nextWatermarkTime = Long.MAX_VALUE; + output.emitWatermark(mark); + + // we can shutdown the watermark timer now, no watermarks will be needed any more. 
+ // Note that this procedure actually doesn't need to be synchronized with the lock, + // but since it's only a one-time thing, doesn't hurt either + final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer; + if (nextWatermarkTimer != null) { + nextWatermarkTimer.cancel(true); + } } @Override public void close() { + super.close(); + final ScheduledFuture<?> nextWatermarkTimer = this.nextWatermarkTimer; if (nextWatermarkTimer != null) { nextWatermarkTimer.cancel(true); @@ -218,14 +244,23 @@ public class StreamSourceContexts { public void onProcessingTime(long timestamp) { final long currentTime = timeService.getCurrentProcessingTime(); - if (currentTime > nextWatermarkTime) { - // align the watermarks across all machines. this will ensure that we - // don't have watermarks that creep along at different intervals because - // the machine clocks are out of sync - final long watermarkTime = currentTime - (currentTime % watermarkInterval); + synchronized (lock) { + // we should continue to automatically emit watermarks if we are active + if (streamStatusMaintainer.getStreamStatus().isActive()) { + if (idleTimeout != -1 && currentTime - lastRecordTime > idleTimeout) { + // if we are configured to detect idleness, piggy-back the idle detection check on the + // watermark interval, so that we may possibly discover idle sources faster before waiting + // for the next idle check to fire + markAsTemporarilyIdle(); + + // no need to finish the next check, as we are now idle. + cancelNextIdleDetectionTask(); + } else if (currentTime > nextWatermarkTime) { + // align the watermarks across all machines. 
this will ensure that we + // don't have watermarks that creep along at different intervals because + // the machine clocks are out of sync + final long watermarkTime = currentTime - (currentTime % watermarkInterval); - synchronized (lock) { - if (currentTime > nextWatermarkTime) { output.emitWatermark(new Watermark(watermarkTime)); nextWatermarkTime = watermarkTime + watermarkInterval; } @@ -247,45 +282,220 @@ public class StreamSourceContexts { * Streaming topologies can use timestamp assigner functions to override the timestamps * assigned here. */ - private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> { + private static class ManualWatermarkContext<T> extends WatermarkContext<T> { - private final Object lock; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; - private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) { - this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null."); + private ManualWatermarkContext( + final Output<StreamRecord<T>> output, + final ProcessingTimeService timeService, + final Object checkpointLock, + final StreamStatusMaintainer streamStatusMaintainer, + final long idleTimeout) { + + super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout); + this.output = Preconditions.checkNotNull(output, "The output cannot be null."); this.reuse = new StreamRecord<>(null); } @Override + protected void processAndCollect(T element) { + output.collect(reuse.replace(element)); + } + + @Override + protected void processAndCollectWithTimestamp(T element, long timestamp) { + output.collect(reuse.replace(element, timestamp)); + } + + @Override + protected void processAndEmitWatermark(Watermark mark) { + output.emitWatermark(mark); + } + + @Override + protected boolean allowWatermark(Watermark mark) { + return true; + } + } + + /** + * An abstract {@link SourceFunction.SourceContext} that should be used as the 
base for + * stream source contexts that are relevant with {@link Watermark}s. + * + * Stream source contexts that are relevant with watermarks are responsible of manipulating + * the current {@link StreamStatus}, so that stream status can be correctly propagated + * downstream. Please refer to the class-level documentation of {@link StreamStatus} for + * information on how stream status affects watermark advancement at downstream tasks. + * + * This class implements the logic of idleness detection. It fires idleness detection + * tasks at a given interval; if no records or watermarks were collected by the source context + * between 2 consecutive checks, it determines the source to be IDLE and correspondingly + * toggles the status. ACTIVE status resumes as soon as some record or watermark is collected + * again. + */ + private static abstract class WatermarkContext<T> implements SourceFunction.SourceContext<T> { + + protected final ProcessingTimeService timeService; + protected final Object checkpointLock; + protected final StreamStatusMaintainer streamStatusMaintainer; + protected final long idleTimeout; + + private ScheduledFuture<?> nextCheck; + + /** + * This flag will be reset to {@code true} every time the next check is scheduled. + * Whenever a record or watermark is collected, the flag will be set to {@code false}. + * + * When the scheduled check is fired, if the flag remains to be {@code true}, the check will fail, + * and our current status will determined to be IDLE. + */ + private volatile boolean failOnNextCheck; + + /** + * Create a watermark context. 
+ * + * @param timeService the time service to schedule idleness detection tasks + * @param checkpointLock the checkpoint lock + * @param streamStatusMaintainer the stream status maintainer to toggle and retrieve current status + * @param idleTimeout the idle timeout in milliseconds after which the source is considered idle (-1 if idleness checking is disabled) + */ + public WatermarkContext( + final ProcessingTimeService timeService, + final Object checkpointLock, + final StreamStatusMaintainer streamStatusMaintainer, + final long idleTimeout) { + + this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null."); + this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null."); + this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer, "Stream Status Maintainer cannot be null."); + + if (idleTimeout != -1) { + Preconditions.checkArgument(idleTimeout >= 1, "The idle timeout cannot be smaller than 1 ms."); + } + this.idleTimeout = idleTimeout; + + scheduleNextIdleDetectionTask(); + } + + @Override public void collect(T element) { - synchronized (lock) { - output.collect(reuse.replace(element)); + synchronized (checkpointLock) { + streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); + + if (nextCheck != null) { + this.failOnNextCheck = false; + } else { + scheduleNextIdleDetectionTask(); + } + + processAndCollect(element); } } @Override public void collectWithTimestamp(T element, long timestamp) { - synchronized (lock) { - output.collect(reuse.replace(element, timestamp)); + synchronized (checkpointLock) { + streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); + + if (nextCheck != null) { + this.failOnNextCheck = false; + } else { + scheduleNextIdleDetectionTask(); + } + + processAndCollectWithTimestamp(element, timestamp); } } @Override public void emitWatermark(Watermark mark) { - synchronized (lock) { - output.emitWatermark(mark); + if (allowWatermark(mark)) 
{ + synchronized (checkpointLock) { +
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); + + if (nextCheck != null) { + this.failOnNextCheck = false; + } else { + scheduleNextIdleDetectionTask(); + } + + processAndEmitWatermark(mark); + } + } + } + + @Override + public void markAsTemporarilyIdle() { + synchronized (checkpointLock) { + streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE); } } @Override public Object getCheckpointLock() { - return lock; + return checkpointLock; } @Override - public void close() {} + public void close() { + cancelNextIdleDetectionTask(); + } + + private class IdlenessDetectionTask implements ProcessingTimeCallback { + @Override + public void onProcessingTime(long timestamp) throws Exception { + synchronized (checkpointLock) { + // set this to null now; + // the next idleness detection will be scheduled again + // depending on the below failOnNextCheck condition + nextCheck = null; + + if (failOnNextCheck) { + markAsTemporarilyIdle(); + } else { + scheduleNextIdleDetectionTask(); + } + } + } + } + + private void scheduleNextIdleDetectionTask() { + if (idleTimeout != -1) { + // reset flag; if it remains true when task fires, we have detected idleness + failOnNextCheck = true; + nextCheck = this.timeService.registerTimer( + this.timeService.getCurrentProcessingTime() + idleTimeout, + new IdlenessDetectionTask()); + } + } + + protected void cancelNextIdleDetectionTask() { + final ScheduledFuture<?> nextCheck = this.nextCheck; + if (nextCheck != null) { + nextCheck.cancel(true); + } + } + + // ------------------------------------------------------------------------ + // Abstract methods for concrete subclasses to implement. + // These methods are guaranteed to be synchronized on the checkpoint lock, + // so implementations don't need to do so. + // ------------------------------------------------------------------------ + + /** Process and collect record. 
*/ + protected abstract void processAndCollect(T element); + + /** Process and collect record with timestamp. */ + protected abstract void processAndCollectWithTimestamp(T element, long timestamp); + + /** Whether or not a watermark should be allowed */ + protected abstract boolean allowWatermark(Watermark mark); + + /** Process and emit watermark. Only called if {@link WatermarkContext#allowWatermark(Watermark)} returns {@code true} */ + protected abstract void processAndEmitWatermark(Watermark mark); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index e2061c3..3feaa52 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -49,7 +49,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -95,7 +95,7 @@ public class StreamInputProcessor<IN> { */ private int currentChannel = -1; - private final OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain; + private final StreamStatusMaintainer streamStatusMaintainer; private final OneInputStreamOperator<IN, ?> streamOperator; @@ 
-115,7 +115,7 @@ public class StreamInputProcessor<IN> { Object lock, IOManager ioManager, Configuration taskManagerConfig, - OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain, + StreamStatusMaintainer streamStatusMaintainer, OneInputStreamOperator<IN, ?> streamOperator) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -157,7 +157,7 @@ public class StreamInputProcessor<IN> { this.lastEmittedWatermark = Long.MIN_VALUE; - this.operatorChain = checkNotNull(operatorChain); + this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer); this.streamOperator = checkNotNull(streamOperator); this.statusWatermarkValve = new StatusWatermarkValve( @@ -297,7 +297,7 @@ public class StreamInputProcessor<IN> { public void handleStreamStatus(StreamStatus streamStatus) { try { synchronized (lock) { - operatorChain.setStreamStatus(streamStatus); + streamStatusMaintainer.toggleStreamStatus(streamStatus); } } catch (Exception e) { throw new RuntimeException("Exception occurred while processing valve output stream status: ", e); http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index a295395..a8ec972 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -45,7 +45,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.api.watermark.Watermark; 
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; -import org.apache.flink.streaming.runtime.tasks.OperatorChain; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import java.io.IOException; import java.util.Collection; @@ -107,7 +107,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { */ private int currentChannel = -1; - private final OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain; + private final StreamStatusMaintainer streamStatusMaintainer; private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator; @@ -129,7 +129,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { Object lock, IOManager ioManager, Configuration taskManagerConfig, - OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain, + StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -185,7 +185,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { this.firstStatus = StreamStatus.ACTIVE; this.secondStatus = StreamStatus.ACTIVE; - this.operatorChain = checkNotNull(operatorChain); + this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer); this.streamOperator = checkNotNull(streamOperator); this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock)); @@ -355,13 +355,13 @@ public class StreamTwoInputProcessor<IN1, IN2> { firstStatus = streamStatus; // check if we need to toggle the task's stream status - if (!streamStatus.equals(operatorChain.getStreamStatus())) { + if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) { if (streamStatus.isActive()) { // we're no longer idle if at least one input has become active - operatorChain.setStreamStatus(StreamStatus.ACTIVE); + streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); } else if 
(secondStatus.isIdle()) { // we're idle once both inputs are idle - operatorChain.setStreamStatus(StreamStatus.IDLE); + streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE); } } } @@ -399,13 +399,13 @@ public class StreamTwoInputProcessor<IN1, IN2> { secondStatus = streamStatus; // check if we need to toggle the task's stream status - if (!streamStatus.equals(operatorChain.getStreamStatus())) { + if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) { if (streamStatus.isActive()) { // we're no longer idle if at least one input has become active - operatorChain.setStreamStatus(StreamStatus.ACTIVE); + streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE); } else if (firstStatus.isIdle()) { // we're idle once both inputs are idle - operatorChain.setStreamStatus(StreamStatus.IDLE); + streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java new file mode 100644 index 0000000..d964cef --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.streamstatus; + +import org.apache.flink.annotation.Internal; + +/** + * Interface that allows toggling the current {@link StreamStatus} as well as retrieving it. + */ +@Internal +public interface StreamStatusMaintainer extends StreamStatusProvider { + + /** + * Toggles the current stream status. This method should only have effect + * if the supplied stream status is different from the current status. + * + * @param streamStatus the new status to toggle to + */ + void toggleStreamStatus(StreamStatus streamStatus); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index e559ad0..e04d316 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -42,10 +42,6 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO if (numberOfInputs > 0) { InputGate[] inputGates = getEnvironment().getAllInputGates(); - @SuppressWarnings("unchecked") - OperatorChain<?, OneInputStreamOperator<IN, ?>> operatorChain = - (OperatorChain) 
this.operatorChain; - inputProcessor = new StreamInputProcessor<>( inputGates, inSerializer, @@ -54,7 +50,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO getCheckpointLock(), getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), - operatorChain, + getStreamStatusMaintainer(), this.headOperator); // make sure that stream tasks report their I/O statistics http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 591ed3c..4f07182 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider; import org.apache.flink.util.XORShiftRandom; import org.slf4j.Logger; @@ -63,7 +64,7 @@ import java.util.Random; * head operator. 
*/ @Internal -public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusProvider { +public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer { private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class); @@ -151,7 +152,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea return streamStatus; } - public void setStreamStatus(StreamStatus status) throws IOException { + @Override + public void toggleStreamStatus(StreamStatus status) { if (!status.equals(this.streamStatus)) { this.streamStatus = status; http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 7ae99f6..63b40ad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S @Override protected void run() throws Exception { - headOperator.run(getCheckpointLock()); + headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java 
index 60afd60..62cfb8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -57,6 +57,7 @@ import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; @@ -497,6 +498,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> return accumulatorMap; } + public StreamStatusMaintainer getStreamStatusMaintainer() { + return operatorChain; + } + Output<StreamRecord<OUT>> getHeadOutput() { return operatorChain.getChainEntryPoint(); } http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 175bd76..71346b8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -65,10 +65,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS } } - @SuppressWarnings("unchecked") - OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> operatorChain = - (OperatorChain) this.operatorChain; - this.inputProcessor = new StreamTwoInputProcessor<>( 
inputList1, inputList2, inputDeserializer1, inputDeserializer2, @@ -77,7 +73,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS getCheckpointLock(), getEnvironment().getIOManager(), getEnvironment().getTaskManagerInfo().getConfiguration(), - operatorChain, + getStreamStatusMaintainer(), this.headOperator); // make sure that stream tasks report their I/O statistics http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java index e4dadf0..a4c1bea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java @@ -67,7 +67,12 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> { @Override public void emitWatermark(Watermark mark) { - // don't do anything + throw new UnsupportedOperationException(); + } + + @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java index 8332cb3..9030e9d 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java @@ -228,6 +228,12 @@ public class StatefulSequenceSourceTest { @Override public void emitWatermark(Watermark mark) { + throw new UnsupportedOperationException(); + } + + @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java index 6b36419..d81b440 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java @@ -58,10 +58,13 @@ public class FileMonitoringFunctionTest { public void emitWatermark(Watermark mark) {} @Override + public void markAsTemporarilyIdle() {} + + @Override public Object getCheckpointLock() { return null; } @Override public void close() {} }); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java index d1131b4..bb80228 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java @@ -225,6 +225,11 @@ public class InputFormatSourceFunctionTest { } @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); + } + + @Override public Object getCheckpointLock() { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java index 3e274cf..87376e7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java @@ -283,7 +283,14 @@ public class SocketTextStreamFunctionTest { } @Override - public void emitWatermark(Watermark mark) {} + public void emitWatermark(Watermark mark) { + throw new UnsupportedOperationException(); + } + + @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); + } @Override public Object getCheckpointLock() { @@ -346,4 +353,4 @@ public class SocketTextStreamFunctionTest { } } } -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index 357163c..c4ddea8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTaskTest; @@ -219,9 +220,11 @@ public class AbstractUdfStreamOperatorLifecycleTest { } @Override - public void run(Object lockingObject, Output<StreamRecord<OUT>> collector) throws Exception { + public void run(Object lockingObject, + StreamStatusMaintainer streamStatusMaintainer, + Output<StreamRecord<OUT>> collector) throws Exception { ACTUAL_ORDER_TRACKING.add("OPERATOR::run"); - super.run(lockingObject, collector); + super.run(lockingObject, streamStatusMaintainer, collector); runStarted.trigger(); runFinish.await(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java new file mode 100644 index 0000000..3695120 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.CollectorOutput; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class StreamSourceContextIdleDetectionTests { + + /** The tests in this class will be parameterized with these enumerations.*/ + private enum TestMethod { + + /** test idleness detection using the {@link SourceFunction.SourceContext#collect(Object)} method */ + COLLECT, + + /** test idleness detection using the {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method */ + COLLECT_WITH_TIMESTAMP, + + /** test idleness detection using the {@link SourceFunction.SourceContext#emitWatermark(Watermark)} method */ + EMIT_WATERMARK + } + + private TestMethod testMethod; + + public StreamSourceContextIdleDetectionTests(TestMethod testMethod) { + this.testMethod = testMethod; + } + + /** + * Test scenario (idleTimeout = 100): + * (1) Start from 0 as initial time. + * (2) As soon as time reaches 100, status should have been toggled to IDLE. + * (3) After some arbitrary time (until 300), the status should remain IDLE. + * (4) Emit a record at 310. 
Status should become ACTIVE. This should fire a idleness detection at 410. + * (5) Emit another record at 320 (which is before the next check). This should make the idleness check pass. + * (6) Advance time to 410 and trigger idleness detection. + * The status should still be ACTIVE due to step (5). Another idleness detection should be fired at 510. + * (7) Advance time to 510 and trigger idleness detection. Since no records were collected in-between the two + * idleness detections, status should have been toggle back to IDLE. + * + * Inline comments will refer to the corresponding tested steps in the scenario. + */ + @Test + public void testManualWatermarkContext() throws Exception { + long idleTimeout = 100; + + long initialTime = 0; + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + processingTimeService.setCurrentTime(initialTime); + + final List<StreamElement> output = new ArrayList<>(); + + MockStreamStatusMaintainer mockStreamStatusMaintainer = new MockStreamStatusMaintainer(); + + SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext( + TimeCharacteristic.EventTime, + processingTimeService, + new Object(), + mockStreamStatusMaintainer, + new CollectorOutput<String>(output), + 0, + idleTimeout); + + // -------------------------- begin test scenario -------------------------- + + // corresponds to step (2) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + idleTimeout); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + + // corresponds to step (3) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 2*idleTimeout); + processingTimeService.setCurrentTime(initialTime + 3*idleTimeout); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + + // corresponds to step (4) of scenario (please see method-level Javadoc comment) + 
processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10); + switch (testMethod) { + case COLLECT: + context.collect("msg"); + break; + case COLLECT_WITH_TIMESTAMP: + context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime()); + break; + case EMIT_WATERMARK: + context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime())); + break; + } + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + + // corresponds to step (5) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 2*idleTimeout/10); + switch (testMethod) { + case COLLECT: + context.collect("msg"); + break; + case COLLECT_WITH_TIMESTAMP: + context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime()); + break; + case EMIT_WATERMARK: + context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime())); + break; + } + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + + // corresponds to step (6) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + + // corresponds to step (7) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 5*idleTimeout + idleTimeout/10); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + } + + /** + * Test scenario (idleTimeout = 100, watermarkInterval = 40): + * (1) Start from 20 as initial time. + * (2) As soon as time reaches 120, status should have been toggled to IDLE. + * (3) After some arbitrary time (until 320), the status should remain IDLE, and no watermarks should have been emitted. + * (4) Emit a record at 330. Status should become ACTIVE. This should schedule a idleness detection to be fired at 430. 
+ * (5) Emit another record at 350 (which is before the next check). This should make the idleness check pass. + * (6) Advance time to 430 and trigger idleness detection. The status should still be ACTIVE due to step (5). + * This should schedule a idleness detection to be fired at 530. + * (7) Advance time to 460, in which a watermark emission task should be fired. Idleness detection + * should have been "piggy-backed" in the task, allowing the status to be toggled to IDLE before the next + * actual idle detection task at 530. + * + * Inline comments will refer to the corresponding tested steps in the scenario. + */ + @Test + public void testAutomaticWatermarkContext() throws Exception { + long watermarkInterval = 40; + long idleTimeout = 100; + long initialTime = 20; + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + processingTimeService.setCurrentTime(initialTime); + + MockStreamStatusMaintainer mockStreamStatusMaintainer = new MockStreamStatusMaintainer(); + + final List<StreamElement> output = new ArrayList<>(); + final List<StreamElement> expectedOutput = new ArrayList<>(); + + SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext( + TimeCharacteristic.IngestionTime, + processingTimeService, + new Object(), + mockStreamStatusMaintainer, + new CollectorOutput<String>(output), + watermarkInterval, + idleTimeout); + + // -------------------------- begin test scenario -------------------------- + + // corresponds to step (2) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + watermarkInterval); + expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval))); + processingTimeService.setCurrentTime(initialTime + 2*watermarkInterval); + expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - 
(processingTimeService.getCurrentProcessingTime() % watermarkInterval))); + processingTimeService.setCurrentTime(initialTime + idleTimeout); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + + // corresponds to step (3) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 3*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 4*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 2*idleTimeout); + processingTimeService.setCurrentTime(initialTime + 6*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 7*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 3*idleTimeout); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + + // corresponds to step (4) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + idleTimeout/10); + switch (testMethod) { + case COLLECT: + context.collect("msg"); + expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime())); + expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval))); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + assertEquals(expectedOutput, output); + break; + case COLLECT_WITH_TIMESTAMP: + context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime()); + expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime())); + expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval))); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + assertEquals(expectedOutput, output); + break; + 
case EMIT_WATERMARK: + // for emitWatermark, since the watermark will be blocked, + // it should not make the status become active; + // from here on, the status should remain idle for the emitWatermark variant test + context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime())); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + } + + // corresponds to step (5) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 8*watermarkInterval); + processingTimeService.setCurrentTime(initialTime + 3*idleTimeout + 3*idleTimeout/10); + switch (testMethod) { + case COLLECT: + context.collect("msg"); + expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime())); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + assertEquals(expectedOutput, output); + break; + case COLLECT_WITH_TIMESTAMP: + context.collectWithTimestamp("msg", processingTimeService.getCurrentProcessingTime()); + expectedOutput.add(new StreamRecord<>("msg", processingTimeService.getCurrentProcessingTime())); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + assertEquals(expectedOutput, output); + break; + case EMIT_WATERMARK: + context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime())); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + } + + processingTimeService.setCurrentTime(initialTime + 9 * watermarkInterval); + switch (testMethod) { + case COLLECT: + case COLLECT_WITH_TIMESTAMP: + expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval))); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + assertEquals(expectedOutput, output); + break; + case EMIT_WATERMARK: + 
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + } + + processingTimeService.setCurrentTime(initialTime + 10*watermarkInterval); + switch (testMethod) { + case COLLECT: + case COLLECT_WITH_TIMESTAMP: + expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - (processingTimeService.getCurrentProcessingTime() % watermarkInterval))); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + assertEquals(expectedOutput, output); + break; + case EMIT_WATERMARK: + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + } + + // corresponds to step (6) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 4*idleTimeout + idleTimeout/10); + switch (testMethod) { + case COLLECT: + case COLLECT_WITH_TIMESTAMP: + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive()); + assertEquals(expectedOutput, output); + break; + case EMIT_WATERMARK: + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + } + + // corresponds to step (7) of scenario (please see method-level Javadoc comment) + processingTimeService.setCurrentTime(initialTime + 11*watermarkInterval); + assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle()); + assertEquals(expectedOutput, output); + } + + private static class MockStreamStatusMaintainer implements StreamStatusMaintainer { + StreamStatus currentStreamStatus = StreamStatus.ACTIVE; + + @Override + public void toggleStreamStatus(StreamStatus streamStatus) { + if (!currentStreamStatus.equals(streamStatus)) { + currentStreamStatus = streamStatus; + } + } + + @Override + public StreamStatus getStreamStatus() { + return currentStreamStatus; + } + } + + @Parameterized.Parameters(name = "TestMethod = {0}") + @SuppressWarnings("unchecked") + public static Collection<TestMethod[]> 
timeCharacteristic(){ + return Arrays.asList( + new TestMethod[]{TestMethod.COLLECT}, + new TestMethod[]{TestMethod.COLLECT_WITH_TIMESTAMP}, + new TestMethod[]{TestMethod.EMIT_WATERMARK}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index b153de9..ae74c9c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -35,6 +36,8 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; 
@@ -69,7 +72,7 @@ public class StreamSourceOperatorTest { final List<StreamElement> output = new ArrayList<>(); setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0); - operator.run(new Object(), new CollectorOutput<String>(output)); + operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output)); assertEquals(1, output.size()); assertEquals(Watermark.MAX_WATERMARK, output.get(0)); @@ -89,7 +92,7 @@ public class StreamSourceOperatorTest { operator.cancel(); // run and exit - operator.run(new Object(), new CollectorOutput<String>(output)); + operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output)); assertTrue(output.isEmpty()); } @@ -121,7 +124,7 @@ public class StreamSourceOperatorTest { // run and wait to be canceled try { - operator.run(new Object(), new CollectorOutput<String>(output)); + operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output)); } catch (InterruptedException ignored) {} @@ -142,7 +145,7 @@ public class StreamSourceOperatorTest { operator.stop(); // run and stop - operator.run(new Object(), new CollectorOutput<String>(output)); + operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output)); assertTrue(output.isEmpty()); } @@ -171,7 +174,7 @@ public class StreamSourceOperatorTest { }.start(); // run and wait to be stopped - operator.run(new Object(), new CollectorOutput<String>(output)); + operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output)); assertTrue(output.isEmpty()); } @@ -198,7 +201,7 @@ public class StreamSourceOperatorTest { setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService); // run and wait to be stopped - operator.run(new Object(), new CollectorOutput<Long>(output)); + operator.run(new Object(), mock(StreamStatusMaintainer.class), new 
CollectorOutput<Long>(output)); int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1; @@ -224,11 +227,6 @@ public class StreamSourceOperatorTest { } @Test - public void testLatencyMarksEmitterLifecycleIntegration() { - - } - - @Test public void testAutomaticWatermarkContext() throws Exception { // regular stream source operator @@ -246,8 +244,10 @@ public class StreamSourceOperatorTest { StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime, operator.getContainingTask().getProcessingTimeService(), operator.getContainingTask().getCheckpointLock(), + operator.getContainingTask().getStreamStatusMaintainer(), new CollectorOutput<String>(output), - operator.getExecutionConfig().getAutoWatermarkInterval()); + operator.getExecutionConfig().getAutoWatermarkInterval(), + -1); // periodically emit the watermarks // even though we start from 1 the watermark are still @@ -302,6 +302,7 @@ public class StreamSourceOperatorTest { when(mockTask.getEnvironment()).thenReturn(env); when(mockTask.getExecutionConfig()).thenReturn(executionConfig); when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); + when(mockTask.getStreamStatusMaintainer()).thenReturn(mock(StreamStatusMaintainer.class)); doAnswer(new Answer<ProcessingTimeService>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d33d1b6..1e74c3e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -84,6 +84,7 @@ import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; @@ -759,7 +760,9 @@ public class StreamTaskTest extends TestLogger { } @Override - public void run(Object lockingObject, Output<StreamRecord<Long>> collector) throws Exception { + public void run(Object lockingObject, + StreamStatusMaintainer streamStatusMaintainer, + Output<StreamRecord<Long>> collector) throws Exception { while (!canceled) { try { Thread.sleep(500); http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 2df4efd..01afec6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -54,6 +54,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; 
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -157,6 +159,22 @@ public class AbstractStreamOperatorTestHarness<OUT> { processingTimeService = new TestProcessingTimeService(); processingTimeService.setCurrentTime(0); + StreamStatusMaintainer mockStreamStatusMaintainer = new StreamStatusMaintainer() { + StreamStatus currentStreamStatus = StreamStatus.ACTIVE; + + @Override + public void toggleStreamStatus(StreamStatus streamStatus) { + if (!currentStreamStatus.equals(streamStatus)) { + currentStreamStatus = streamStatus; + } + } + + @Override + public StreamStatus getStreamStatus() { + return currentStreamStatus; + } + }; + when(mockTask.getName()).thenReturn("Mock Task"); when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); when(mockTask.getConfiguration()).thenReturn(config); @@ -165,6 +183,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { when(mockTask.getExecutionConfig()).thenReturn(executionConfig); when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader()); when(mockTask.getCancelables()).thenReturn(this.closableRegistry); + when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer); doAnswer(new Answer<Void>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java index fe2b03e..d9ad24d 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java @@ -56,6 +56,11 @@ public class CollectingSourceContext<T extends Serializable> implements SourceFu } @Override + public void markAsTemporarilyIdle() { + throw new UnsupportedOperationException(); + } + + @Override public Object getCheckpointLock() { return lock; }
