GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3347

    [FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness

    This PR is the second part of adding `StreamStatus` to the streaming runtime to facilitate idle sources in Flink.
    
    It allows the `AutomaticWatermarkContext` (ingestion time) and the `ManualWatermarkContext` (event time) to detect source idleness. Detection is based on a fixed-interval check: at each check, we look at whether the context has collected any records or watermarks from the source UDF since the previous check. If nothing was collected between two consecutive checks, we toggle the `SourceStreamTask` to `IDLE`. As soon as a record or watermark is collected afterwards, we toggle it back to `ACTIVE`.
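A minimal sketch of the fixed-interval check described above, assuming a single-threaded caller. `IdleAwareContext` is a hypothetical class and `Status` is a local enum standing in for `StreamStatus`; this is not the code in the PR. The timer that calls `checkIdleness()` once per interval is left out, and status changes are reported through a plain `Consumer` rather than to the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for StreamStatus; not the real Flink class.
enum Status { ACTIVE, IDLE }

// Hypothetical sketch of interval-based idleness detection in a source context.
class IdleAwareContext<T> {
    private final Consumer<Status> statusToggler; // receives every status change
    private final List<T> output = new ArrayList<>();
    private Status status = Status.ACTIVE;
    private boolean collectedSinceLastCheck = false;

    IdleAwareContext(Consumer<Status> statusToggler) {
        this.statusToggler = statusToggler;
    }

    void collect(T element) {
        markActivity();
        output.add(element);
    }

    void emitWatermark(long timestamp) {
        markActivity();
        // Forwarding the watermark downstream is omitted in this sketch.
    }

    // Called once per idle-check interval by an (omitted) periodic timer.
    void checkIdleness() {
        if (!collectedSinceLastCheck && status == Status.ACTIVE) {
            setStatus(Status.IDLE); // nothing collected since the last check
        }
        collectedSinceLastCheck = false; // start a new interval
    }

    Status status() { return status; }

    List<T> output() { return output; }

    private void markActivity() {
        collectedSinceLastCheck = true;
        if (status == Status.IDLE) {
            setStatus(Status.ACTIVE); // toggle back as soon as data arrives
        }
    }

    private void setStatus(Status newStatus) {
        status = newStatus;
        statusToggler.accept(newStatus);
    }
}
```

In this sketch the per-record cost is a single flag write, and the time between construction and the first check counts as the first interval.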
    
    This idleness check is disabled by default. It is not yet configurable in this PR; that will be addressed in a separate issue, [FLINK-5018](https://issues.apache.org/jira/browse/FLINK-5018).
    
    It also introduces a new user-facing method in the `SourceFunction.SourceContext` interface:
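    ```java
    public interface SourceFunction<T> {
        // ... other members of SourceFunction elided ...
        interface SourceContext<T> {
            // ... existing methods elided ...
            void markAsTemporarilyIdle();
            // ...
        }
    }
    ```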
    
    This method allows the source UDF to proactively mark the source as `IDLE` without waiting for the interval-based checks. UDFs should make a best effort to call it. For example, Kafka and Kinesis consumer source instances can call it as soon as they determine on startup that they have no subscribed partitions.
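A minimal sketch of how a partitioned source might use the new method on startup. `SimpleSourceContext` is a hypothetical stand-in for `SourceFunction.SourceContext`, reduced to the two methods used here, and `PartitionedSource` is illustrative only; it is not the Kafka or Kinesis consumer code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SourceFunction.SourceContext (only the methods used here).
interface SimpleSourceContext<T> {
    void collect(T element);
    void markAsTemporarilyIdle();
}

// Hypothetical source subtask that may be assigned zero partitions on startup.
class PartitionedSource {
    private final List<String> assignedPartitions;

    PartitionedSource(List<String> assignedPartitions) {
        this.assignedPartitions = assignedPartitions;
    }

    void run(SimpleSourceContext<String> ctx) {
        if (assignedPartitions.isEmpty()) {
            // This subtask will not emit anything, so declare idleness right away
            // instead of waiting for the interval-based check to notice.
            ctx.markAsTemporarilyIdle();
            // A real source would keep running here (e.g. wait for cancellation)
            // rather than return; returning is only done to keep the sketch short.
            return;
        }
        for (String partition : assignedPartitions) {
            ctx.collect("record from " + partition);
        }
    }
}
```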
    
    ## Others
    
    This PR also introduces an internal interface, `StreamStatusMaintainer`:
    ```java
    interface StreamStatusMaintainer extends StreamStatusProvider {
        void toggleStreamStatus(StreamStatus streamStatus);
    }
    ```
    
    The main reason for this is to decouple the `StreamSource` operator from the `OperatorChain`.
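A minimal sketch of how these interfaces can fit together. A local enum replaces the real `StreamStatus` class, and `ChainStatusHolder` is a hypothetical task-side implementation. Ignoring repeated toggles to the current status is an assumption of this sketch, not something stated in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's StreamStatus; not the real class.
enum StreamStatus { ACTIVE, IDLE }

interface StreamStatusProvider {
    StreamStatus getStreamStatus();
}

interface StreamStatusMaintainer extends StreamStatusProvider {
    void toggleStreamStatus(StreamStatus streamStatus);
}

// Hypothetical task-side implementation; callers only see StreamStatusMaintainer.
class ChainStatusHolder implements StreamStatusMaintainer {
    private StreamStatus current = StreamStatus.ACTIVE;
    private final List<StreamStatus> emitted = new ArrayList<>();

    @Override
    public StreamStatus getStreamStatus() {
        return current;
    }

    @Override
    public void toggleStreamStatus(StreamStatus status) {
        // Act only on real changes, so toggling to the current status is a no-op.
        if (status != current) {
            current = status;
            emitted.add(status); // stands in for forwarding the status downstream
        }
    }

    List<StreamStatus> emitted() {
        return emitted;
    }
}
```

Because a source context in this sketch is handed only the `StreamStatusMaintainer` view, it can read and change the stream status without depending on the concrete class behind it.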
    
    ## Tests
    
    Added `StreamSourceContextIdleDetectionTests` to test the proposed functionality. I have not yet added task-level tests using `StreamTaskTestHarness`, because the idle timeout can so far only be configured directly on the `SourceContext` implementations. I propose adding tests with task harnesses together with [FLINK-5018](https://issues.apache.org/jira/browse/FLINK-5018).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5716

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3347.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3347
    
----
commit 66851ad987dedceb74475472d13a267101a95f61
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-02-16T18:43:44Z

    [FLINK-5716] [streaming] Make StreamSourceContexts aware of source idleness

----

