[ https://issues.apache.org/jira/browse/FLINK-13798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16914044#comment-16914044 ]

zhijiang commented on FLINK-13798:
----------------------------------

After further research on this issue, it does not seem feasible to do this
refactoring now.

The existing approach of checking the stream status before emitting a watermark
works on both the input and output sides.
 * For the head operator in a task, the StatusWatermarkValve in the InputProcessor
checks the status.
 * For the chained operators in a task, the chained output generated by the
OperatorChain checks the status.
 * For the tail operator in a task, the RecordWriterOutput checks the status.
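
To make the output-side gating concrete, here is a minimal, self-contained Java sketch. It is a simplified model, not Flink's actual code: StreamStatus, StreamStatusProvider, WatermarkOutput, RecordingSink and StatusGatedOutput are hypothetical stand-ins (the first two only borrow names from Flink), and the input-side StatusWatermarkValve is not modeled. Operators call emitWatermark unconditionally, and the gate, playing the role of the chained output or RecordWriterOutput, forwards a watermark only while the status is ACTIVE.

```java
import java.util.ArrayList;
import java.util.List;

public class StatusGatedOutputSketch {

    // Simplified stand-in for a stream status with two states.
    enum StreamStatus { ACTIVE, IDLE }

    // Hypothetical minimal provider of the task's current stream status.
    interface StreamStatusProvider {
        StreamStatus getStreamStatus();
    }

    // Hypothetical minimal output: only watermarks are modeled.
    interface WatermarkOutput {
        void emitWatermark(long timestamp);
    }

    // Stands in for the network writer: records every watermark that gets through.
    static final class RecordingSink implements WatermarkOutput {
        final List<Long> emitted = new ArrayList<>();

        @Override
        public void emitWatermark(long timestamp) {
            emitted.add(timestamp);
        }
    }

    // Output-side gate: operators emit unconditionally, and the output
    // forwards the watermark only while the stream status is ACTIVE.
    static final class StatusGatedOutput implements WatermarkOutput {
        private final WatermarkOutput downstream;
        private final StreamStatusProvider statusProvider;

        StatusGatedOutput(WatermarkOutput downstream, StreamStatusProvider statusProvider) {
            this.downstream = downstream;
            this.statusProvider = statusProvider;
        }

        @Override
        public void emitWatermark(long timestamp) {
            if (statusProvider.getStreamStatus() == StreamStatus.ACTIVE) {
                downstream.emitWatermark(timestamp);
            }
        }
    }

    // Emits three watermarks, switching the status to IDLE around the second one.
    static List<Long> demo() {
        RecordingSink sink = new RecordingSink();
        StreamStatus[] status = {StreamStatus.ACTIVE};
        WatermarkOutput out = new StatusGatedOutput(sink, () -> status[0]);

        out.emitWatermark(100L);         // forwarded: ACTIVE
        status[0] = StreamStatus.IDLE;
        out.emitWatermark(200L);         // dropped: IDLE
        status[0] = StreamStatus.ACTIVE;
        out.emitWatermark(300L);         // forwarded again
        return sink.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [100, 300]
    }
}
```

Because the gate lives in the output, no individual operator implementation has to repeat the status check.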

As a result, the internal logic of {{emitWatermark/onProcessingTime}} in the
individual operators does not need to check the status at all. If we remove the
check from RecordWriterOutput, we would have to add this logic to every involved
operator, which covers many places (TimestampsAndPeriodicWatermarksOperator,
KeyedCoProcessOperatorWithWatermarkDelay, ExtractTimestampsOperator,
MiniBatchAssignerOperator, WatermarkAssignerOperator, AbstractStreamOperator,
etc.). When migrating the check logic to the upper layer, it would be easy to
miss some of these places, or some corner cases. Checking in the outputs exists
precisely to avoid missing any place across the different kinds of operators.
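
For contrast, a hypothetical Java sketch of what removing the check from RecordWriterOutput would imply: every watermark-emitting operator has to gate its own watermarks, and a single missed place leaks watermarks while the task is idle. All class names here (UngatedOutput, WatermarkEmittingOperator, CheckingOperator, ForgetfulOperator) are illustrative only and do not correspond to real Flink operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class OperatorLevelCheckSketch {

    // Simplified stand-in for a stream status with two states.
    enum StreamStatus { ACTIVE, IDLE }

    // Models an output with its status check removed:
    // every watermark handed to it is forwarded.
    static final class UngatedOutput {
        final List<String> emitted = new ArrayList<>();

        void emitWatermark(String operatorName, long timestamp) {
            emitted.add(operatorName + "@" + timestamp);
        }
    }

    // Hypothetical base class: with no gate in the output, each
    // watermark-emitting operator must consult the status itself.
    abstract static class WatermarkEmittingOperator {
        final String name;
        final UngatedOutput output;
        final Supplier<StreamStatus> status;

        WatermarkEmittingOperator(String name, UngatedOutput output, Supplier<StreamStatus> status) {
            this.name = name;
            this.output = output;
            this.status = status;
        }

        abstract void onProcessingTime(long timestamp);
    }

    // An operator that remembers to check the status before emitting.
    static final class CheckingOperator extends WatermarkEmittingOperator {
        CheckingOperator(UngatedOutput output, Supplier<StreamStatus> status) {
            super("checking", output, status);
        }

        @Override
        void onProcessingTime(long timestamp) {
            if (status.get() == StreamStatus.ACTIVE) {
                output.emitWatermark(name, timestamp);
            }
        }
    }

    // An operator where the check was missed during the migration.
    static final class ForgetfulOperator extends WatermarkEmittingOperator {
        ForgetfulOperator(UngatedOutput output, Supplier<StreamStatus> status) {
            super("forgetful", output, status);
        }

        @Override
        void onProcessingTime(long timestamp) {
            output.emitWatermark(name, timestamp); // no status check
        }
    }

    // Fires a processing-time timer on both operators while the task is IDLE.
    static List<String> demo() {
        UngatedOutput output = new UngatedOutput();
        Supplier<StreamStatus> idle = () -> StreamStatus.IDLE;
        new CheckingOperator(output, idle).onProcessingTime(100L);
        new ForgetfulOperator(output, idle).onProcessingTime(100L);
        return output.emitted;
    }

    public static void main(String[] args) {
        // Only the operator that skipped the check leaks a watermark while IDLE.
        System.out.println(demo()); // prints [forgetful@100]
    }
}
```

The correctness of the whole job then depends on every operator author remembering the check, which is the risk described above.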

I have already confirmed this with [~tzulitai] and [~pnowojski], and we agreed
to keep the current approach. The new source API would trigger watermarks only
from the source, which might make this logic easier to handle. We might then
remove or consolidate the existing checks in the different outputs.

> Refactor the process of checking stream status while emitting watermark in 
> source
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-13798
>                 URL: https://issues.apache.org/jira/browse/FLINK-13798
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Task
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> As we know, the watermark can be emitted downstream only when the stream
> status is active. For downstream tasks we already have the StatusWatermarkValve
> component in StreamInputProcessor to handle this logic. But for
> the source task, the current implementation of this logic seems a bit tricky.
> There are two scenarios for the source case:
>  * In the source WatermarkContext, the status is toggled to active while
> collecting/emitting, and the status is checked in RecordWriterOutput. If the
> watermark is triggered by a timer in AutomaticWatermarkContext, the timer task
> checks the status before emitting the watermark.
>  * TimestampsAndPeriodicWatermarksOperator: the watermark is triggered by a
> timer, but it still relies on RecordWriterOutput to check the status before
> emitting.
> So the check logic in RecordWriterOutput only makes sense for the last
> scenario, and seems redundant for the first one.
> Even worse, this logic in RecordWriterOutput introduces a cyclic dependency
> with StreamStatusMaintainer, which blocks the follow-up work of
> integrating source processing on the runtime side.
> To solve the above issues, the basic idea is to move this check logic to the
> upper layer instead of the current low-level RecordWriterOutput. The solution
> is to migrate the check logic from RecordWriterOutput to
> TimestampsAndPeriodicWatermarksOperator. We could then also remove the
> logic of toggling the status to active in WatermarkContext.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)