[
https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhijiang updated FLINK-13798:
-----------------------------
Description:
As we know, the watermark could be emitted to downstream only when the stream
status is active. For the downstream task we already have the component of
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the
source task the current implementation of this logic seems a bit tricky. There
are two scenarios for the source case:
* In the source WatermarkContext, it would toggle the status as active while
collecting/emitting and the status is checked in RecordWriterOutput. If the
watermark is triggered by timer for AutomaticWatermarkContext, the timer task
would check the status before emitting watermark.
* TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by
timer, but it still relies on RecordWriterOutput to check the status before
emitting.
So the check logic in RecordWriterOutput only makes sense for the last
scenario, and seems redundant for the first scenario.
Even worse, this logic in RecordWriterOutput would bring cycle dependency with
StreamStatusMaintainer, which is a blocker for the following work of
integrating source processing on runtime side.
To solve above issues, the basic idea is to refactor this check logic in upper
layer instead of current low level RecordWriterOutput. The solution is
migrating the check logic from RecordWriterOutput to
TimestampsAndPeriodicWatermarksOperator. And we could further remove the logic
of toggling active in WatermarkContext
was:
As we know, the watermark could be emitted to downstream only when the stream
status is active. For the downstream task we already have the component of
StatusWatermarkValve in StreamInputProcessor to handle this logic. But for the
source task the current implementation of this logic seems a bit tricky. There
are two scenarios for the source case:
* In the source WatermarkContext, it would toggle the status as active while
collecting/emitting and the status is checked in RecordWriterOutput. If the
watermark is triggered by timer for AutomaticWatermarkContext, the timer task
would check the status before emitting watermark.
* TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by
timer, but it still relies on RecordWriterOutput to check the status before
emitting.
So the check logic in RecordWriterOutput only makes sense for the last
scenario, and seems redundant for the first scenario.
Even worse, this logic in RecordWriterOutput would bring cycle dependency with
StreamStatusMaintainer, which is a blocker for the following work of
integrating source processing on runtime side.
To solve above issues, the basic idea is to refactor this check logic in upper
layer instead of current low level RecordWriterOutput. The solution is
migrating the check logic from RecordWriterOutput to
TimestampsAndPeriodicWatermarksOperator.
> Refactor the process of checking stream status while emitting watermark in
> source
> ---------------------------------------------------------------------------------
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
>
> As we know, the watermark could be emitted to downstream only when the stream
> status is active. For the downstream task we already have the component of
> StatusWatermarkValve in StreamInputProcessor to handle this logic. But for
> the source task the current implementation of this logic seems a bit tricky.
> There are two scenarios for the source case:
> * In the source WatermarkContext, it would toggle the status as active while
> collecting/emitting and the status is checked in RecordWriterOutput. If the
> watermark is triggered by timer for AutomaticWatermarkContext, the timer task
> would check the status before emitting watermark.
> * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by
> timer, but it still relies on RecordWriterOutput to check the status before
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the last
> scenario, and seems redundant for the first scenario.
> Even worse, this logic in RecordWriterOutput would bring cycle dependency
> with StreamStatusMaintainer, which is a blocker for the following work of
> integrating source processing on runtime side.
> To solve above issues, the basic idea is to refactor this check logic in
> upper layer instead of current low level RecordWriterOutput. The solution is
> migrating the check logic from RecordWriterOutput to
> TimestampsAndPeriodicWatermarksOperator. And we could further remove the
> logic of toggling active in WatermarkContext
--
This message was sent by Atlassian Jira
(v8.3.2#803003)