GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/2801

    [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources

    This PR is the first part of making temporarily idle sources in Flink possible, by adding a new `StreamStatus` element that flows with the records. The second part, allowing source operators to emit `StreamStatus` elements, will be submitted as a separate PR based on this one.
    
    `StreamStatus` elements are generated at the sources, and affect how operators advance their watermarks in the presence of idle sources.
    
    Prior to this PR, a downstream operator advanced its watermark only when the minimum watermark across all of its input channels advanced. As a result, downstream operators' watermarks stalled whenever some sources were idle. With this change, operators can mark input channels as idle and ignore them when advancing their watermark.
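    Here is a minimal sketch of both rules, computed over an array of per-channel watermarks. `WatermarkMinSketch` and its methods are hypothetical illustrations, not Flink classes. Returning `Long.MIN_VALUE` when every channel is idle is an assumption of the sketch.

```java
import java.util.Arrays;

/** Hypothetical illustration (not Flink code) of combining per-channel watermarks. */
final class WatermarkMinSketch {

    private WatermarkMinSketch() {}

    /** Old rule: the minimum over ALL input channels, so one idle channel holds everything back. */
    static long minOverAllChannels(long[] channelWatermarks) {
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /**
     * New rule: channels marked idle are ignored. Returns Long.MIN_VALUE if every
     * channel is idle (an assumption of this sketch: there is nothing to advance on).
     */
    static long minOverActiveChannels(long[] channelWatermarks, boolean[] channelIdle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

    Take channel watermarks `{500, 10, 480}`, where channel 1 is fed by an idle source. The old rule yields 10, so the operator stalls. The new rule yields 480.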
    
    # Design Choices
    
    ### `StreamStatus` elements are only generated at sources
    
    Sources guarantee that no records or watermarks will be emitted between an `IDLE` status and the following `ACTIVE` status (this will be implemented in the second PR). All downstream operators need to process the statuses they receive and forward status changes appropriately. `StreamStatus` elements cannot be generated mid-topology.
    
    ### Using two status markers - `IDLE` and `ACTIVE`
    
    We need two markers instead of only `IDLE`, because operators at the `AbstractStreamOperator` level need to know exactly where idleness starts and ends. I had considered using only an `IDLE` marker and treating any watermark or record received afterwards as a sign that the source has become active again. However, that would prevent us from correctly determining whether watermarks generated by timestamp extractors in the middle of the topology should be ignored. Such mid-topology watermarks must be ignored while the operator is idle, because they can be generated even when the sources are idle and no records are flowing through.
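    A minimal sketch of that argument compares a one-marker tracker with a two-marker tracker. The class names are hypothetical, and watermarks are reduced to a "forward or drop" decision.

```java
/** Hypothetical comparison of one-marker vs. two-marker idleness tracking (not Flink code). */
final class IdlenessTrackingSketch {

    enum StreamStatus { IDLE, ACTIVE }

    /** One-marker scheme: IDLE starts idleness, and ANY later watermark ends it. */
    static final class SingleMarkerTracker {
        private boolean idle = false;

        void onIdleMarker() {
            idle = true;
        }

        /** Returns whether the watermark is forwarded. */
        boolean onWatermark() {
            // An extractor-generated watermark is indistinguishable from a resumed
            // source, so it wrongly ends idleness and is always forwarded.
            idle = false;
            return true;
        }

        boolean isIdle() {
            return idle;
        }
    }

    /** Two-marker scheme: only an explicit ACTIVE status ends idleness. */
    static final class TwoMarkerTracker {
        private StreamStatus status = StreamStatus.ACTIVE;

        void onStatus(StreamStatus newStatus) {
            status = newStatus;
        }

        /** Mid-topology watermarks are dropped while idle and do not change the status. */
        boolean onWatermark() {
            return status == StreamStatus.ACTIVE;
        }

        boolean isIdle() {
            return status == StreamStatus.IDLE;
        }
    }
}
```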
    
    Despite having two markers, I plan to add only a single new `markAsTemporarilyIdle()` method on `SourceContext`s, which will be the only means for user source functions to express that the source is idle (included in the second PR). `SourceContext` implementations are responsible for controlling how the actual `StreamStatus` elements are sent downstream.
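    A `SourceContext` implementation could hide both markers behind `markAsTemporarilyIdle()`. This sketch assumes that the next record or watermark implicitly re-activates the source. `IdleAwareSourceContext` is hypothetical, and downstream output is modelled as a list of strings.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical source context. User code only calls markAsTemporarilyIdle();
 * the context itself decides when IDLE and ACTIVE are actually emitted.
 */
final class IdleAwareSourceContext<T> {

    private final List<String> downstream = new ArrayList<>();
    private boolean idle = false;

    /** The single user-facing method for declaring idleness. */
    void markAsTemporarilyIdle() {
        if (!idle) {
            idle = true;
            downstream.add("STATUS:IDLE");
        }
    }

    void collect(T record) {
        ensureActive();
        downstream.add("RECORD:" + record);
    }

    void emitWatermark(long timestamp) {
        ensureActive();
        downstream.add("WATERMARK:" + timestamp);
    }

    /** Guarantees nothing is emitted between IDLE and the following ACTIVE. */
    private void ensureActive() {
        if (idle) {
            idle = false;
            downstream.add("STATUS:ACTIVE");
        }
    }

    List<String> emitted() {
        return downstream;
    }
}
```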
    
    ### Consolidate watermark / status forwarding as a `StatusWatermarkValve`
    
    Since this change makes the forwarding logic considerably more complex, all of it is bundled into a "valve" that `OneInputStreamTask`, `TwoInputStreamTask`, and `AbstractStreamOperator` use to control watermark and status forwarding.
    
    `StatusWatermarkValve` takes an implementation of `ValveOutputHandler`. Implementations decide what to do when a watermark or status is emitted from the valve. For example, `OneInputStreamTask` and `TwoInputStreamTask` simply forward it to the head operator, while `AbstractStreamOperator` needs to advance timers when the valve outputs a new watermark.
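    Here is a simplified sketch of the valve and its handler. It rests on three assumptions: statuses are modelled as a boolean, every channel starts at `Long.MIN_VALUE`, and a channel that resumes behind the last emitted watermark holds the output back until it catches up. Only the class and interface names come from this PR. The fields and method signatures are hypothetical, and the real `StatusWatermarkValve` may handle these cases differently.

```java
import java.util.Arrays;

/**
 * Simplified, hypothetical sketch of a status/watermark valve. Only the names
 * StatusWatermarkValve and ValveOutputHandler come from the PR description.
 */
final class StatusWatermarkValve {

    /** Decides what to do with whatever the valve emits (forward it, advance timers, ...). */
    interface ValveOutputHandler {
        void handleWatermark(long watermark);

        void handleStreamStatus(boolean idle);
    }

    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private final ValveOutputHandler outputHandler;

    private long lastOutputWatermark = Long.MIN_VALUE;
    private boolean outputIdle = false;

    StatusWatermarkValve(int numInputChannels, ValveOutputHandler outputHandler) {
        this.channelWatermarks = new long[numInputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        this.channelIdle = new boolean[numInputChannels];
        this.outputHandler = outputHandler;
    }

    void inputWatermark(long watermark, int channel) {
        // Sources emit nothing while idle, so a watermark on an idle channel is ignored.
        if (channelIdle[channel] || watermark <= channelWatermarks[channel]) {
            return;
        }
        channelWatermarks[channel] = watermark;
        emitMinWatermarkIfAdvanced();
    }

    void inputStreamStatus(boolean idle, int channel) {
        if (channelIdle[channel] == idle) {
            return; // not a status change
        }
        channelIdle[channel] = idle;

        if (allChannelsIdle()) {
            if (!outputIdle) {
                outputIdle = true;
                outputHandler.handleStreamStatus(true);
            }
            return;
        }
        if (outputIdle) {
            outputIdle = false;
            outputHandler.handleStreamStatus(false);
        }
        // A channel going idle may be the one that was holding the minimum back.
        emitMinWatermarkIfAdvanced();
    }

    private void emitMinWatermarkIfAdvanced() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // Only emit monotonically increasing watermarks.
        if (anyActive && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            outputHandler.handleWatermark(min);
        }
    }

    private boolean allChannelsIdle() {
        for (boolean idle : channelIdle) {
            if (!idle) {
                return false;
            }
        }
        return true;
    }
}
```

    A task-level handler would forward both callbacks to the head operator. An operator-level handler would also advance its timers in `handleWatermark`. Records and latency markers never pass through the valve.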
    
    I didn't want to use the `Output` interface, because record elements and latency markers have nothing to do with the valve's control logic; they are always simply forwarded.
    
    ### Adding a `setup()` life cycle method to `StreamInputProcessor` and `StreamTwoInputProcessor`
    
    This is mainly to facilitate creating `StatusWatermarkValve`s in the input processors, which need a reference to the head operator when they are created. I considered passing the head operator to the input processors as a constructor argument in `init()`, but the `StreamIterationTail` operator rules this out, because for that operator the `headOperator` is created only after `init()`.
    
    We could also consider moving the creation of input processors to the beginning of `run()`, which would avoid the need for a `setup()` method on the input processors, but I wasn't sure whether that would break anything.
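    Here is a minimal sketch of the two-phase lifecycle, assuming a single head operator and watermark-only input. `SetupLifecycleSketch`, `HeadOperator`, `InputProcessor`, and `runTask` are hypothetical names. `runTask` only mimics the ordering: `init()`, then creation of the head operator, then `setup()`, then `run()`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical lifecycle sketch: the input processor is constructed in init(),
 * but only receives the head operator later through setup().
 */
final class SetupLifecycleSketch {

    interface HeadOperator {
        void processWatermark(long watermark);
    }

    static final class InputProcessor {
        private HeadOperator headOperator; // not yet known when constructed in init()

        /** Second lifecycle step, called once the head operator exists. */
        void setup(HeadOperator headOperator) {
            if (this.headOperator != null) {
                throw new IllegalStateException("setup() was already called");
            }
            this.headOperator = headOperator;
        }

        void processWatermark(long watermark) {
            if (headOperator == null) {
                throw new IllegalStateException("setup() must be called before processing");
            }
            headOperator.processWatermark(watermark);
        }
    }

    /** Mimics a task whose head operator only exists after init(), as for StreamIterationTail. */
    static List<Long> runTask(long... watermarks) {
        InputProcessor processor = new InputProcessor(); // init()
        List<Long> received = new ArrayList<>();
        HeadOperator head = received::add;               // head operator created after init()
        processor.setup(head);                           // setup()
        for (long watermark : watermarks) {              // run()
            processor.processWatermark(watermark);
        }
        return received;
    }
}
```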
    
    ### Block watermarks generated mid-topology at the `AbstractStreamOperator` level when idle
    
    There are two caveats:
    
    - `processStreamStatus` must NOT be overridden by concrete implementations. We rely on it to correctly block watermarks from timestamp extractors, which emit watermarks that bypass the valve's forwarding logic entirely.
    
    - The current implementation only works for one-input stream operators that generate watermarks. Since there don't seem to be any two-input stream operators that generate watermarks, this should be fine for now.
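    Here is a minimal sketch of the operator-level blocking, assuming a periodic timer that keeps firing while the operator is idle. `IdleAwareOperatorSketch`, `ExtractorOperatorSketch`, and `emitGeneratedWatermark` are hypothetical names. The sketch only shows why `processStreamStatus` must stay intact for the idleness check to work.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's AbstractStreamOperator) of blocking
 * watermarks that a one-input operator generates itself while it is idle.
 */
abstract class IdleAwareOperatorSketch {

    enum StreamStatus { IDLE, ACTIVE }

    private StreamStatus status = StreamStatus.ACTIVE;

    /** Stands in for the operator's output; records only forwarded watermarks. */
    final List<Long> forwardedWatermarks = new ArrayList<>();

    /** Must NOT be overridden: the blocking below depends on this bookkeeping. */
    void processStreamStatus(StreamStatus newStatus) {
        status = newStatus;
    }

    /** Used by subclasses that generate watermarks themselves, bypassing the valve. */
    protected final void emitGeneratedWatermark(long watermark) {
        if (status == StreamStatus.IDLE) {
            return; // idle: no records are flowing, so a generated watermark is dropped
        }
        forwardedWatermarks.add(watermark);
    }
}

/** A timestamp-extractor-like operator whose periodic timer keeps firing even when idle. */
final class ExtractorOperatorSketch extends IdleAwareOperatorSketch {

    private long maxTimestamp = Long.MIN_VALUE;

    void processElement(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Simulates a periodic watermark callback. */
    void onPeriodicTimer() {
        if (maxTimestamp != Long.MIN_VALUE) {
            emitGeneratedWatermark(maxTimestamp - 1);
        }
    }
}
```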
    
    # Testing
    
    - Added a new test in `AbstractStreamOperatorTest` that checks, for concrete one-input operators that bypass the valve and emit watermarks directly, that those watermarks are blocked while the operator is idle.
    - Added unit tests for `StatusWatermarkValve` covering complex forwarding cases.
    - Extended the `testWatermarkForwarding` tests in both `OneInputStreamTaskTest` and `TwoInputStreamTaskTest` to also test stream status forwarding. They are relatively simple compared to the `StatusWatermarkValve` unit tests, and only serve to implicitly ensure that the tasks use the valves correctly.
    
    I plan to add IT tests in the second PR, once source operators can start emitting `StreamStatus` elements.
    
    # Other Remarks
    
    While working on this task, I felt that we could start differentiating between the process element methods that concrete operator implementations may override and those they may not.
    
    For example, it would be best if the `AbstractStreamOperator#processStreamStatus()` method could be `final`, keeping stream status processing logic away from concrete implementations and ruling out any possibility of overriding it.
    
    Right now this isn't possible, as we need to tie processing methods to the `OneInputStreamOperator` / `TwoInputStreamOperator` interfaces for a mixin pattern with the `AbstractStreamOperator`.
    
    What we could probably do is let input processors access the head operator as an `AbstractStreamOperator`, so that processing methods visible only at the abstract level can be called, and keep in `OneInputStreamOperator` / `TwoInputStreamOperator` only the processing methods we allow concrete operators to override.
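    Here is a minimal sketch of that proposal, assuming statuses are modelled as a boolean. The non-overridable processing methods are `final` on an abstract base class, and the input processor holds the head operator through that abstract type. All class names here are hypothetical. The sketch deliberately leaves out the `OneInputStreamOperator` / `TwoInputStreamOperator` interfaces.

```java
/**
 * Hypothetical sketch: processing methods that must not be overridden are final
 * on the abstract base class, and the input processor calls them via that type.
 */
abstract class AbstractOperatorSketch {

    private boolean idle = false;
    private long lastLatencyMarker = Long.MIN_VALUE;

    /** final: concrete operators cannot change how stream statuses are handled. */
    public final void processStreamStatus(boolean idle) {
        this.idle = idle;
    }

    /** final: latency markers are always handled the same way. */
    public final void processLatencyMarker(long markedTime) {
        this.lastLatencyMarker = markedTime;
    }

    public final boolean isIdle() {
        return idle;
    }

    public final long lastLatencyMarker() {
        return lastLatencyMarker;
    }
}

/** A concrete operator; it cannot override the final methods above. */
final class ConcreteOperatorSketch extends AbstractOperatorSketch {
    // @Override public void processStreamStatus(boolean idle) { }  // would not compile
}

/** Input processor that sees the head operator at the abstract level. */
final class InputProcessorSketch {

    private final AbstractOperatorSketch headOperator;

    InputProcessorSketch(AbstractOperatorSketch headOperator) {
        this.headOperator = headOperator;
    }

    void onStatus(boolean idle) {
        headOperator.processStreamStatus(idle);
    }

    void onLatencyMarker(long markedTime) {
        headOperator.processLatencyMarker(markedTime);
    }
}
```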
    
    Processing methods that I think should only be visible at the abstract level are: `processElement()`, `processStreamStatus()`, and `processLatencyMarker()`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5017

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2801.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2801
    
----
commit c70d4dca464220ae63f596a474bdd4d957934838
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2016-11-14T02:53:18Z

    [FLINK-5017] [streaming] Introduce StreamStatus to facilitate idle sources
    
    This commit is the first part of making idle streaming sources in Flink possible. It introduces a new
    element, StreamStatus, that flows with other records in streams. StreamStatus elements are generated
    at the sources, and affect how operators advance their watermarks in the presence of idle sources.
    
    Prior to this commit, a downstream operator advanced its watermark only when the minimum watermark
    across all of its input channels advanced. As a result, downstream operators' watermarks stalled
    whenever some sources were idle. With this change, operators can mark input channels as idle and
    ignore them when advancing their watermark.
    
    This commit also refactors the previous watermark forwarding logic into a single class,
    StatusWatermarkValve. OneInputStreamTasks, TwoInputStreamTasks, and AbstractStreamOperator use valves
    to help them determine how watermarks and stream statuses are forwarded.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
