[
https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17274538#comment-17274538
]
Roman Khachatryan commented on FLINK-21132:
-------------------------------------------
Thanks for your feedback, [~aljoscha] and [~kezhuw]. I've also discussed it
offline with [~pnowojski].
We agreed that endInput should NOT be called when stopping with a savepoint
(no operators should rely on it; if any do, they should be fixed),
so no StopWithSavepointAware is needed.
As for the detection (stop-with-savepoint vs. other cases), we agree the
proposed solution isn't ideal. However, it's much less invasive and we'd like
to provide a fix ASAP, so we lean toward it.
I'm adjusting it to cover all cases of chaining (these are different code paths).
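To illustrate the agreed behavior, here is a minimal sketch in plain Java. It
does not use Flink's classes and is not the actual fix: BoundedInput,
FinishReason, RecordingOperator and finishOperators are hypothetical names made
up for this example. It only shows the intended contract, namely that endInput
is invoked when the input has really ended and is skipped when the task
finishes because of stop-with-savepoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EndInputSketch {

    /** Hypothetical stand-in for BoundedOneInput; not Flink's interface. */
    interface BoundedInput {
        void endInput();
    }

    /** Hypothetical reasons why a task's processing loop has finished. */
    enum FinishReason { END_OF_INPUT, STOP_WITH_SAVEPOINT }

    /** Hypothetical operator that records the lifecycle calls it receives. */
    static class RecordingOperator implements BoundedInput {
        final List<String> calls = new ArrayList<>();

        @Override
        public void endInput() {
            calls.add("endInput");
        }

        void close() {
            calls.add("close");
        }
    }

    /**
     * Finishes every operator in the chain. endInput is only signalled when
     * the input has actually ended; close is called in both cases.
     */
    static void finishOperators(List<RecordingOperator> chain, FinishReason reason) {
        for (RecordingOperator op : chain) {
            if (reason == FinishReason.END_OF_INPUT) {
                op.endInput();
            }
            op.close();
        }
    }

    public static void main(String[] args) {
        RecordingOperator bounded = new RecordingOperator();
        finishOperators(Arrays.asList(bounded), FinishReason.END_OF_INPUT);
        System.out.println("end of input:        " + bounded.calls);

        RecordingOperator stopped = new RecordingOperator();
        finishOperators(Arrays.asList(stopped), FinishReason.STOP_WITH_SAVEPOINT);
        System.out.println("stop-with-savepoint: " + stopped.calls);
    }
}
```

How the real task tells the two cases apart is exactly the detection question
discussed above; the enum here simply assumes that the reason is already known.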
> BoundedOneInput.endInput is called when taking synchronous savepoint
> --------------------------------------------------------------------
>
> Key: FLINK-21132
> URL: https://issues.apache.org/jira/browse/FLINK-21132
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1
> Reporter: Kezhu Wang
> Assignee: Roman Khachatryan
> Priority: Major
>
> [~elkhand](?) reported in the Iceberg project that {{BoundedOneInput.endInput}}
> was
> [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038]
> when [stopping job with
> savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995].
> I think it is a bug in Flink that was introduced in FLINK-14230. The
> [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577]
> rely on the assumption that {{StreamTask.afterInvoke}} and
> {{OperatorChain.closeOperators}} will only be invoked after *end of input*.
> But that has long been untrue, ever since
> [FLIP-34: Terminate/Suspend Job with
> Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212].
> A task can enter the state called
> [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467]
> after a synchronous savepoint, which is an expected job suspension or stop.
> [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this?
> For full context, see
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have
> pushed the branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> to my repository. The test case
> {{SavepointITCase.testStopSavepointWithBoundedInput}} failed because
> {{BoundedOneInput.endInput}} was called.
> I am also aware of [FLIP-147: Support Checkpoints After Tasks
> Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ]; maybe the three
> should align on what *finished* means exactly. [~kkl0u] [~chesnay]
> [~gaoyunhaii]