[ https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17271999#comment-17271999 ]

Yun Gao commented on FLINK-21132:
---------------------------------

Hi [~kezhuw], thanks very much for bringing up this issue. I have some thoughts
from the sink's perspective. Like [~pnowojski] said, I also think there are two
cases: either the job is genuinely bounded, or the job is stopped with a savepoint.

First of all, I think there is a problem with the current implementation, which
commits in endOfInput(), when the job is bounded (the first case). If a failover
happens right after commit() in endOfInput(), the job is restarted and falls back
to the last checkpoint. The data already committed in endOfInput() is then
replayed and committed a second time. [FLIP-147: Support checkpoint after tasks
finished|https://cwiki.apache.org/confluence/x/mw-ZCQ] is a prerequisite for
solving the commit problem of sinks in the first case. It aims to support
checkpoints after some tasks have finished (e.g., after all the tasks upstream of
the sink have finished), and the expected behavior of the sink operator is:

- endOfInput() (close all the open files)
- wait for a new checkpoint
- snapshotState()
- notifyCheckpointComplete() (this is where we would commit)
- close()
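
To make the replay problem concrete, here is a minimal, self-contained simulation. It is not Flink code: the Sink class, its fields, and runWithFailoverAfterEndInput are hypothetical. It assumes a bounded job whose last completed checkpoint was taken before any input, and a failover right after endInput(). Committing eagerly in endInput() lets the external store see every file twice, while deferring the commit to notifyCheckpointComplete() of a checkpoint taken after endInput() (the FLIP-147 order listed above) commits each file once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a bounded job with a failover right after endInput(); not Flink's operator API.
public class BoundedSinkCommitSketch {

    /** Illustrative sink that stages closed files and commits them in endInput() or on checkpoint completion. */
    static final class Sink {
        private final List<String> externalStore;   // files visible in the external system
        private final boolean commitInEndInput;      // true = current workaround, false = FLIP-147 order
        private final List<String> staged = new ArrayList<>();
        private List<String> snapshot = new ArrayList<>();

        Sink(List<String> externalStore, boolean commitInEndInput) {
            this.externalStore = externalStore;
            this.commitInEndInput = commitInEndInput;
        }

        void processElement(String file) { staged.add(file); }

        void endInput() {
            // Closing the open files is implicit here: staged files are ready to commit.
            if (commitInEndInput) {
                externalStore.addAll(staged);
                staged.clear();
            }
        }

        void snapshotState() { snapshot = new ArrayList<>(staged); }

        void notifyCheckpointComplete() {
            externalStore.addAll(snapshot);   // commit exactly what the checkpoint covers
            staged.removeAll(snapshot);
            snapshot.clear();
        }
    }

    static List<String> runWithFailoverAfterEndInput(boolean commitInEndInput, List<String> input) {
        List<String> externalStore = new ArrayList<>();

        // Attempt 1: process all input and reach endInput(); then the task fails.
        Sink first = new Sink(externalStore, commitInEndInput);
        input.forEach(first::processElement);
        first.endInput();
        // Failover: un-checkpointed sink state is lost, but anything already committed stays.

        // Attempt 2: restore from the last checkpoint (taken before any input) and replay everything.
        Sink second = new Sink(externalStore, commitInEndInput);
        input.forEach(second::processElement);
        second.endInput();
        // Wait for a new checkpoint, snapshotState, commit on completion, then close().
        // For the eager strategy nothing is staged any more, so this step commits nothing.
        second.snapshotState();
        second.notifyCheckpointComplete();
        return externalStore;
    }

    public static void main(String[] args) {
        List<String> input = List.of("part-0", "part-1");
        System.out.println(runWithFailoverAfterEndInput(true, input));  // → [part-0, part-1, part-0, part-1]
        System.out.println(runWithFailoverAfterEndInput(false, input)); // → [part-0, part-1]
    }
}
```

The deferred variant only works if a checkpoint can still be taken after endInput(), which is exactly what FLIP-147 is meant to enable.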

For the second case, stop with savepoint, the current sequence is:
- snapshotState() for the savepoint
- notifyCheckpointComplete() for the savepoint
- possibly process some data
- endOfInput()
- close()
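
A minimal simulation of this stop-with-savepoint sequence followed by a resume, under loudly stated assumptions: records processed after the savepoint are re-read from the source when the job is resumed from that savepoint, and the Sink class and stopThenResume method are hypothetical, not Flink's API. Under these assumptions, committing in endInput() publishes the post-savepoint record "c", which is committed again after resuming; committing only in notifyCheckpointComplete() commits each record once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of stop-with-savepoint followed by a resume; not Flink's operator API.
public class StopWithSavepointSketch {

    /** Illustrative sink that stages records and commits them in endInput() or on checkpoint completion. */
    static final class Sink {
        private final List<String> externalStore;   // what readers of the external system can see
        private final boolean commitInEndInput;      // true = commit eagerly in endInput()
        private final List<String> staged = new ArrayList<>();
        private List<String> snapshot = new ArrayList<>();

        Sink(List<String> externalStore, boolean commitInEndInput) {
            this.externalStore = externalStore;
            this.commitInEndInput = commitInEndInput;
        }

        void processElement(String record) { staged.add(record); }

        void snapshotState() { snapshot = new ArrayList<>(staged); }

        void notifyCheckpointComplete() {
            externalStore.addAll(snapshot);   // commit exactly what the checkpoint covers
            staged.removeAll(snapshot);
            snapshot.clear();
        }

        void endInput() {
            if (commitInEndInput) {
                externalStore.addAll(staged);   // also publishes records the savepoint does not cover
                staged.clear();
            }
        }
    }

    /** Stops with a savepoint, then resumes from it; returns everything committed. */
    static List<String> stopThenResume(boolean commitInEndInput) {
        List<String> externalStore = new ArrayList<>();

        Sink stopping = new Sink(externalStore, commitInEndInput);
        stopping.processElement("a");
        stopping.processElement("b");
        stopping.snapshotState();            // synchronous savepoint covers a and b
        stopping.notifyCheckpointComplete(); // a and b are committed here
        stopping.processElement("c");        // data processed after the savepoint
        stopping.endInput();
        // close(): the job is stopped; uncommitted staged records are discarded.

        // Assumption: resuming from the savepoint rewinds the source, so c is read again.
        Sink resumed = new Sink(externalStore, commitInEndInput);
        resumed.processElement("c");
        resumed.snapshotState();             // first checkpoint after resuming
        resumed.notifyCheckpointComplete();
        return externalStore;
    }

    public static void main(String[] args) {
        System.out.println(stopThenResume(true));  // → [a, b, c, c]
        System.out.println(stopThenResume(false)); // → [a, b, c]
    }
}
```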

In this case, the sink still should not need to commit inside endOfInput(),
because all the state is expected to be committed in the second step, namely
notifyCheckpointComplete().

To me, the real issue is that the first case is not solved yet, so we have to
resort to workarounds such as committing in endOfInput().

> BoundedOneInput.endInput is called when taking synchronous savepoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-21132
>                 URL: https://issues.apache.org/jira/browse/FLINK-21132
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Priority: Major
>
> [~elkhand](?) reported on project iceberg that {{BoundedOneInput.endInput}} was
> [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038]
> when [stopping job with savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995].
> I think it is a bug in Flink, introduced in FLINK-14230. The
> [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577]
> rely on {{StreamTask.afterInvoke}} and {{OperatorChain.closeOperators}} being
> invoked only after *end of input*. But that has not been true for a long time,
> ever since [FLIP-34: Terminate/Suspend Job with Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212]:
> a task can enter the state called
> [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467]
> after a synchronous savepoint, which is an expected job suspension or stop.
> [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this?
> For full context, see
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have
> pushed the branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> to my repository. The test case
> {{SavepointITCase.testStopSavepointWithBoundedInput}} failed because
> {{BoundedOneInput.endInput}} was called.
> I am also aware of [FLIP-147: Support Checkpoints After Tasks
> Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ]; maybe the three
> should align on exactly what *finished* means. [~kkl0u] [~chesnay]
> [~gaoyunhaii]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
