[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911275#comment-16911275 ]
zhijiang commented on FLINK-13798:
----------------------------------

Thanks for the offline discussion and suggestion. [~pnowojski] [~tzulitai]
could you double-check whether my understanding above is right?

> Refactor the process of checking stream status while emitting watermark in
> source
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-13798
>                 URL: https://issues.apache.org/jira/browse/FLINK-13798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> As we know, a watermark should be emitted downstream only when the stream
> status is active. For downstream tasks, this logic is already handled by the
> StatusWatermarkValve component in StreamInputProcessor. For source tasks,
> however, the current implementation of this logic is somewhat tricky. There
> are two scenarios in the source case:
> * Emitting a watermark via the source context: the specific WatermarkContext
>   toggles the stream status to active before collecting/emitting
>   records/watermarks. RecordWriterOutput then always checks whether the
>   status is active before actually emitting the watermark.
> * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a
>   timer at a fixed interval. When the timer fires, the operator calls the
>   output stack to emit the watermark, and RecordWriterOutput checks the
>   status before actually emitting it.
> So the status check in RecordWriterOutput is only needed for the second
> scenario above. For the first scenario it is redundant, because
> WatermarkContext always toggles the status to active before emitting. Even
> worse, the check in RecordWriterOutput introduces a cyclic dependency with
> StreamStatusMaintainer, which blocks the follow-up work of integrating source
> processing on the runtime side.
>
> The solution is to migrate the status-checking logic from RecordWriterOutput
> to TimestampsAndPeriodicWatermarksOperator. We could also remove the
> toggle-active logic in the existing WatermarkContext.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
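The solution proposed above can be illustrated with a minimal, self-contained Java sketch. It is not Flink code. `Status`, `StatusMaintainer`, `MutableStatus`, `ForwardingOutput` and `PeriodicWatermarkEmitter` are hypothetical, simplified stand-ins introduced only for illustration:

* `ForwardingOutput` plays the role of RecordWriterOutput after the change. It forwards watermarks unconditionally and holds no status reference.
* `PeriodicWatermarkEmitter` plays the role of TimestampsAndPeriodicWatermarksOperator. It checks the status right before its timer-triggered emission.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkStatusSketch {

    // Hypothetical stand-in for a stream status (ACTIVE/IDLE); not Flink's class.
    enum Status { ACTIVE, IDLE }

    // Hypothetical read-only view of a status maintainer.
    interface StatusMaintainer {
        Status getStatus();
    }

    // Helper whose status can be flipped between timer firings.
    static final class MutableStatus implements StatusMaintainer {
        private Status status;
        MutableStatus(Status initial) { this.status = initial; }
        void set(Status next) { this.status = next; }
        @Override public Status getStatus() { return status; }
    }

    // Hypothetical analog of RecordWriterOutput after the refactoring: it forwards
    // watermarks unconditionally and holds no reference to the status maintainer,
    // so the output no longer depends on it.
    static final class ForwardingOutput {
        final List<Long> emitted = new ArrayList<>();
        void emitWatermark(long watermark) { emitted.add(watermark); }
    }

    // Hypothetical analog of TimestampsAndPeriodicWatermarksOperator: the status
    // check now lives here, right before the timer-triggered emission.
    static final class PeriodicWatermarkEmitter {
        private final StatusMaintainer statusMaintainer;
        private final ForwardingOutput output;
        private long currentWatermark = Long.MIN_VALUE;

        PeriodicWatermarkEmitter(StatusMaintainer statusMaintainer, ForwardingOutput output) {
            this.statusMaintainer = statusMaintainer;
            this.output = output;
        }

        // Called when the periodic timer fires with the assigner's current watermark.
        void onTimer(long candidateWatermark) {
            if (statusMaintainer.getStatus() != Status.ACTIVE) {
                return; // Suppress watermarks while the stream is idle.
            }
            if (candidateWatermark > currentWatermark) {
                currentWatermark = candidateWatermark; // Only emit advancing watermarks.
                output.emitWatermark(candidateWatermark);
            }
        }
    }

    static List<Long> runScenario() {
        MutableStatus status = new MutableStatus(Status.ACTIVE);
        ForwardingOutput output = new ForwardingOutput();
        PeriodicWatermarkEmitter emitter = new PeriodicWatermarkEmitter(status, output);
        emitter.onTimer(10L);       // active: emitted
        status.set(Status.IDLE);
        emitter.onTimer(20L);       // idle: suppressed
        status.set(Status.ACTIVE);
        emitter.onTimer(20L);       // active again: emitted
        emitter.onTimer(15L);       // does not advance: dropped
        return output.emitted;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints [10, 20]
    }
}
```

The design choice the sketch highlights is the direction of the dependency. Only the periodic emitter reads the status, so the output class becomes a plain forwarder with no reference to the status maintainer, which is the cycle the issue describes.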