[ 
https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276297#comment-17276297
 ] 

Roman Khachatryan commented on FLINK-21132:
-------------------------------------------

Hi [~kezhuw] , thanks for the clarification.

I'm concerned about the order in which tasks would terminate. If we rely on 
checkpoint completion notifications for that, then a downstream task can 
terminate before its upstream, which can fail the job if there is any more data.
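
To make the ordering concern concrete, here is a toy simulation in plain Java. 
It is not Flink code: TerminationOrderSketch, ToyDownstream and run are 
hypothetical names, and the single "last-record" flush is an assumption made 
only for illustration. It just shows why terminating downstream first is a 
problem when upstream still has data in flight.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TerminationOrderSketch {

    /** Hypothetical stand-in for a downstream task; not a Flink class. */
    static final class ToyDownstream {
        private boolean terminated = false;
        private final List<String> received = new ArrayList<>();

        void terminate() {
            terminated = true;
        }

        void receive(String record) {
            if (terminated) {
                throw new IllegalStateException(
                        "record '" + record + "' arrived after downstream terminated");
            }
            received.add(record);
        }
    }

    /**
     * Terminates the toy tasks in the given order. The upstream task is assumed
     * to flush one in-flight record right before it terminates.
     * Returns true if the toy job finished cleanly, false if it failed.
     */
    public static boolean run(List<String> terminationOrder) {
        ToyDownstream downstream = new ToyDownstream();
        try {
            for (String task : terminationOrder) {
                if (task.equals("upstream")) {
                    downstream.receive("last-record"); // flush in-flight data
                } else {
                    downstream.terminate();
                }
            }
            return true;
        } catch (IllegalStateException e) {
            return false; // data reached an already terminated task
        }
    }

    public static void main(String[] args) {
        System.out.println("upstream first:   " + run(Arrays.asList("upstream", "downstream")));
        System.out.println("downstream first: " + run(Arrays.asList("downstream", "upstream")));
    }
}
```

In this sketch only the upstream-first order finishes cleanly. Notifications 
that arrive in no guaranteed order cannot ensure that on their own.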

I see that StopTaskException is similar to CancelTaskException, but adding more 
logic like CancelTaskException is what I'd like to avoid :) Others may not 
agree with me, though.

 

We are also discussing an issue where endInput could be missed if a true 
end-of-input is encountered after the sync savepoint but before job 
termination. It looks like the above approach is subject to it as well.
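
A minimal sketch of that gap, again in plain Java with hypothetical names 
(MissedEndInputSketch and endInputCalls are not Flink APIs). It assumes a naive 
rule that suppresses endInput whenever a sync savepoint was taken, which is a 
simplification and not necessarily how any proposed fix works.

```java
public class MissedEndInputSketch {

    /**
     * Returns how many times a toy task calls endInput when it closes, under
     * the naive rule "never call endInput once a sync savepoint was taken".
     */
    public static int endInputCalls(boolean syncSavepointTaken, boolean inputReallyEnded) {
        boolean shouldEndInput = inputReallyEnded && !syncSavepointTaken;
        return shouldEndInput ? 1 : 0;
    }

    public static void main(String[] args) {
        // Bounded input ran to its end, no savepoint: endInput is called once.
        System.out.println("end of input only:        " + endInputCalls(false, true));
        // Stop-with-savepoint on a still-running input: endInput is skipped,
        // which is the behaviour this issue asks for.
        System.out.println("sync savepoint only:      " + endInputCalls(true, false));
        // Input truly ends after the sync savepoint but before termination:
        // the naive rule still skips endInput, so the call is missed.
        System.out.println("savepoint, then real end: " + endInputCalls(true, true));
    }
}
```

The third case is the problematic one: the input really ended, yet the toy task 
reports zero endInput calls.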

 

> BoundedOneInput.endInput is called when taking synchronous savepoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-21132
>                 URL: https://issues.apache.org/jira/browse/FLINK-21132
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> [~elkhand](?) reported in the Iceberg project that {{BoundedOneInput.endInput}} 
> was 
> [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038] 
> when [stopping a job with a 
> savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995].
> I think it is a bug in Flink that was introduced in FLINK-14230. The 
> [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577]
>  rely on the assumption that {{StreamTask.afterInvoke}} and 
> {{OperatorChain.closeOperators}} are only invoked after *end of input*. But 
> that has not been true for a long time, since 
> [FLIP-34: Terminate/Suspend Job with 
> Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212].
>  A task can enter the state called 
> [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467]
>  after a synchronous savepoint, which is an expected job suspension or stop.
> [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this?
> For full context, see 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have 
> pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. Test case 
> {{SavepointITCase.testStopSavepointWithBoundedInput}} failed because 
> {{BoundedOneInput.endInput}} was called.
> I am also aware of [FLIP-147: Support Checkpoints After Tasks 
> Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ]; maybe the three 
> should align on what *finished* means exactly. [~kkl0u] [~chesnay] 
> [~gaoyunhaii]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
