[
https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277160#comment-17277160
]
Piotr Nowojski commented on FLINK-21132:
----------------------------------------
{quote}
I'm concerned about the order in which tasks would terminate. If we rely on
checkpoint completion notifications for that then downstream can terminate
before upstream which can fail the job if there are any more data.
{quote}
I've discussed this concern offline with [~roman_khachatryan]. A downstream
task terminating before its upstream would probably not fail the job by itself,
but with your proposal, [~kezhuw], bad things can happen. The root cause is that
you shut tasks down from the {{notifyCheckpointCompleted}} call, while both the
current solution on master and [~roman_khachatryan]'s proposal rely on a clean
shutdown after receiving {{END_OF_PARTITION}} events, so there the shutdown
proceeds from the heads/sources to the tails/sinks.
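The head-to-tail ordering can be pictured with a minimal sketch, assuming a task may only finish once every one of its inputs has delivered {{END_OF_PARTITION}}. The class {{EndOfPartitionShutdownSketch}}, the method {{shutdownOrder}} and the task names are hypothetical and are not Flink APIs; the code only models the ordering, not the actual runtime.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink code: a task finishes only after all of its
// inputs have delivered END_OF_PARTITION, then forwards it downstream.
class EndOfPartitionShutdownSketch {

    /**
     * downstreamOf maps every task to the tasks consuming its output.
     * Every task must appear as a key (sinks map to an empty list).
     */
    static List<String> shutdownOrder(Map<String, List<String>> downstreamOf) {
        // Number of inputs that have not yet delivered END_OF_PARTITION.
        Map<String, Integer> openInputs = new LinkedHashMap<>();
        for (String task : downstreamOf.keySet()) {
            openInputs.put(task, 0);
        }
        for (List<String> consumers : downstreamOf.values()) {
            for (String consumer : consumers) {
                openInputs.merge(consumer, 1, Integer::sum);
            }
        }

        // Sources have no inputs, so they are the first ones able to finish.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : openInputs.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            order.add(task); // the task finishes and emits END_OF_PARTITION
            for (String consumer : downstreamOf.get(task)) {
                if (openInputs.merge(consumer, -1, Integer::sum) == 0) {
                    ready.add(consumer); // last open input has ended
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source-a", List.of("join"));
        graph.put("source-b", List.of("join"));
        graph.put("join", List.of("sink"));
        graph.put("sink", List.of());
        System.out.println(shutdownOrder(graph)); // [source-a, source-b, join, sink]
    }
}
```

In this model a sink can never finish before the tasks feeding it, which is the property that a shutdown driven by {{notifyCheckpointCompleted}} does not guarantee.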
Keep in mind that while a task is spinning inside
{{runSynchronousSavepointMailboxLoop}}, it is not processing inputs, BUT it can
still produce output via mailbox actions (processing-time timers firing in
{{WindowOperator}}, or records emitted directly from a mailbox action, as
{{AsyncWaitOperator}} and {{ContinuousFileReaderOperator}} do). In rare cases
this can build up back-pressure, and the task may get stuck inside
{{runSynchronousSavepointMailboxLoop}}.
Now, in your proposal, [~kezhuw], if the downstream task receives
{{notifyCheckpointCompleted}} before the upstream one and shuts down, nobody is
left to relieve the upstream task of the back-pressure, and the upstream task
ends up deadlocked, unable to process {{notifyCheckpointCompleted}}.
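The scenario above can be reproduced in miniature with plain threads and a bounded queue. This is only a sketch under stated assumptions: the {{ArrayBlockingQueue}} stands in for the limited network buffers between two tasks, the upstream thread stands in for a mailbox that must finish an output-emitting action before it reaches the notification mail, and all names ({{SavepointBackpressureSketch}}, {{upstreamProcessesNotification}}) are hypothetical rather than Flink APIs.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model, not Flink code.
class SavepointBackpressureSketch {

    /** Returns true if the upstream "task" got as far as the completion notification. */
    static boolean upstreamProcessesNotification(boolean downstreamAlive) {
        // Bounded channel standing in for the network buffers between the tasks.
        BlockingQueue<Integer> channel = new ArrayBlockingQueue<>(2);
        AtomicBoolean notified = new AtomicBoolean(false);

        Thread downstream = new Thread(() -> {
            try {
                while (true) {
                    channel.take(); // consuming records relieves the back-pressure
                }
            } catch (InterruptedException ignored) {
                // consumer stopped
            }
        });
        downstream.setDaemon(true);
        if (downstreamAlive) {
            downstream.start();
        }

        Thread upstream = new Thread(() -> {
            try {
                // Mailbox action emitting output; blocks once the channel is full.
                for (int i = 0; i < 5; i++) {
                    channel.put(i);
                }
                // The notification mail is only reached after the action above returns.
                notified.set(true);
            } catch (InterruptedException ignored) {
                // interrupted while stuck under back-pressure
            }
        });
        upstream.setDaemon(true);
        upstream.start();

        try {
            upstream.join(500); // give the upstream time to make progress
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        upstream.interrupt();
        downstream.interrupt();
        return notified.get();
    }

    public static void main(String[] args) {
        System.out.println("downstream alive:      " + upstreamProcessesNotification(true));
        System.out.println("downstream terminated: " + upstreamProcessesNotification(false));
    }
}
```

With a live consumer the upstream drains its output and reaches the notification. With the consumer already gone, the third {{put}} blocks indefinitely and the notification is never processed, which mirrors the deadlock described above.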
What we have at the moment is not ideal. In the future we might decide to
shut down earlier, without waiting for {{END_OF_PARTITION}} to travel through
the whole job graph, but that would have to be a separate story, independent
of this bug fix.
> BoundedOneInput.endInput is called when taking synchronous savepoint
> --------------------------------------------------------------------
>
> Key: FLINK-21132
> URL: https://issues.apache.org/jira/browse/FLINK-21132
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1
> Reporter: Kezhu Wang
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> [~elkhand](?) reported on project iceberg that {{BoundedOneInput.endInput}}
> was
> [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038]
> when [stopping job with
> savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995].
> I think it is a Flink bug that was introduced in FLINK-14230. The
> [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577]
> rely on the assumption that {{StreamTask.afterInvoke}} and
> {{OperatorChain.closeOperators}} are only invoked after *end of input*. But
> that has not been true since
> [FLIP-34: Terminate/Suspend Job with
> Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212].
> A task can also enter the state called
> [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467]
> after a synchronous savepoint, which is an expected job suspension or stop.
> [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this?
> For full context, see
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have
> pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. The test case
> {{SavepointITCase.testStopSavepointWithBoundedInput}} failed because
> {{BoundedOneInput.endInput}} was called.
> I am also aware of [FLIP-147: Support Checkpoints After Tasks
> Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ], maybe the three
> should align on what *finished* means exactly. [~kkl0u] [~chesnay]
> [~gaoyunhaii]