[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911275#comment-16911275 ]

zhijiang commented on FLINK-13798:
----------------------------------

Thanks for the offline discussion and suggestion, [~pnowojski] [~tzulitai].

Could you double-check whether my understanding above is right?

> Refactor the process of checking stream status while emitting watermark in source
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-13798
>                 URL: https://issues.apache.org/jira/browse/FLINK-13798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>
> As we know, a watermark may be emitted downstream only when the stream status
> is active. For downstream tasks, the StatusWatermarkValve component in
> StreamInputProcessor already handles this logic. For the source task, however,
> the current implementation of this logic is somewhat tricky. There are two
> scenarios in the source case:
>  * Emitting watermarks via the source context: the specific WatermarkContext
> toggles the stream status to active before collecting/emitting
> records/watermarks. RecordWriterOutput then checks whether the status is
> active (which it always is at that point) before actually emitting the
> watermark.
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a
> timer at a fixed interval. When the timer fires, the operator calls the output
> stack to emit the watermark, and RecordWriterOutput is responsible for
> checking the status before actually emitting it.
> So the status-checking logic in RecordWriterOutput is only useful for the
> second scenario above; it is redundant for the first scenario, because
> WatermarkContext always toggles the status to active before emitting. Worse,
> the logic in RecordWriterOutput introduces a cyclic dependency with
> StreamStatusMaintainer, which blocks the follow-up work of integrating source
> processing on the runtime side.
> The solution is to migrate the checking logic from RecordWriterOutput to
> TimestampsAndPeriodicWatermarksOperator. We could also remove the logic that
> toggles the status to active from the existing WatermarkContext.
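
For illustration only, here is a minimal Java sketch of the proposed arrangement under simplifying assumptions. Every name in it (WatermarkStatusSketch, StreamStatus, StatusProvider, SimpleOutput, PeriodicWatermarkEmitter, onTimer) is hypothetical and is not Flink's actual API. SimpleOutput plays the role of RecordWriterOutput, StatusProvider that of StreamStatusMaintainer, and PeriodicWatermarkEmitter that of TimestampsAndPeriodicWatermarksOperator. The sketch only shows the status check moving out of the output and into the timer-driven emitter; the WatermarkContext change is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model; none of these types are Flink classes.
public class WatermarkStatusSketch {

    enum StreamStatus { ACTIVE, IDLE }

    // Hypothetical stand-in for StreamStatusMaintainer: exposes the current status.
    interface StatusProvider {
        StreamStatus getStreamStatus();
    }

    // Hypothetical stand-in for RecordWriterOutput after the refactoring:
    // it forwards watermarks unconditionally and holds no StatusProvider,
    // so it has no dependency on the status component at all.
    static class SimpleOutput {
        final List<Long> emittedWatermarks = new ArrayList<>();

        void emitWatermark(long timestamp) {
            emittedWatermarks.add(timestamp);
        }
    }

    // Hypothetical stand-in for TimestampsAndPeriodicWatermarksOperator:
    // the status check now happens here, right before emitting.
    static class PeriodicWatermarkEmitter {
        private final SimpleOutput output;
        private final StatusProvider statusProvider;
        private long currentWatermark = Long.MIN_VALUE;

        PeriodicWatermarkEmitter(SimpleOutput output, StatusProvider statusProvider) {
            this.output = output;
            this.statusProvider = statusProvider;
        }

        // Called by a (hypothetical) interval timer with the latest candidate.
        // While idle, the candidate is dropped and currentWatermark is not advanced.
        void onTimer(long candidateWatermark) {
            if (statusProvider.getStreamStatus() == StreamStatus.ACTIVE
                    && candidateWatermark > currentWatermark) {
                currentWatermark = candidateWatermark;
                output.emitWatermark(candidateWatermark);
            }
        }
    }

    public static void main(String[] args) {
        SimpleOutput output = new SimpleOutput();
        StreamStatus[] status = { StreamStatus.ACTIVE };
        PeriodicWatermarkEmitter emitter =
                new PeriodicWatermarkEmitter(output, () -> status[0]);

        emitter.onTimer(100L); // active: emitted
        status[0] = StreamStatus.IDLE;
        emitter.onTimer(200L); // idle: suppressed by the emitter, not the output
        status[0] = StreamStatus.ACTIVE;
        emitter.onTimer(300L); // active again: emitted

        System.out.println(output.emittedWatermarks); // prints [100, 300]
    }
}
```

In this sketch the emitter depends on both the output and the status provider, while the output depends on neither, so the dependency runs in one direction only.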



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
