GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/3347
[FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness
This PR is the second part of adding `StreamStatus` to the streaming
runtime to facilitate idle sources in Flink.
It allows the `AutomaticWatermarkContext` (for ingestion time) and the
`ManualWatermarkContext` (for event time) to detect source idleness. Detection
is based on a fixed-interval check: at each check, we test whether the context
has collected any records or watermarks from the source UDF since the previous
check. If nothing was collected between two consecutive checks, we toggle the
`SourceStreamTask` to `IDLE`. As soon as a record or watermark is collected
afterwards, we toggle it back to `ACTIVE`.
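
A minimal sketch of the fixed-interval idleness check described above, written in plain Java so it runs without Flink. The class names, the `Status` enum (a simplified stand-in for `StreamStatus`), and the manual `onIntervalCheck()` calls (in place of a real periodic timer) are all assumptions of this sketch, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class IdleDetectionSketch {

    // Simplified stand-in for Flink's StreamStatus (assumption for this sketch).
    enum Status { ACTIVE, IDLE }

    // Hypothetical detector: remembers whether anything was collected since the last check.
    static final class IdlenessDetector {
        private final Object lock = new Object();
        private final List<Status> transitions = new ArrayList<>();
        private boolean collectedSinceLastCheck = false;
        private Status status = Status.ACTIVE;

        // Called for every record or watermark emitted by the source UDF.
        void onCollect() {
            synchronized (lock) {
                collectedSinceLastCheck = true;
                if (status == Status.IDLE) {
                    toggle(Status.ACTIVE); // first emission after idleness reactivates the stream
                }
            }
        }

        // Called on every fixed-interval tick (in a real runtime, by a periodic timer).
        void onIntervalCheck() {
            synchronized (lock) {
                if (!collectedSinceLastCheck && status == Status.ACTIVE) {
                    toggle(Status.IDLE); // nothing was emitted since the previous tick
                }
                collectedSinceLastCheck = false; // start a new interval
            }
        }

        private void toggle(Status newStatus) {
            status = newStatus;
            transitions.add(newStatus);
        }

        Status status() {
            synchronized (lock) { return status; }
        }

        List<Status> transitions() {
            synchronized (lock) { return new ArrayList<>(transitions); }
        }
    }

    public static void main(String[] args) {
        IdlenessDetector detector = new IdlenessDetector();
        detector.onCollect();       // a record arrives
        detector.onIntervalCheck(); // something was collected: stays ACTIVE
        detector.onIntervalCheck(); // nothing since the last check: becomes IDLE
        detector.onCollect();       // a record arrives: back to ACTIVE
        System.out.println(detector.transitions()); // prints [IDLE, ACTIVE]
    }
}
```

Both the "collected" flag and the status are guarded by one lock so that a record arriving concurrently with a check cannot be lost between the test and the reset.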
This idleness check is disabled by default. It is not yet configurable in
this PR; making it configurable will be handled in a separate issue,
[FLINK-5018](https://issues.apache.org/jira/browse/FLINK-5018).
It also introduces a new user-exposed method to the
`SourceFunction.SourceContext` interface:
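```
public interface SourceFunction<T> extends Function, Serializable {
    ...
    interface SourceContext<T> {
        ...
        // Proactively marks this source as IDLE, without waiting
        // for the interval-based idleness check.
        void markAsTemporarilyIdle();
        ...
    }
}
```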
The purpose of this method is to let the source UDF proactively mark the
source as `IDLE` without waiting for the underlying interval checks. UDFs
should make a best effort to call it. For example, Kafka and Kinesis consumer
source instances can call it as soon as they determine on startup that they
have no subscribed partitions.
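
The Kafka/Kinesis case above can be sketched as a source that marks itself idle when it has no assigned partitions. To keep the example runnable without Flink, `SourceContext` here is a trimmed-down stand-in (only `collect`, `markAsTemporarilyIdle` and `getCheckpointLock`), and `PartitionedSource` with its in-memory "partitions" is hypothetical; a real implementation would be a `SourceFunction` whose `run`/`cancel` this class mirrors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class IdleAwareSourceSketch {

    // Trimmed-down stand-in for SourceFunction.SourceContext (only what this sketch uses).
    interface SourceContext<T> {
        void collect(T element);
        void markAsTemporarilyIdle();
        Object getCheckpointLock();
    }

    // Hypothetical source reading from a fixed set of in-memory "partitions".
    static final class PartitionedSource {
        private final List<List<String>> assignedPartitions;
        private volatile boolean running = true;

        PartitionedSource(List<List<String>> assignedPartitions) {
            this.assignedPartitions = assignedPartitions;
        }

        void run(SourceContext<String> ctx) throws InterruptedException {
            if (assignedPartitions.isEmpty()) {
                // This instance will never emit anything: report it right away
                // instead of waiting for the interval-based idleness check.
                ctx.markAsTemporarilyIdle();
                while (running) {
                    Thread.sleep(10); // keep running until cancelled; returning would end the source
                }
                return;
            }
            for (List<String> partition : assignedPartitions) {
                for (String record : partition) {
                    if (!running) {
                        return;
                    }
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record);
                    }
                }
            }
        }

        void cancel() { running = false; }
    }

    // Test context that records what the source emitted.
    static final class RecordingContext implements SourceContext<String> {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        private final Object lock = new Object();
        public void collect(String element) { events.add(element); }
        public void markAsTemporarilyIdle() { events.add("<idle>"); }
        public Object getCheckpointLock() { return lock; }
    }

    public static void main(String[] args) throws Exception {
        // An instance with a partition emits its records and never marks itself idle.
        RecordingContext busy = new RecordingContext();
        new PartitionedSource(Arrays.asList(Arrays.asList("a", "b"))).run(busy);
        System.out.println(busy.events); // prints [a, b]

        // An instance without partitions marks itself idle, then waits until cancelled.
        final RecordingContext empty = new RecordingContext();
        final PartitionedSource idleSource = new PartitionedSource(Collections.<List<String>>emptyList());
        Thread runner = new Thread(() -> {
            try { idleSource.run(empty); } catch (InterruptedException ignored) { }
        });
        runner.start();
        while (empty.events.isEmpty()) { Thread.sleep(1); }
        idleSource.cancel();
        runner.join();
        System.out.println(empty.events); // prints [<idle>]
    }
}
```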
## Others
Introduced an internal interface, `StreamStatusMaintainer`:
```
interface StreamStatusMaintainer extends StreamStatusProvider {
void toggleStreamStatus(StreamStatus streamStatus);
}
```
The main reason for this is to keep the `StreamSource` operator inaccessible
to the `OperatorChain`.
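
A small illustration of why a narrow interface like `StreamStatusMaintainer` helps: the code that toggles the status depends only on `toggleStreamStatus`/`getStreamStatus`, not on the concrete class behind them. The `StreamStatus` enum is a simplified stand-in (the real class is not an enum), and `StatusHolder` and `SourceSide` are hypothetical names; this is not how the PR wires the classes together.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamStatusMaintainerSketch {

    // Simplified stand-in for Flink's StreamStatus (assumption for this sketch).
    enum StreamStatus { ACTIVE, IDLE }

    interface StreamStatusProvider {
        StreamStatus getStreamStatus();
    }

    interface StreamStatusMaintainer extends StreamStatusProvider {
        void toggleStreamStatus(StreamStatus streamStatus);
    }

    // Hypothetical owner of the current status; it may have many other
    // methods that the status-toggling side never gets to see.
    static final class StatusHolder implements StreamStatusMaintainer {
        final List<StreamStatus> forwarded = new ArrayList<>();
        private StreamStatus current = StreamStatus.ACTIVE;

        @Override
        public StreamStatus getStreamStatus() { return current; }

        @Override
        public void toggleStreamStatus(StreamStatus status) {
            if (status != current) {  // only act on actual changes
                current = status;
                forwarded.add(status); // e.g. forward the new status downstream
            }
        }
    }

    // Hypothetical source-side component that only holds the narrow interface.
    static final class SourceSide {
        private final StreamStatusMaintainer maintainer;

        SourceSide(StreamStatusMaintainer maintainer) { this.maintainer = maintainer; }

        void goIdle() { maintainer.toggleStreamStatus(StreamStatus.IDLE); }

        void emitRecord() {
            if (maintainer.getStreamStatus() == StreamStatus.IDLE) {
                maintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            }
            // ... the record itself would be emitted here
        }
    }

    public static void main(String[] args) {
        StatusHolder holder = new StatusHolder();
        SourceSide source = new SourceSide(holder);
        source.goIdle();
        source.goIdle();     // already IDLE: nothing is forwarded
        source.emitRecord(); // back to ACTIVE
        System.out.println(holder.forwarded); // prints [IDLE, ACTIVE]
    }
}
```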
## Tests
Added `StreamSourceContextIdleDetectionTests` to test the proposed
functionality. I have not yet added task-level tests using
`StreamTaskTestHarness`, because the idle timeout is not yet configurable
beyond the `SourceContext` interfaces. I propose adding task harness tests
together with [FLINK-5018](https://issues.apache.org/jira/browse/FLINK-5018).
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-5716
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3347.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3347
----
commit 66851ad987dedceb74475472d13a267101a95f61
Author: Tzu-Li (Gordon) Tai
Date: 2017-02-16T18:43:44Z
[FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness
----