[
https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277083#comment-17277083
]
Piotr Nowojski edited comment on FLINK-21132 at 2/2/21, 12:32 PM:
------------------------------------------------------------------
I'm sharing the same concern as [~roman_khachatryan] about controlling the flow
using the exception, but that could be easily changed in your version [~kezhuw].
I think the larger difference is that [~roman_khachatryan]'s proposal is doing
the clean shutdown, via {{StreamTask#afterInvoke}}, but just set's the flag to
not invoke {{endOfInput}}. While [~kezhuw] approach is re-using the
cancellation/failure code path of shutting down. Bypassing {{afterInvoke}}
seems simpler, as it doesn't require changes in the {{OperatorChain}} and
{{StreamOperatorWrapper}} classes. Maybe the appropriate question would be:
Do we want to do the proper shutdown procedure for stopping with savepoint? For
example quiescing the timer service, potentially processing some records,
finally calling {{close()}} on the operator, flushing the outputs - all of that
while we have already decided to successfully stop with savepoint?
{quote}
For mailbox tasks, it will not handle any mail until checkpoint confirmation.
After checkpoint completion, poison mail will break out mailbox loop. I did not
find a hole in between since there is no chance to read/write.
{quote}
The question is also how to handle situations when stop with savepoint has been
cancelled because savepoint failed (for example it was declined as there was a
race condition with some source finishing and sending {{END_OF_PARITTION}}
event while CheckpointCoordinator was triggering stop with savepoint).
was (Author: pnowojski):
I'm sharing the same concern as [~roman_khachatryan] about controlling the flow
using the exception, but that could be easily changed in your version [~kezhuw].
I think the larger difference is that [~roman_khachatryan]'s proposal is doing
the clean shutdown, via {{StreamTask#afterInvoke}}, but just set's the flag to
not invoke {{endOfInput}}. While [~kezhuw] approach is re-using the
cancellation/failure code path of shutting down. Bypassing {{afterInvoke}}
seems simpler, as it doesn't require changes in the {{OperatorChain}} and
{{StreamOperatorWrapper}} classes. Maybe the appropriate question would be:
Do we want to do the proper shutdown procedure for stopping with savepoint? For
example quiescing the timer service, potentially processing some records,
finally calling {{close()}} on the operator, flushing the outputs - all of that
while we have already decided to successfully stop with savepoint?
> For mailbox tasks, it will not handle any mail until checkpoint confirmation.
> After checkpoint completion, poison mail will break out mailbox loop. I did
> not find a hole in between since there is no chance to read/write.
> BoundedOneInput.endInput is called when taking synchronous savepoint
> --------------------------------------------------------------------
>
> Key: FLINK-21132
> URL: https://issues.apache.org/jira/browse/FLINK-21132
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1
> Reporter: Kezhu Wang
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> [~elkhand](?) reported on project iceberg that {{BoundedOneInput.endInput}}
> was
> [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038]
> when [stopping job with
> savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995].
> I think it is a bug of Flink and was introduced in FLINK-14230. The
> [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577]
> rely on {{StreamTask.afterInvoke}} and {{OperatorChain.closeOperators}} will
> only be invoked after *end of input*. But that is not true long before after
> [FLIP-34: Terminate/Suspend Job with
> Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212].
> Task could enter state called
> [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467]
> after synchronous savepoint, that is an expected job suspension and stopping.
> [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this ?
> For full context, see
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have
> pushed branch
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
> in my repository. Test case
> {{SavepointITCase.testStopSavepointWithBoundedInput}} failed due to
> {{BoundedOneInput.endInput}} called.
> I am also aware of [FLIP-147: Support Checkpoints After Tasks
> Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ], maybe the three
> should align on what *finished* means exactly. [~kkl0u] [~chesnay]
> [~gaoyunhaii]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)