[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911537#comment-16911537 ]
zhijiang commented on FLINK-13798:
----------------------------------
Thanks for the heads-up [~StephanEwen]. Hopefully the new design will make it easy
to move the watermark logic into the new source operator. For now, though, we still
have to solve this issue by refactoring TimestampsAndPeriodicWatermarksOperator. :(
> Refactor the process of checking stream status while emitting watermark in
> source
> ---------------------------------------------------------------------------------
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> As we know, a watermark should only be emitted downstream while the stream
> status is active. For downstream tasks, the StatusWatermarkValve in
> StreamInputProcessor already handles this. For the source task, however, the
> current implementation of this logic is a bit tricky.
> There are two scenarios for the source case:
> * In the source WatermarkContext, the status is toggled to active while
> collecting/emitting, and RecordWriterOutput then checks it. If the watermark
> is triggered by a timer in AutomaticWatermarkContext, the timer task checks
> the status itself before emitting the watermark.
> * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a
> timer, but the operator relies on RecordWriterOutput to check the status
> before emitting.
> So the check in RecordWriterOutput is only needed for the second scenario
> and is redundant for the first.
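To make the two scenarios concrete, here is a minimal Java sketch under simplified assumptions. StatusHolder, GatedOutput, SourceContext and PeriodicAssigner are hypothetical stand-ins for the stream status maintainer, RecordWriterOutput, the source WatermarkContext and TimestampsAndPeriodicWatermarksOperator; they do not reproduce Flink's real classes or signatures. The output drops watermarks while the status is IDLE. The source context always toggles to ACTIVE first, so the output-level check never rejects its watermarks, while the periodic assigner depends on that check entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the current design (not Flink's real classes).
class CurrentDesign {
    enum StreamStatus { ACTIVE, IDLE }

    // Shared, mutable stream status (stand-in for the status maintainer).
    static final class StatusHolder {
        StreamStatus status = StreamStatus.IDLE;
    }

    // Stand-in for RecordWriterOutput: the status check lives at this low level.
    static final class GatedOutput {
        private final StatusHolder holder;
        final List<Long> emitted = new ArrayList<>();

        GatedOutput(StatusHolder holder) {
            this.holder = holder;
        }

        void emitWatermark(long timestamp) {
            // Watermarks are silently dropped while the stream is idle.
            if (holder.status == StreamStatus.ACTIVE) {
                emitted.add(timestamp);
            }
        }
    }

    // Scenario 1: the source context marks the stream ACTIVE before emitting,
    // so the check inside GatedOutput can never reject its watermarks.
    static final class SourceContext {
        private final StatusHolder holder;
        private final GatedOutput output;

        SourceContext(StatusHolder holder, GatedOutput output) {
            this.holder = holder;
            this.output = output;
        }

        void emitWatermark(long timestamp) {
            holder.status = StreamStatus.ACTIVE;
            output.emitWatermark(timestamp);
        }
    }

    // Scenario 2: a timer-driven assigner does no check of its own and relies
    // entirely on GatedOutput to drop watermarks while the stream is idle.
    static final class PeriodicAssigner {
        private final GatedOutput output;

        PeriodicAssigner(GatedOutput output) {
            this.output = output;
        }

        void onTimer(long currentWatermark) {
            output.emitWatermark(currentWatermark);
        }
    }

    public static void main(String[] args) {
        StatusHolder holder = new StatusHolder();
        GatedOutput output = new GatedOutput(holder);
        PeriodicAssigner assigner = new PeriodicAssigner(output);
        SourceContext context = new SourceContext(holder, output);

        assigner.onTimer(10L);      // IDLE: dropped by GatedOutput
        context.emitWatermark(20L); // toggled to ACTIVE first: always emitted
        assigner.onTimer(30L);      // now ACTIVE: emitted
        System.out.println(output.emitted); // [20, 30]
    }
}
```

Running main prints [20, 30]: the first timer-driven watermark is dropped because the stream is still idle, and only the periodic path ever needs the output-level check.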
> Even worse, this logic in RecordWriterOutput introduces a cyclic dependency
> with StreamStatusMaintainer, which blocks the follow-up work of integrating
> source processing on the runtime side.
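The issue does not spell out how the cycle surfaces in the runtime, so the following is only a generic illustration with hypothetical names. Chain plays the component that both owns the outputs and provides the stream status (the StreamStatusMaintainer role), and GatedOutput plays RecordWriterOutput. Because each needs the other, Chain has to hand a reference to itself out of its own constructor, and the output can observe it half-initialized.

```java
// Hypothetical illustration of a construction cycle (not Flink's real classes):
// Chain owns GatedOutput, and GatedOutput needs Chain as its status provider.
class ConstructionCycle {
    enum StreamStatus { ACTIVE, IDLE }

    interface StreamStatusProvider {
        StreamStatus getStreamStatus();
    }

    static final class GatedOutput {
        private final StreamStatusProvider provider;
        final StreamStatus statusSeenAtConstruction;

        GatedOutput(StreamStatusProvider provider) {
            this.provider = provider;
            // Querying the provider here observes a half-built Chain.
            this.statusSeenAtConstruction = provider.getStreamStatus();
        }

        boolean mayEmitWatermark() {
            return provider.getStreamStatus() == StreamStatus.ACTIVE;
        }
    }

    static final class Chain implements StreamStatusProvider {
        final GatedOutput output;
        private StreamStatus status;

        Chain() {
            // The cycle forces 'this' to escape before construction finishes,
            // so 'status' is still null when the output first reads it.
            this.output = new GatedOutput(this);
            this.status = StreamStatus.ACTIVE;
        }

        @Override
        public StreamStatus getStreamStatus() {
            return status;
        }
    }

    public static void main(String[] args) {
        Chain chain = new Chain();
        System.out.println(chain.output.statusSeenAtConstruction); // null
        System.out.println(chain.output.mayEmitWatermark());       // true
    }
}
```

main prints null and then true: the status read during construction is not yet set, which is one reason to keep the low-level output free of any reference back to the status owner.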
> To solve these issues, the basic idea is to move this check from the
> low-level RecordWriterOutput to an upper layer, namely
> TimestampsAndPeriodicWatermarksOperator. We could then also remove the
> logic that toggles the status to active in WatermarkContext.
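A minimal sketch of the proposed direction, again with hypothetical simplified classes rather than Flink's API. PlainOutput has no status provider at all, and PeriodicAssigner (standing in for TimestampsAndPeriodicWatermarksOperator) checks the status itself when its timer fires. Passing the status in as a Supplier is an assumption of this sketch; it is not meant to show how the real operator would obtain it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, simplified model of the proposed design (not Flink's real classes).
class ProposedDesign {
    enum StreamStatus { ACTIVE, IDLE }

    // Output with no status provider, so it has no dependency on the maintainer.
    static final class PlainOutput {
        final List<Long> emitted = new ArrayList<>();

        void emitWatermark(long timestamp) {
            emitted.add(timestamp);
        }
    }

    // Stand-in for TimestampsAndPeriodicWatermarksOperator after the refactoring:
    // the operator itself checks the status when its timer fires.
    static final class PeriodicAssigner {
        private final PlainOutput output;
        private final Supplier<StreamStatus> status; // assumed hook, for illustration

        PeriodicAssigner(PlainOutput output, Supplier<StreamStatus> status) {
            this.output = output;
            this.status = status;
        }

        void onTimer(long currentWatermark) {
            if (status.get() == StreamStatus.ACTIVE) {
                output.emitWatermark(currentWatermark);
            }
        }
    }

    public static void main(String[] args) {
        AtomicReference<StreamStatus> current = new AtomicReference<>(StreamStatus.IDLE);
        PlainOutput output = new PlainOutput();
        PeriodicAssigner assigner = new PeriodicAssigner(output, current::get);

        assigner.onTimer(10L);              // IDLE: skipped by the operator
        current.set(StreamStatus.ACTIVE);
        assigner.onTimer(20L);              // ACTIVE: emitted
        System.out.println(output.emitted); // [20]
    }
}
```

With the check in the operator, the output no longer needs to know about the stream status at all, which in this model is what removes the dependency between the output and the status owner.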
--
This message was sent by Atlassian Jira
(v8.3.2#803003)