[ 
https://issues.apache.org/jira/browse/FLINK-21469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17341661#comment-17341661
 ] 

Yuan Mei edited comment on FLINK-21469 at 5/10/21, 4:56 AM:
------------------------------------------------------------

For stop-with-savepoint, `StreamTask#advanceToEndOfEventTime()` is called (in 
source tasks) to advance to the max watermark. However, this is not the case 
for chained sources in `MultipleInputStreamTask`, since 
`advanceToEndOfEventTime` is not implemented there. This PR fixes the problem.

*The fix is as follows:*

Consider `MultipleInputStreamTask` as a chained task in which some inputs are 
sources (`StreamTaskSourceInput`) and the rest come from the network 
(`StreamTaskNetworkInput`). When `advanceToEndOfEventTime` is called, 
`MAX_WATERMARK` is injected from the source inputs. Once `MAX_WATERMARK` has 
also been propagated from the network inputs, `MultipleInputStreamTask` emits 
`MAX_WATERMARK`.
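
The combination rule described above can be illustrated with a small 
standalone model. This is only a sketch and not Flink's implementation: the 
class `MultiInputWatermarkSketch` and its methods are hypothetical, and it 
assumes the task's output watermark is the minimum over all input watermarks 
and that `MAX_WATERMARK` corresponds to `Long.MAX_VALUE`.

```java
import java.util.Arrays;

/** Hypothetical model of a multiple-input task; not Flink code. */
class MultiInputWatermarkSketch {
    /** Assumption: the max watermark is represented by Long.MAX_VALUE. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final boolean[] isChainedSource; // true = source input, false = network input
    private final long[] inputWatermarks;    // latest watermark seen per input
    private long lastEmitted = Long.MIN_VALUE;

    MultiInputWatermarkSketch(boolean[] isChainedSource) {
        this.isChainedSource = isChainedSource.clone();
        this.inputWatermarks = new long[isChainedSource.length];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark on one input. Returns the newly emitted combined
     * watermark (minimum over all inputs), or null if the output did not advance.
     */
    Long onWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        long combined = Arrays.stream(inputWatermarks).min().getAsLong();
        if (combined > lastEmitted) {
            lastEmitted = combined;
            return combined;
        }
        return null;
    }

    /** Injects MAX_WATERMARK on every chained source input, as the fix describes. */
    Long advanceToEndOfEventTime() {
        Long emitted = null;
        for (int i = 0; i < isChainedSource.length; i++) {
            if (isChainedSource[i]) {
                Long result = onWatermark(i, MAX_WATERMARK);
                if (result != null) {
                    emitted = result;
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Input 0 is a chained source, input 1 is a network input.
        MultiInputWatermarkSketch task =
                new MultiInputWatermarkSketch(new boolean[] {true, false});
        task.onWatermark(0, 10L);
        task.onWatermark(1, 5L);
        // The source input jumps to MAX_WATERMARK, but the network input holds the output back.
        System.out.println("after advance: " + task.advanceToEndOfEventTime());
        // Only when the network input also reaches MAX_WATERMARK does the task emit it.
        System.out.println("after network: " + task.onWatermark(1, MAX_WATERMARK));
    }
}
```

In this model, injecting `MAX_WATERMARK` on the source inputs alone does not 
move the output. The task emits `MAX_WATERMARK` only after the network inputs 
have reached it as well.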


> stop-with-savepoint --drain doesn't advance watermark for sources chained to 
> MultipleInputStreamTask
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21469
>                 URL: https://issues.apache.org/jira/browse/FLINK-21469
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.12.1, 1.13.0
>            Reporter: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>
> {{StreamTask#advanceToEndOfEventTime()}} is used to bump the watermark to 
> {{MAX_WATERMARK}}. However, this method is only implemented for the source 
> tasks (legacy and FLIP-27). Because of that, watermarks are not advanced to 
> the end of time if a job has sources chained to {{MultipleInputStreamTask}}.



