[FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness

This closes #3347.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0f0f372
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0f0f372
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0f0f372

Branch: refs/heads/master
Commit: b0f0f3722fac4726fba879736c7ee85993b392db
Parents: 646490c
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Fri Feb 17 02:43:44 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Feb 23 01:19:58 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/Kafka010FetcherTest.java   |   7 +-
 .../connectors/kafka/Kafka09FetcherTest.java    |   5 +
 .../AbstractFetcherTimestampsTest.java          |   6 +
 .../connectors/rabbitmq/RMQSourceTest.java      |   6 +
 .../flink/storm/wrappers/TestContext.java       |   7 +-
 .../hdfstests/ContinuousFileProcessingTest.java |   4 +
 .../source/ContinuousFileReaderOperator.java    |   8 +-
 .../api/functions/source/SourceFunction.java    |  14 +
 .../streaming/api/operators/StreamSource.java   |  22 +-
 .../api/operators/StreamSourceContexts.java     | 350 +++++++++++++++----
 .../runtime/io/StreamInputProcessor.java        |  10 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  20 +-
 .../streamstatus/StreamStatusMaintainer.java    |  36 ++
 .../runtime/tasks/OneInputStreamTask.java       |   6 +-
 .../streaming/runtime/tasks/OperatorChain.java  |   6 +-
 .../runtime/tasks/SourceStreamTask.java         |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |   5 +
 .../runtime/tasks/TwoInputStreamTask.java       |   6 +-
 .../api/functions/ListSourceContext.java        |   7 +-
 .../functions/StatefulSequenceSourceTest.java   |   6 +
 .../source/FileMonitoringFunctionTest.java      |   5 +-
 .../source/InputFormatSourceFunctionTest.java   |   5 +
 .../source/SocketTextStreamFunctionTest.java    |  11 +-
 .../AbstractUdfStreamOperatorLifecycleTest.java |   7 +-
 .../StreamSourceContextIdleDetectionTests.java  | 325 +++++++++++++++++
 .../operators/StreamSourceOperatorTest.java     |  25 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   5 +-
 .../util/AbstractStreamOperatorTestHarness.java |  19 +
 .../streaming/util/CollectingSourceContext.java |   5 +
 29 files changed, 814 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 3bc154e..5718986 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -447,7 +447,12 @@ public class Kafka010FetcherTest {
             block();
         }
 
-        @Override
+               @Override
+               public void markAsTemporarilyIdle() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
         public Object getCheckpointLock() {
             return new Object();
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 4526aa0..abd75cc 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -447,6 +447,11 @@ public class Kafka09FetcherTest {
                }
 
                @Override
+               public void markAsTemporarilyIdle() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public Object getCheckpointLock() {
                        return new Object();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index f2091f0..6887518 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -271,6 +271,12 @@ public class AbstractFetcherTimestampsTest {
                        }
                }
 
+
+               @Override
+               public void markAsTemporarilyIdle() {
+                       throw new UnsupportedOperationException();
+               }
+
                @Override
                public Object getCheckpointLock() {
                        return checkpointLock;

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 8474f8a..26434ed 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -443,6 +443,12 @@ public class RMQSourceTest {
 
                @Override
                public void emitWatermark(Watermark mark) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void markAsTemporarilyIdle() {
+                       throw new UnsupportedOperationException();
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
index 4c4749a..58aad7b 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
@@ -41,7 +41,12 @@ class TestContext implements SourceContext<Tuple1<Integer>> {
 
        @Override
        public void emitWatermark(Watermark mark) {
-               // ignore it
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void markAsTemporarilyIdle() {
+               throw new UnsupportedOperationException();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index cc5cb8e..f579345 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -1001,6 +1001,10 @@ public class ContinuousFileProcessingTest {
                }
 
                @Override
+               public void markAsTemporarilyIdle() {
+               }
+
+               @Override
                public Object getCheckpointLock() {
                        return lock;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ab1ad1d..b86d97c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -136,7 +136,13 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
                final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
                this.readerContext = StreamSourceContexts.getSourceContext(
-                       timeCharacteristic, getProcessingTimeService(), 
checkpointLock, output, watermarkInterval);
+                       timeCharacteristic,
+                       getProcessingTimeService(),
+                       checkpointLock,
+                       getContainingTask().getStreamStatusMaintainer(),
+                       output,
+                       watermarkInterval,
+                       -1);
 
                // and initialize the split reading thread
                this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, restoredReaderState);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index f1619b2..fc7f793 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -216,6 +216,20 @@ public interface SourceFunction<T> extends Function, Serializable {
                @PublicEvolving
                void emitWatermark(Watermark mark);
 
+               /**
+                * Marks the source to be temporarily idle. This tells the system that this source will
+                * temporarily stop emitting records and watermarks for an indefinite amount of time. This
+                * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
+                * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
+                * watermarks without the need to wait for watermarks from this source while it is idle.
+                *
+                * <p>Source functions should make a best effort to call this method as soon as they
+                * acknowledge themselves to be idle. The system will consider the source to resume activity
+                * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
+                * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
+                */
+               @PublicEvolving
+               void markAsTemporarilyIdle();
 
                /**
                 * Returns the checkpoint lock. Please refer to the class-level comment in

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 84330b6..36f7c6a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
@@ -51,12 +52,15 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                this.chainingStrategy = ChainingStrategy.HEAD;
        }
 
-       public void run(final Object lockingObject) throws Exception {
-               run(lockingObject, output);
+       public void run(final Object lockingObject, final 
StreamStatusMaintainer streamStatusMaintainer) throws Exception {
+               run(lockingObject, streamStatusMaintainer, output);
        }
 
        
-       public void run(final Object lockingObject, final 
Output<StreamRecord<OUT>> collector) throws Exception {
+       public void run(final Object lockingObject,
+                       final StreamStatusMaintainer streamStatusMaintainer,
+                       final Output<StreamRecord<OUT>> collector) throws 
Exception {
+
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
 
                LatencyMarksEmitter latencyEmitter = null;
@@ -68,11 +72,17 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
                                getOperatorConfig().getVertexID(),
                                getRuntimeContext().getIndexOfThisSubtask());
                }
-               
+
                final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 
                this.ctx = StreamSourceContexts.getSourceContext(
-                       timeCharacteristic, getProcessingTimeService(), 
lockingObject, collector, watermarkInterval);
+                       timeCharacteristic,
+                       getProcessingTimeService(),
+                       lockingObject,
+                       streamStatusMaintainer,
+                       collector,
+                       watermarkInterval,
+                       -1);
 
                try {
                        userFunction.run(ctx);
@@ -108,7 +118,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        /**
         * Marks this source as canceled or stopped.
         * 
-        * <p>This indicates that any exit of the {@link #run(Object, Output)} method
+        * <p>This indicates that any exit of the {@link #run(Object, StreamStatusMaintainer, Output)} method
         * cannot be interpreted as the result of a finite source.  
         */
        protected void markCanceledOrStopped() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index a6a273f..98281c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -42,16 +44,34 @@ public class StreamSourceContexts {
         * </ul>
         * */
        public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
-                       TimeCharacteristic timeCharacteristic, 
ProcessingTimeService processingTimeService,
-                       Object checkpointLock, Output<StreamRecord<OUT>> 
output, long watermarkInterval) {
+                       TimeCharacteristic timeCharacteristic,
+                       ProcessingTimeService processingTimeService,
+                       Object checkpointLock,
+                       StreamStatusMaintainer streamStatusMaintainer,
+                       Output<StreamRecord<OUT>> output,
+                       long watermarkInterval,
+                       long idleTimeout) {
 
                final SourceFunction.SourceContext<OUT> ctx;
                switch (timeCharacteristic) {
                        case EventTime:
-                               ctx = new 
ManualWatermarkContext<>(checkpointLock, output);
+                               ctx = new ManualWatermarkContext<>(
+                                       output,
+                                       processingTimeService,
+                                       checkpointLock,
+                                       streamStatusMaintainer,
+                                       idleTimeout);
+
                                break;
                        case IngestionTime:
-                               ctx = new 
AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, 
watermarkInterval);
+                               ctx = new AutomaticWatermarkContext<>(
+                                       output,
+                                       watermarkInterval,
+                                       processingTimeService,
+                                       checkpointLock,
+                                       streamStatusMaintainer,
+                                       idleTimeout);
+
                                break;
                        case ProcessingTime:
                                ctx = new NonTimestampContext<>(checkpointLock, 
output);
@@ -97,6 +117,11 @@ public class StreamSourceContexts {
                }
 
                @Override
+               public void markAsTemporarilyIdle() {
+                       // do nothing
+               }
+
+               @Override
                public Object getCheckpointLock() {
                        return lock;
                }
@@ -109,10 +134,8 @@ public class StreamSourceContexts {
         * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
         * and watermark emission.
         */
-       private static class AutomaticWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
+       private static class AutomaticWatermarkContext<T> extends 
WatermarkContext<T> {
 
-               private final ProcessingTimeService timeService;
-               private final Object lock;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
 
@@ -121,14 +144,18 @@ public class StreamSourceContexts {
                private volatile ScheduledFuture<?> nextWatermarkTimer;
                private volatile long nextWatermarkTime;
 
+               private long lastRecordTime;
+
                private AutomaticWatermarkContext(
-                       final ProcessingTimeService timeService,
-                       final Object checkpointLock,
-                       final Output<StreamRecord<T>> output,
-                       final long watermarkInterval) {
+                               final Output<StreamRecord<T>> output,
+                               final long watermarkInterval,
+                               final ProcessingTimeService timeService,
+                               final Object checkpointLock,
+                               final StreamStatusMaintainer 
streamStatusMaintainer,
+                               final long idleTimeout) {
+
+                       super(timeService, checkpointLock, 
streamStatusMaintainer, idleTimeout);
 
-                       this.timeService = 
Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
-                       this.lock = Preconditions.checkNotNull(checkpointLock, 
"The checkpoint lock cannot be null.");
                        this.output = Preconditions.checkNotNull(output, "The 
output cannot be null.");
 
                        Preconditions.checkArgument(watermarkInterval >= 1L, 
"The watermark interval cannot be smaller than 1 ms.");
@@ -136,63 +163,62 @@ public class StreamSourceContexts {
 
                        this.reuse = new StreamRecord<>(null);
 
+                       this.lastRecordTime = Long.MIN_VALUE;
+
                        long now = this.timeService.getCurrentProcessingTime();
                        this.nextWatermarkTimer = 
this.timeService.registerTimer(now + watermarkInterval,
-                               new WatermarkEmittingTask(this.timeService, 
lock, output));
+                               new WatermarkEmittingTask(this.timeService, 
checkpointLock, output));
                }
 
                @Override
-               public void collect(T element) {
-                       synchronized (lock) {
-                               final long currentTime = 
this.timeService.getCurrentProcessingTime();
-                               output.collect(reuse.replace(element, 
currentTime));
-
-                               // this is to avoid lock contention in the 
lockingObject by
-                               // sending the watermark before the firing of 
the watermark
-                               // emission task.
-
-                               if (currentTime > nextWatermarkTime) {
-                                       // in case we jumped some watermarks, 
recompute the next watermark time
-                                       final long watermarkTime = currentTime 
- (currentTime % watermarkInterval);
-                                       nextWatermarkTime = watermarkTime + 
watermarkInterval;
-                                       output.emitWatermark(new 
Watermark(watermarkTime));
-
-                                       // we do not need to register another 
timer here
-                                       // because the emitting task will do so.
-                               }
+               protected void processAndCollect(T element) {
+                       lastRecordTime = 
this.timeService.getCurrentProcessingTime();
+                       output.collect(reuse.replace(element, lastRecordTime));
+
+                       // this is to avoid lock contention in the lockingObject by
+                       // sending the watermark before the firing of the watermark
+                       // emission task.
+                       if (lastRecordTime > nextWatermarkTime) {
+                               // in case we jumped some watermarks, recompute the next watermark time
+                               final long watermarkTime = lastRecordTime - 
(lastRecordTime % watermarkInterval);
+                               nextWatermarkTime = watermarkTime + 
watermarkInterval;
+                               output.emitWatermark(new 
Watermark(watermarkTime));
+
+                               // we do not need to register another timer here
+                               // because the emitting task will do so.
                        }
                }
 
                @Override
-               public void collectWithTimestamp(T element, long timestamp) {
-                       collect(element);
+               protected void processAndCollectWithTimestamp(T element, long 
timestamp) {
+                       processAndCollect(element);
                }
 
                @Override
-               public void emitWatermark(Watermark mark) {
-
-                       if (mark.getTimestamp() == Long.MAX_VALUE) {
-                               // allow it since this is the special 
end-watermark that for example the Kafka source emits
-                               synchronized (lock) {
-                                       nextWatermarkTime = Long.MAX_VALUE;
-                                       output.emitWatermark(mark);
-                               }
-
-                               // we can shutdown the timer now, no watermarks 
will be needed any more
-                               final ScheduledFuture<?> nextWatermarkTimer = 
this.nextWatermarkTimer;
-                               if (nextWatermarkTimer != null) {
-                                       nextWatermarkTimer.cancel(true);
-                               }
-                       }
+               protected boolean allowWatermark(Watermark mark) {
+                       // allow Long.MAX_VALUE since this is the special end-watermark that for example the Kafka source emits
+                       return mark.getTimestamp() == Long.MAX_VALUE && 
nextWatermarkTime != Long.MAX_VALUE;
                }
 
+               /** This will only be called if allowWatermark returned {@code true} */
                @Override
-               public Object getCheckpointLock() {
-                       return lock;
+               protected void processAndEmitWatermark(Watermark mark) {
+                       nextWatermarkTime = Long.MAX_VALUE;
+                       output.emitWatermark(mark);
+
+                       // we can shutdown the watermark timer now, no watermarks will be needed any more.
+                       // Note that this procedure actually doesn't need to be synchronized with the lock,
+                       // but since it's only a one-time thing, doesn't hurt either
+                       final ScheduledFuture<?> nextWatermarkTimer = 
this.nextWatermarkTimer;
+                       if (nextWatermarkTimer != null) {
+                               nextWatermarkTimer.cancel(true);
+                       }
                }
 
                @Override
                public void close() {
+                       super.close();
+
                        final ScheduledFuture<?> nextWatermarkTimer = 
this.nextWatermarkTimer;
                        if (nextWatermarkTimer != null) {
                                nextWatermarkTimer.cancel(true);
@@ -218,14 +244,23 @@ public class StreamSourceContexts {
                        public void onProcessingTime(long timestamp) {
                                final long currentTime = 
timeService.getCurrentProcessingTime();
 
-                               if (currentTime > nextWatermarkTime) {
-                                       // align the watermarks across all 
machines. this will ensure that we
-                                       // don't have watermarks that creep 
along at different intervals because
-                                       // the machine clocks are out of sync
-                                       final long watermarkTime = currentTime 
- (currentTime % watermarkInterval);
+                               synchronized (lock) {
+                                       // we should continue to automatically emit watermarks if we are active
+                                       if 
(streamStatusMaintainer.getStreamStatus().isActive()) {
+                                               if (idleTimeout != -1 && 
currentTime - lastRecordTime > idleTimeout) {
+                                                       // if we are configured to detect idleness, piggy-back the idle detection check on the
+                                                       // watermark interval, so that we may possibly discover idle sources faster before waiting
+                                                       // for the next idle check to fire
+                                                       markAsTemporarilyIdle();
+
+                                                       // no need to finish the next check, as we are now idle.
+                                                       
cancelNextIdleDetectionTask();
+                                               } else if (currentTime > 
nextWatermarkTime) {
+                                                       // align the watermarks across all machines. this will ensure that we
+                                                       // don't have watermarks that creep along at different intervals because
+                                                       // the machine clocks are out of sync
+                                                       final long 
watermarkTime = currentTime - (currentTime % watermarkInterval);
 
-                                       synchronized (lock) {
-                                               if (currentTime > 
nextWatermarkTime) {
                                                        
output.emitWatermark(new Watermark(watermarkTime));
                                                        nextWatermarkTime = 
watermarkTime + watermarkInterval;
                                                }
@@ -247,45 +282,220 @@ public class StreamSourceContexts {
         * Streaming topologies can use timestamp assigner functions to 
override the timestamps
         * assigned here.
         */
-       private static class ManualWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
+       private static class ManualWatermarkContext<T> extends 
WatermarkContext<T> {
 
-               private final Object lock;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
 
-               private ManualWatermarkContext(Object checkpointLock, 
Output<StreamRecord<T>> output) {
-                       this.lock = Preconditions.checkNotNull(checkpointLock, 
"The checkpoint lock cannot be null.");
+               private ManualWatermarkContext(
+                               final Output<StreamRecord<T>> output,
+                               final ProcessingTimeService timeService,
+                               final Object checkpointLock,
+                               final StreamStatusMaintainer 
streamStatusMaintainer,
+                               final long idleTimeout) {
+
+                       super(timeService, checkpointLock, 
streamStatusMaintainer, idleTimeout);
+
                        this.output = Preconditions.checkNotNull(output, "The 
output cannot be null.");
                        this.reuse = new StreamRecord<>(null);
                }
 
                @Override
+               protected void processAndCollect(T element) {
+                       output.collect(reuse.replace(element));
+               }
+
+               @Override
+               protected void processAndCollectWithTimestamp(T element, long 
timestamp) {
+                       output.collect(reuse.replace(element, timestamp));
+               }
+
+               @Override
+               protected void processAndEmitWatermark(Watermark mark) {
+                       output.emitWatermark(mark);
+               }
+
+               @Override
+               protected boolean allowWatermark(Watermark mark) {
+                       return true;
+               }
+       }
+
+       /**
+        * An abstract {@link SourceFunction.SourceContext} that should be used 
as the base for
+        * stream source contexts that work with {@link Watermark}s.
+        *
+        * Stream source contexts that work with watermarks are 
responsible for manipulating
+        * the current {@link StreamStatus}, so that stream status can be 
correctly propagated
+        * downstream. Please refer to the class-level documentation of {@link 
StreamStatus} for
+        * information on how stream status affects watermark advancement at 
downstream tasks.
+        *
+        * This class implements the logic of idleness detection. It fires 
idleness detection
+        * tasks at a given interval; if no records or watermarks were 
collected by the source context
+        * between 2 consecutive checks, it determines the source to be IDLE 
and correspondingly
+        * toggles the status. ACTIVE status resumes as soon as some record or 
watermark is collected
+        * again.
+        */
+       private static abstract class WatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
+
+               protected final ProcessingTimeService timeService;
+               protected final Object checkpointLock;
+               protected final StreamStatusMaintainer streamStatusMaintainer;
+               protected final long idleTimeout;
+
+               private ScheduledFuture<?> nextCheck;
+
+               /**
+                * This flag will be reset to {@code true} every time the next 
check is scheduled.
+                * Whenever a record or watermark is collected, the flag will 
be set to {@code false}.
+                *
+                * When the scheduled check is fired, if the flag remains to be 
{@code true}, the check will fail,
+                * and our current status will be determined to be IDLE.
+                */
+               private volatile boolean failOnNextCheck;
+
+               /**
+                * Create a watermark context.
+                *
+                * @param timeService the time service to schedule idleness 
detection tasks
+                * @param checkpointLock the checkpoint lock
+                * @param streamStatusMaintainer the stream status maintainer 
to toggle and retrieve current status
+                * @param idleTimeout (-1 if idleness checking is disabled)
+                */
+               public WatermarkContext(
+                               final ProcessingTimeService timeService,
+                               final Object checkpointLock,
+                               final StreamStatusMaintainer 
streamStatusMaintainer,
+                               final long idleTimeout) {
+
+                       this.timeService = 
Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+                       this.checkpointLock = 
Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null.");
+                       this.streamStatusMaintainer = 
Preconditions.checkNotNull(streamStatusMaintainer, "Stream Status Maintainer 
cannot be null.");
+
+                       if (idleTimeout != -1) {
+                               Preconditions.checkArgument(idleTimeout >= 1, 
"The idle timeout cannot be smaller than 1 ms.");
+                       }
+                       this.idleTimeout = idleTimeout;
+
+                       scheduleNextIdleDetectionTask();
+               }
+
+               @Override
                public void collect(T element) {
-                       synchronized (lock) {
-                               output.collect(reuse.replace(element));
+                       synchronized (checkpointLock) {
+                               
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+                               if (nextCheck != null) {
+                                       this.failOnNextCheck = false;
+                               } else {
+                                       scheduleNextIdleDetectionTask();
+                               }
+
+                               processAndCollect(element);
                        }
                }
 
                @Override
                public void collectWithTimestamp(T element, long timestamp) {
-                       synchronized (lock) {
-                               output.collect(reuse.replace(element, 
timestamp));
+                       synchronized (checkpointLock) {
+                               
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+                               if (nextCheck != null) {
+                                       this.failOnNextCheck = false;
+                               } else {
+                                       scheduleNextIdleDetectionTask();
+                               }
+
+                               processAndCollectWithTimestamp(element, 
timestamp);
                        }
                }
 
                @Override
                public void emitWatermark(Watermark mark) {
-                       synchronized (lock) {
-                               output.emitWatermark(mark);
+                       if (allowWatermark(mark)) {
+                               synchronized (checkpointLock) {
+                                       
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+
+                                       if (nextCheck != null) {
+                                               this.failOnNextCheck = false;
+                                       } else {
+                                               scheduleNextIdleDetectionTask();
+                                       }
+
+                                       processAndEmitWatermark(mark);
+                               }
+                       }
+               }
+
+               @Override
+               public void markAsTemporarilyIdle() {
+                       synchronized (checkpointLock) {
+                               
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                }
 
                @Override
                public Object getCheckpointLock() {
-                       return lock;
+                       return checkpointLock;
                }
 
                @Override
-               public void close() {}
+               public void close() {
+                       cancelNextIdleDetectionTask();
+               }
+
+               private class IdlenessDetectionTask implements 
ProcessingTimeCallback {
+                       @Override
+                       public void onProcessingTime(long timestamp) throws 
Exception {
+                               synchronized (checkpointLock) {
+                                       // set this to null now;
+                                       // the next idleness detection will be 
scheduled again
+                                       // depending on the below 
failOnNextCheck condition
+                                       nextCheck = null;
+
+                                       if (failOnNextCheck) {
+                                               markAsTemporarilyIdle();
+                                       } else {
+                                               scheduleNextIdleDetectionTask();
+                                       }
+                               }
+                       }
+               }
+
+               private void scheduleNextIdleDetectionTask() {
+                       if (idleTimeout != -1) {
+                               // reset flag; if it remains true when task 
fires, we have detected idleness
+                               failOnNextCheck = true;
+                               nextCheck = this.timeService.registerTimer(
+                                       
this.timeService.getCurrentProcessingTime() + idleTimeout,
+                                       new IdlenessDetectionTask());
+                       }
+               }
+
+               protected void cancelNextIdleDetectionTask() {
+                       final ScheduledFuture<?> nextCheck = this.nextCheck;
+                       if (nextCheck != null) {
+                               nextCheck.cancel(true);
+                       }
+               }
+
+               // 
------------------------------------------------------------------------
+               //      Abstract methods for concrete subclasses to implement.
+               //  These methods are guaranteed to be synchronized on the 
checkpoint lock,
+               //  so implementations don't need to do so.
+               // 
------------------------------------------------------------------------
+
+               /** Process and collect record. */
+               protected abstract void processAndCollect(T element);
+
+               /** Process and collect record with timestamp. */
+               protected abstract void processAndCollectWithTimestamp(T 
element, long timestamp);
+
+               /** Whether or not a watermark should be allowed */
+               protected abstract boolean allowWatermark(Watermark mark);
+
+               /** Process and emit watermark. Only called if {@link 
WatermarkContext#allowWatermark(Watermark)} returns {@code true} */
+               protected abstract void processAndEmitWatermark(Watermark mark);
+
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index e2061c3..3feaa52 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -49,7 +49,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -95,7 +95,7 @@ public class StreamInputProcessor<IN> {
         */
        private int currentChannel = -1;
 
-       private final OperatorChain<?, OneInputStreamOperator<IN, ?>> 
operatorChain;
+       private final StreamStatusMaintainer streamStatusMaintainer;
        
        private final OneInputStreamOperator<IN, ?> streamOperator;
 
@@ -115,7 +115,7 @@ public class StreamInputProcessor<IN> {
                        Object lock,
                        IOManager ioManager,
                        Configuration taskManagerConfig,
-                       OperatorChain<?, OneInputStreamOperator<IN, ?>> 
operatorChain,
+                       StreamStatusMaintainer streamStatusMaintainer,
                        OneInputStreamOperator<IN, ?> streamOperator) throws 
IOException {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
@@ -157,7 +157,7 @@ public class StreamInputProcessor<IN> {
 
                this.lastEmittedWatermark = Long.MIN_VALUE;
 
-               this.operatorChain = checkNotNull(operatorChain);
+               this.streamStatusMaintainer = 
checkNotNull(streamStatusMaintainer);
                this.streamOperator = checkNotNull(streamOperator);
 
                this.statusWatermarkValve = new StatusWatermarkValve(
@@ -297,7 +297,7 @@ public class StreamInputProcessor<IN> {
                public void handleStreamStatus(StreamStatus streamStatus) {
                        try {
                                synchronized (lock) {
-                                       
operatorChain.setStreamStatus(streamStatus);
+                                       
streamStatusMaintainer.toggleStreamStatus(streamStatus);
                                }
                        } catch (Exception e) {
                                throw new RuntimeException("Exception occurred 
while processing valve output stream status: ", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a295395..a8ec972 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -45,7 +45,7 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -107,7 +107,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
         */
        private int currentChannel = -1;
 
-       private final OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> 
operatorChain;
+       private final StreamStatusMaintainer streamStatusMaintainer;
 
        private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
 
@@ -129,7 +129,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                        Object lock,
                        IOManager ioManager,
                        Configuration taskManagerConfig,
-                       OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> 
operatorChain,
+                       StreamStatusMaintainer streamStatusMaintainer,
                        TwoInputStreamOperator<IN1, IN2, ?> streamOperator) 
throws IOException {
 
                final InputGate inputGate = 
InputGateUtil.createInputGate(inputGates1, inputGates2);
@@ -185,7 +185,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                this.firstStatus = StreamStatus.ACTIVE;
                this.secondStatus = StreamStatus.ACTIVE;
 
-               this.operatorChain = checkNotNull(operatorChain);
+               this.streamStatusMaintainer = 
checkNotNull(streamStatusMaintainer);
                this.streamOperator = checkNotNull(streamOperator);
 
                this.statusWatermarkValve1 = new 
StatusWatermarkValve(numInputChannels1, new 
ForwardingValveOutputHandler1(streamOperator, lock));
@@ -355,13 +355,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                                        firstStatus = streamStatus;
 
                                        // check if we need to toggle the 
task's stream status
-                                       if 
(!streamStatus.equals(operatorChain.getStreamStatus())) {
+                                       if 
(!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
                                                if (streamStatus.isActive()) {
                                                        // we're no longer idle 
if at least one input has become active
-                                                       
operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+                                                       
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                                                } else if 
(secondStatus.isIdle()) {
                                                        // we're idle once both 
inputs are idle
-                                                       
operatorChain.setStreamStatus(StreamStatus.IDLE);
+                                                       
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                                                }
                                        }
                                }
@@ -399,13 +399,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
                                        secondStatus = streamStatus;
 
                                        // check if we need to toggle the 
task's stream status
-                                       if 
(!streamStatus.equals(operatorChain.getStreamStatus())) {
+                                       if 
(!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
                                                if (streamStatus.isActive()) {
                                                        // we're no longer idle 
if at least one input has become active
-                                                       
operatorChain.setStreamStatus(StreamStatus.ACTIVE);
+                                                       
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                                                } else if 
(firstStatus.isIdle()) {
                                                        // we're idle once both 
inputs are idle
-                                                       
operatorChain.setStreamStatus(StreamStatus.IDLE);
+                                                       
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                                                }
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
new file mode 100644
index 0000000..d964cef
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatusMaintainer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.streamstatus;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface that allows toggling the current {@link StreamStatus} as well as 
retrieving it.
+ */
+@Internal
+public interface StreamStatusMaintainer extends StreamStatusProvider {
+
+       /**
+        * Toggles the current stream status. This method should only have an 
effect
+        * if the supplied stream status is different from the current status.
+        *
+        * @param streamStatus the new status to toggle to
+        */
+       void toggleStreamStatus(StreamStatus streamStatus);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index e559ad0..e04d316 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -42,10 +42,6 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                if (numberOfInputs > 0) {
                        InputGate[] inputGates = 
getEnvironment().getAllInputGates();
 
-                       @SuppressWarnings("unchecked")
-                       OperatorChain<?, OneInputStreamOperator<IN, ?>> 
operatorChain =
-                                       (OperatorChain) this.operatorChain;
-
                        inputProcessor = new StreamInputProcessor<>(
                                        inputGates,
                                        inSerializer,
@@ -54,7 +50,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                                        getCheckpointLock(),
                                        getEnvironment().getIOManager(),
                                        
getEnvironment().getTaskManagerInfo().getConfiguration(),
-                                       operatorChain,
+                                       getStreamStatusMaintainer(),
                                        this.headOperator);
 
                        // make sure that stream tasks report their I/O 
statistics

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 591ed3c..4f07182 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitio
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
 import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
@@ -63,7 +64,7 @@ import java.util.Random;
  *              head operator.
  */
 @Internal
-public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements 
StreamStatusProvider {
+public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements 
StreamStatusMaintainer {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(OperatorChain.class);
        
@@ -151,7 +152,8 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                return streamStatus;
        }
 
-       public void setStreamStatus(StreamStatus status) throws IOException {
+       @Override
+       public void toggleStreamStatus(StreamStatus status) {
                if (!status.equals(this.streamStatus)) {
                        this.streamStatus = status;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 7ae99f6..63b40ad 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -53,7 +53,7 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
 
        @Override
        protected void run() throws Exception {
-               headOperator.run(getCheckpointLock());
+               headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer());
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 60afd60..62cfb8f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -57,6 +57,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
@@ -497,6 +498,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                return accumulatorMap;
        }
 
+       public StreamStatusMaintainer getStreamStatusMaintainer() {
+               return operatorChain;
+       }
+
        Output<StreamRecord<OUT>> getHeadOutput() {
                return operatorChain.getChainEntryPoint();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 175bd76..71346b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -65,10 +65,6 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                        }
                }
 
-               @SuppressWarnings("unchecked")
-               OperatorChain<?, TwoInputStreamOperator<IN1, IN2, ?>> 
operatorChain =
-                               (OperatorChain) this.operatorChain;
-
                this.inputProcessor = new StreamTwoInputProcessor<>(
                                inputList1, inputList2,
                                inputDeserializer1, inputDeserializer2,
@@ -77,7 +73,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                                getCheckpointLock(),
                                getEnvironment().getIOManager(),
                                
getEnvironment().getTaskManagerInfo().getConfiguration(),
-                               operatorChain,
+                               getStreamStatusMaintainer(),
                                this.headOperator);
 
                // make sure that stream tasks report their I/O statistics

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index e4dadf0..a4c1bea 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -67,7 +67,12 @@ public class ListSourceContext<T> implements 
SourceFunction.SourceContext<T> {
 
        @Override
        public void emitWatermark(Watermark mark) {
-               // don't do anything
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void markAsTemporarilyIdle() {
+               throw new UnsupportedOperationException();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
index 8332cb3..9030e9d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java
@@ -228,6 +228,12 @@ public class StatefulSequenceSourceTest {
 
                @Override
                public void emitWatermark(Watermark mark) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void markAsTemporarilyIdle() {
+                       throw new UnsupportedOperationException();
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
index 6b36419..d81b440 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
@@ -58,10 +58,13 @@ public class FileMonitoringFunctionTest {
                                        public void emitWatermark(Watermark 
mark) {}
 
                                        @Override
+                                       public void markAsTemporarilyIdle() {}
+
+                                       @Override
                                        public Object getCheckpointLock() { 
return null; }
 
                                        @Override
                                        public void close() {}
                                });
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
index d1131b4..bb80228 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java
@@ -225,6 +225,11 @@ public class InputFormatSourceFunctionTest {
                }
 
                @Override
+               public void markAsTemporarilyIdle() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
                public Object getCheckpointLock() {
                        return null;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
index 3e274cf..87376e7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunctionTest.java
@@ -283,7 +283,14 @@ public class SocketTextStreamFunctionTest {
                                        }
 
                                        @Override
-                                       public void emitWatermark(Watermark 
mark) {}
+                                       public void emitWatermark(Watermark 
mark) {
+                                               throw new 
UnsupportedOperationException();
+                                       }
+
+                                       @Override
+                                       public void markAsTemporarilyIdle() {
+                                               throw new 
UnsupportedOperationException();
+                                       }
 
                                        @Override
                                        public Object getCheckpointLock() {
@@ -346,4 +353,4 @@ public class SocketTextStreamFunctionTest {
                        }
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 357163c..c4ddea8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
@@ -219,9 +220,11 @@ public class AbstractUdfStreamOperatorLifecycleTest {
                }
 
                @Override
-               public void run(Object lockingObject, Output<StreamRecord<OUT>> 
collector) throws Exception {
+               public void run(Object lockingObject,
+                                               StreamStatusMaintainer 
streamStatusMaintainer,
+                                               Output<StreamRecord<OUT>> 
collector) throws Exception {
                        ACTUAL_ORDER_TRACKING.add("OPERATOR::run");
-                       super.run(lockingObject, collector);
+                       super.run(lockingObject, streamStatusMaintainer, 
collector);
                        runStarted.trigger();
                        runFinish.await();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
new file mode 100644
index 0000000..3695120
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class StreamSourceContextIdleDetectionTests {
+
+       /** The tests in this class will be parameterized with these 
enumerations.*/
+       private enum TestMethod {
+
+               /** test idleness detection using the {@link 
SourceFunction.SourceContext#collect(Object)} method */
+               COLLECT,
+
+               /** test idleness detection using the {@link 
SourceFunction.SourceContext#collectWithTimestamp(Object, long)} method */
+               COLLECT_WITH_TIMESTAMP,
+
+               /** test idleness detection using the {@link 
SourceFunction.SourceContext#emitWatermark(Watermark)} method */
+               EMIT_WATERMARK
+       }
+
+       private TestMethod testMethod;
+
+       public StreamSourceContextIdleDetectionTests(TestMethod testMethod) {
+               this.testMethod = testMethod;
+       }
+
+       /**
+        * Test scenario (idleTimeout = 100):
+        * (1) Start from 0 as initial time.
+        * (2) As soon as time reaches 100, status should have been toggled to 
IDLE.
+        * (3) After some arbitrary time (until 300), the status should remain 
IDLE.
+        * (4) Emit a record at 310. Status should become ACTIVE. This should 
fire an idleness detection at 410.
+        * (5) Emit another record at 320 (which is before the next check). 
This should make the idleness check pass.
+        * (6) Advance time to 410 and trigger idleness detection.
+        *     The status should still be ACTIVE due to step (5). Another 
idleness detection should be fired at 510.
+        * (7) Advance time to 510 and trigger idleness detection. Since no 
records were collected in-between the two
+        *     idleness detections, status should have been toggled back to IDLE.
+        *
+        * Inline comments will refer to the corresponding tested steps in the 
scenario.
+        */
+       @Test
+       public void testManualWatermarkContext() throws Exception {
+               long idleTimeout = 100;
+
+               long initialTime = 0;
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+               processingTimeService.setCurrentTime(initialTime);
+
+               final List<StreamElement> output = new ArrayList<>();
+
+               MockStreamStatusMaintainer mockStreamStatusMaintainer = new 
MockStreamStatusMaintainer();
+
+               SourceFunction.SourceContext<String> context = 
StreamSourceContexts.getSourceContext(
+                       TimeCharacteristic.EventTime,
+                       processingTimeService,
+                       new Object(),
+                       mockStreamStatusMaintainer,
+                       new CollectorOutput<String>(output),
+                       0,
+                       idleTimeout);
+
+               // -------------------------- begin test scenario 
--------------------------
+
+               // corresponds to step (2) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + idleTimeout);
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+
+               // corresponds to step (3) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
2*idleTimeout);
+               processingTimeService.setCurrentTime(initialTime + 
3*idleTimeout);
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+
+               // corresponds to step (4) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
3*idleTimeout + idleTimeout/10);
+               switch (testMethod) {
+                       case COLLECT:
+                               context.collect("msg");
+                               break;
+                       case COLLECT_WITH_TIMESTAMP:
+                               context.collectWithTimestamp("msg", 
processingTimeService.getCurrentProcessingTime());
+                               break;
+                       case EMIT_WATERMARK:
+                               context.emitWatermark(new 
Watermark(processingTimeService.getCurrentProcessingTime()));
+                               break;
+               }
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+               // corresponds to step (5) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
3*idleTimeout + 2*idleTimeout/10);
+               switch (testMethod) {
+                       case COLLECT:
+                               context.collect("msg");
+                               break;
+                       case COLLECT_WITH_TIMESTAMP:
+                               context.collectWithTimestamp("msg", 
processingTimeService.getCurrentProcessingTime());
+                               break;
+                       case EMIT_WATERMARK:
+                               context.emitWatermark(new 
Watermark(processingTimeService.getCurrentProcessingTime()));
+                               break;
+               }
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+               // corresponds to step (6) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
4*idleTimeout + idleTimeout/10);
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+
+               // corresponds to step (7) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
5*idleTimeout + idleTimeout/10);
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+       }
+
+       /**
+        * Test scenario (idleTimeout = 100, watermarkInterval = 40):
+        * (1) Start from 20 as initial time.
+        * (2) As soon as time reaches 120, status should have been toggled to 
IDLE.
+        * (3) After some arbitrary time (until 320), the status should remain 
IDLE, and no watermarks should have been emitted.
+        * (4) Emit a record at 330. Status should become ACTIVE. This should 
schedule an idleness detection to be fired at 430.
+        * (5) Emit another record at 350 (which is before the next check). 
This should make the idleness check pass.
+        * (6) Advance time to 430 and trigger idleness detection. The status 
should still be ACTIVE due to step (5).
+        *     This should schedule an idleness detection to be fired at 530.
+        * (7) Advance time to 460, in which a watermark emission task should 
be fired. Idleness detection
+        *     should have been "piggy-backed" in the task, allowing the status 
to be toggled to IDLE before the next
+        *     actual idle detection task at 530.
+        *
+        * Inline comments will refer to the corresponding tested steps in the 
scenario.
+        */
+       @Test
+       public void testAutomaticWatermarkContext() throws Exception {
+               long watermarkInterval = 40;
+               long idleTimeout = 100;
+               long initialTime = 20;
+
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+               processingTimeService.setCurrentTime(initialTime);
+
+               MockStreamStatusMaintainer mockStreamStatusMaintainer = new 
MockStreamStatusMaintainer();
+
+               final List<StreamElement> output = new ArrayList<>();
+               final List<StreamElement> expectedOutput = new ArrayList<>();
+
+               SourceFunction.SourceContext<String> context = 
StreamSourceContexts.getSourceContext(
+                       TimeCharacteristic.IngestionTime,
+                       processingTimeService,
+                       new Object(),
+                       mockStreamStatusMaintainer,
+                       new CollectorOutput<String>(output),
+                       watermarkInterval,
+                       idleTimeout);
+
+               // -------------------------- begin test scenario 
--------------------------
+
+               // corresponds to step (2) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
watermarkInterval);
+               expectedOutput.add(new 
Watermark(processingTimeService.getCurrentProcessingTime() - 
(processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+               processingTimeService.setCurrentTime(initialTime + 
2*watermarkInterval);
+               expectedOutput.add(new 
Watermark(processingTimeService.getCurrentProcessingTime() - 
(processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+               processingTimeService.setCurrentTime(initialTime + idleTimeout);
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+               assertEquals(expectedOutput, output);
+
+               // corresponds to step (3) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
3*watermarkInterval);
+               processingTimeService.setCurrentTime(initialTime + 
4*watermarkInterval);
+               processingTimeService.setCurrentTime(initialTime + 
2*idleTimeout);
+               processingTimeService.setCurrentTime(initialTime + 
6*watermarkInterval);
+               processingTimeService.setCurrentTime(initialTime + 
7*watermarkInterval);
+               processingTimeService.setCurrentTime(initialTime + 
3*idleTimeout);
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+               assertEquals(expectedOutput, output);
+
+               // corresponds to step (4) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
3*idleTimeout + idleTimeout/10);
+               switch (testMethod) {
+                       case COLLECT:
+                               context.collect("msg");
+                               expectedOutput.add(new StreamRecord<>("msg", 
processingTimeService.getCurrentProcessingTime()));
+                               expectedOutput.add(new 
Watermark(processingTimeService.getCurrentProcessingTime() - 
(processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+                               assertEquals(expectedOutput, output);
+                               break;
+                       case COLLECT_WITH_TIMESTAMP:
+                               context.collectWithTimestamp("msg", 
processingTimeService.getCurrentProcessingTime());
+                               expectedOutput.add(new StreamRecord<>("msg", 
processingTimeService.getCurrentProcessingTime()));
+                               expectedOutput.add(new 
Watermark(processingTimeService.getCurrentProcessingTime() - 
(processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+                               assertEquals(expectedOutput, output);
+                               break;
+                       case EMIT_WATERMARK:
+                               // for emitWatermark, since the watermark will 
be blocked,
+                               // it should not make the status become active;
+                               // from here on, the status should remain idle 
for the emitWatermark variant test
+                               context.emitWatermark(new 
Watermark(processingTimeService.getCurrentProcessingTime()));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+                               assertEquals(expectedOutput, output);
+               }
+
+               // corresponds to step (5) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
8*watermarkInterval);
+               processingTimeService.setCurrentTime(initialTime + 
3*idleTimeout + 3*idleTimeout/10);
+               switch (testMethod) {
+                       case COLLECT:
+                               context.collect("msg");
+                               expectedOutput.add(new StreamRecord<>("msg", 
processingTimeService.getCurrentProcessingTime()));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+                               assertEquals(expectedOutput, output);
+                               break;
+                       case COLLECT_WITH_TIMESTAMP:
+                               context.collectWithTimestamp("msg", 
processingTimeService.getCurrentProcessingTime());
+                               expectedOutput.add(new StreamRecord<>("msg", 
processingTimeService.getCurrentProcessingTime()));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+                               assertEquals(expectedOutput, output);
+                               break;
+                       case EMIT_WATERMARK:
+                               context.emitWatermark(new 
Watermark(processingTimeService.getCurrentProcessingTime()));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+                               assertEquals(expectedOutput, output);
+               }
+
+               processingTimeService.setCurrentTime(initialTime + 9 * 
watermarkInterval);
+               switch (testMethod) {
+                       case COLLECT:
+                       case COLLECT_WITH_TIMESTAMP:
+                               expectedOutput.add(new 
Watermark(processingTimeService.getCurrentProcessingTime() - 
(processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+                               assertEquals(expectedOutput, output);
+                               break;
+                       case EMIT_WATERMARK:
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+                               assertEquals(expectedOutput, output);
+               }
+
+               processingTimeService.setCurrentTime(initialTime + 
10*watermarkInterval);
+               switch (testMethod) {
+                       case COLLECT:
+                       case COLLECT_WITH_TIMESTAMP:
+                               expectedOutput.add(new 
Watermark(processingTimeService.getCurrentProcessingTime() - 
(processingTimeService.getCurrentProcessingTime() % watermarkInterval)));
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+                               assertEquals(expectedOutput, output);
+                               break;
+                       case EMIT_WATERMARK:
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+                               assertEquals(expectedOutput, output);
+               }
+
+               // corresponds to step (6) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
4*idleTimeout + idleTimeout/10);
+               switch (testMethod) {
+                       case COLLECT:
+                       case COLLECT_WITH_TIMESTAMP:
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isActive());
+                               assertEquals(expectedOutput, output);
+                               break;
+                       case EMIT_WATERMARK:
+                               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+                               assertEquals(expectedOutput, output);
+               }
+
+               // corresponds to step (7) of scenario (please see method-level 
Javadoc comment)
+               processingTimeService.setCurrentTime(initialTime + 
11*watermarkInterval);
+               
assertTrue(mockStreamStatusMaintainer.getStreamStatus().isIdle());
+               assertEquals(expectedOutput, output);
+       }
+
+       private static class MockStreamStatusMaintainer implements 
StreamStatusMaintainer {
+               StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+               @Override
+               public void toggleStreamStatus(StreamStatus streamStatus) {
+                       if (!currentStreamStatus.equals(streamStatus)) {
+                               currentStreamStatus = streamStatus;
+                       }
+               }
+
+               @Override
+               public StreamStatus getStreamStatus() {
+                       return currentStreamStatus;
+               }
+       }
+
+       @Parameterized.Parameters(name = "TestMethod = {0}")
+       @SuppressWarnings("unchecked")
+       public static Collection<TestMethod[]> timeCharacteristic(){
+               return Arrays.asList(
+                       new TestMethod[]{TestMethod.COLLECT},
+                       new TestMethod[]{TestMethod.COLLECT_WITH_TIMESTAMP},
+                       new TestMethod[]{TestMethod.EMIT_WATERMARK});
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index b153de9..ae74c9c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -35,6 +36,8 @@ import 
org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -69,7 +72,7 @@ public class StreamSourceOperatorTest {
                final List<StreamElement> output = new ArrayList<>();
                
                setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
-               operator.run(new Object(), new CollectorOutput<String>(output));
+               operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<String>(output));
                
                assertEquals(1, output.size());
                assertEquals(Watermark.MAX_WATERMARK, output.get(0));
@@ -89,7 +92,7 @@ public class StreamSourceOperatorTest {
                operator.cancel();
 
                // run and exit
-               operator.run(new Object(), new CollectorOutput<String>(output));
+               operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<String>(output));
                
                assertTrue(output.isEmpty());
        }
@@ -121,7 +124,7 @@ public class StreamSourceOperatorTest {
                
                // run and wait to be canceled
                try {
-                       operator.run(new Object(), new 
CollectorOutput<String>(output));
+                       operator.run(new Object(), 
mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
                }
                catch (InterruptedException ignored) {}
 
@@ -142,7 +145,7 @@ public class StreamSourceOperatorTest {
                operator.stop();
 
                // run and stop
-               operator.run(new Object(), new CollectorOutput<String>(output));
+               operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<String>(output));
 
                assertTrue(output.isEmpty());
        }
@@ -171,7 +174,7 @@ public class StreamSourceOperatorTest {
                }.start();
 
                // run and wait to be stopped
-               operator.run(new Object(), new CollectorOutput<String>(output));
+               operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<String>(output));
 
                assertTrue(output.isEmpty());
        }
@@ -198,7 +201,7 @@ public class StreamSourceOperatorTest {
                setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
latencyMarkInterval, testProcessingTimeService);
 
                // run and wait to be stopped
-               operator.run(new Object(), new CollectorOutput<Long>(output));
+               operator.run(new Object(), mock(StreamStatusMaintainer.class), 
new CollectorOutput<Long>(output));
 
                int numberLatencyMarkers = (int) (maxProcessingTime / 
latencyMarkInterval) + 1;
 
@@ -224,11 +227,6 @@ public class StreamSourceOperatorTest {
        }
 
        @Test
-       public void testLatencyMarksEmitterLifecycleIntegration() {
-
-       }
-
-       @Test
        public void testAutomaticWatermarkContext() throws Exception {
 
                // regular stream source operator
@@ -246,8 +244,10 @@ public class StreamSourceOperatorTest {
                StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
                        operator.getContainingTask().getProcessingTimeService(),
                        operator.getContainingTask().getCheckpointLock(),
+                       operator.getContainingTask().getStreamStatusMaintainer(),
                        new CollectorOutput<String>(output),
-                       operator.getExecutionConfig().getAutoWatermarkInterval());
+                       operator.getExecutionConfig().getAutoWatermarkInterval(),
+                       -1);
 
                // periodically emit the watermarks
                // even though we start from 1 the watermark are still
@@ -302,6 +302,7 @@ public class StreamSourceOperatorTest {
                when(mockTask.getEnvironment()).thenReturn(env);
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
                when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+               when(mockTask.getStreamStatusMaintainer()).thenReturn(mock(StreamStatusMaintainer.class));
 
                doAnswer(new Answer<ProcessingTimeService>() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d33d1b6..1e74c3e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -84,6 +84,7 @@ import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -759,7 +760,9 @@ public class StreamTaskTest extends TestLogger {
                }
 
                @Override
-               public void run(Object lockingObject, Output<StreamRecord<Long>> collector) throws Exception {
+               public void run(Object lockingObject,
+                                               StreamStatusMaintainer streamStatusMaintainer,
+                                               Output<StreamRecord<Long>> collector) throws Exception {
                        while (!canceled) {
                                try {
                                        Thread.sleep(500);

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 2df4efd..01afec6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -54,6 +54,8 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -157,6 +159,22 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                processingTimeService = new TestProcessingTimeService();
                processingTimeService.setCurrentTime(0);
 
+               StreamStatusMaintainer mockStreamStatusMaintainer = new StreamStatusMaintainer() {
+                       StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+                       @Override
+                       public void toggleStreamStatus(StreamStatus streamStatus) {
+                               if (!currentStreamStatus.equals(streamStatus)) {
+                                       currentStreamStatus = streamStatus;
+                               }
+                       }
+
+                       @Override
+                       public StreamStatus getStreamStatus() {
+                               return currentStreamStatus;
+                       }
+               };
+
                when(mockTask.getName()).thenReturn("Mock Task");
                when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
                when(mockTask.getConfiguration()).thenReturn(config);
@@ -165,6 +183,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
                when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
                when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
+               when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
 
                doAnswer(new Answer<Void>() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b0f0f372/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
index fe2b03e..d9ad24d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
@@ -56,6 +56,11 @@ public class CollectingSourceContext<T extends Serializable> implements SourceFu
        }
 
        @Override
+       public void markAsTemporarilyIdle() {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public Object getCheckpointLock() {
                return lock;
        }

Reply via email to