[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911275#comment-16911275 ]
zhijiang commented on FLINK-13798:
----------------------------------
Thanks for the offline discussion and suggestion, [~pnowojski] [~tzulitai].
Could you double-check whether my understanding above is right?
> Refactor the process of checking stream status while emitting watermark in
> source
> ---------------------------------------------------------------------------------
>
> Key: FLINK-13798
> URL: https://issues.apache.org/jira/browse/FLINK-13798
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Task
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
>
> As we know, a watermark can be emitted downstream only when the stream
> status is active. For downstream tasks, the StatusWatermarkValve component
> in StreamInputProcessor already handles this logic. For the source task,
> however, the current implementation of this logic is a bit tricky.
> There are two scenarios for the source case:
> * Emit watermark via source context: The specific WatermarkContext toggles
> the stream status to active before collecting/emitting
> records/watermarks. The RecordWriterOutput implementation then checks
> that the status is active before actually emitting the watermark.
> * TimestampsAndPeriodicWatermarksOperator: The watermark is triggered by a
> timer at a fixed interval. When the timer fires, the operator calls the
> output stack to emit the watermark, and RecordWriterOutput takes the role
> of checking the status before actually emitting it.
> So the status check in RecordWriterOutput only matters for the second
> scenario above. It is redundant for the first scenario because
> WatermarkContext always toggles the status to active before emitting.
> Even worse, the check in RecordWriterOutput introduces a cyclic
> dependency on StreamStatusMaintainer, which blocks the follow-up
> work of integrating source processing on the runtime side.
> The solution is to migrate the status check from RecordWriterOutput to
> TimestampsAndPeriodicWatermarksOperator. We could also remove the
> toggle-active logic from the existing WatermarkContext.
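To make the proposal in the quoted description concrete, here is a minimal,
self-contained sketch of a periodic watermark operator that checks the stream
status itself, so that the output can forward watermarks unconditionally. All
types and method names below (StreamStatusProvider, WatermarkOutput,
PeriodicWatermarkOperator, processElement, onTimer) are hypothetical stand-ins
chosen for illustration. They are not Flink's actual classes or signatures, and
the real change may look quite different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; not Flink's real classes.
final class WatermarkStatusSketch {

    enum StreamStatus { ACTIVE, IDLE }

    /** Read-only view of the stream status (assumed interface). */
    interface StreamStatusProvider {
        StreamStatus getStreamStatus();
    }

    /** Output that forwards every watermark it receives, with no status check. */
    interface WatermarkOutput {
        void emitWatermark(long timestamp);
    }

    /** Simplified periodic watermark operator that owns the status check. */
    static final class PeriodicWatermarkOperator {
        private final StreamStatusProvider statusProvider;
        private final WatermarkOutput output;
        private long maxTimestamp = Long.MIN_VALUE;
        private long currentWatermark = Long.MIN_VALUE;

        PeriodicWatermarkOperator(StreamStatusProvider statusProvider, WatermarkOutput output) {
            this.statusProvider = statusProvider;
            this.output = output;
        }

        /** Tracks the largest element timestamp seen so far. */
        void processElement(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        /** Called when the periodic timer fires. */
        void onTimer() {
            // The check that used to sit in the output now happens here.
            if (statusProvider.getStreamStatus() != StreamStatus.ACTIVE) {
                return;
            }
            // Only emit when the watermark actually advances.
            if (maxTimestamp > currentWatermark) {
                currentWatermark = maxTimestamp;
                output.emitWatermark(currentWatermark);
            }
        }
    }

    /** Runs a small active -> idle -> active scenario and returns the emitted watermarks. */
    static List<Long> demo() {
        StreamStatus[] status = { StreamStatus.ACTIVE };
        List<Long> emitted = new ArrayList<>();
        PeriodicWatermarkOperator op =
                new PeriodicWatermarkOperator(() -> status[0], ts -> emitted.add(ts));

        op.processElement(10L);
        op.onTimer();                    // active: emits 10

        status[0] = StreamStatus.IDLE;
        op.processElement(20L);
        op.onTimer();                    // idle: nothing is emitted

        status[0] = StreamStatus.ACTIVE;
        op.processElement(30L);
        op.onTimer();                    // active again: emits 30
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo());      // prints [10, 30]
    }
}
```

In this sketch the output depends on neither the status provider nor any status
maintainer, which is the decoupling the description is after. The operator only
needs read access to the status.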
--
This message was sent by Atlassian Jira
(v8.3.2#803003)