[ 
https://issues.apache.org/jira/browse/FLINK-21132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276235#comment-17276235
 ] 

Kezhu Wang edited comment on FLINK-21132 at 2/1/21, 10:29 AM:
--------------------------------------------------------------

Hi [~roman_khachatryan], thanks for reviewing, let me detail:
 1. The key change this proposal made is *making old 
{{StreamTask.finishTask}}(renaming to {{StreamTask.cancelTask}}) works for all 
tasks but not only source tasks*. So this will works as long as we fire 
checkpoint-complete on all tasks. Before this change, {{StreamTask.finishTask}} 
is an nop on no source tasks.
 2. {{StopTaskException}} is just like {{CancelTaskException}}, but got 
different treatment in {{Task.doRun}}.

3. FLINK-21133 is about FLIP-27 source, which is {{SourceOperatorStreamTask}} 
but not {{SourceStreamTask}}. {{SourceStreamTask}} get special treatment in old 
solution also.

I would like list changes in running order:
 1. Up receiving {{notifyCheckpointComplete}}, it stops current task no matter 
whether it is a source or not. This operator will cause mailbox loop run out.
 2. After stopping current task, it set {{stopWithSavepointId}} for later usage.
 3. After mailbox loop run out, if {{stopWithSavepointId}} is set, throw 
{{StopTaskException}}.
 4. {{Task.doRun}} will skip result partition finishing(eg. firing EoP) if 
{{StopTaskException}} is captured.

Another observation, I think current behavior(source EoP) may not work with 
FLIP-147 which allows no-source heading operator.


was (Author: kezhuw):
Hi [~roman_khachatryan], thanks for reviewing, let me detail:
 1. The key change this proposal made is *making old 
{{StreamTask.finishTask}}(renaming to {{StreamTask.cancelTask}}) works for all 
tasks but not only source tasks*. So this will works as long as we fire 
checkpoint on all tasks. Before this change, {{StreamTask.finishTask}} is an 
nop on no source tasks.
 2. {{StopTaskException}} is just like {{CancelTaskException}}, but got 
different treatment in {{Task.doRun}}.

3. FLINK-21133 is about FLIP-27 source, which is {{SourceOperatorStreamTask}} 
but not {{SourceStreamTask}}. {{SourceStreamTask}} get special treatment in old 
solution also.

I would like list changes in running order:
 1. Up receiving {{notifyCheckpointComplete}}, it stops current task no matter 
whether it is a source or not. This operator will cause mailbox loop run out.
 2. After stopping current task, it set {{stopWithSavepointId}} for later usage.
 3. After mailbox loop run out, if {{stopWithSavepointId}} is set, throw 
{{StopTaskException}}.
 4. {{Task.doRun}} will skip result partition finishing(eg. firing EoP) if 
{{StopTaskException}} is captured.

Another observation, I think current behavior(source EoP) may not work with 
FLIP-147 which allows no-source heading operator.

> BoundedOneInput.endInput is called when taking synchronous savepoint
> --------------------------------------------------------------------
>
>                 Key: FLINK-21132
>                 URL: https://issues.apache.org/jira/browse/FLINK-21132
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.10.2, 1.10.3, 1.11.3, 1.12.1
>            Reporter: Kezhu Wang
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> [~elkhand](?) reported on project iceberg that {{BoundedOneInput.endInput}} 
> was 
> [called|https://github.com/apache/iceberg/issues/2033#issuecomment-765864038] 
> when [stopping job with 
> savepoint|https://github.com/apache/iceberg/issues/2033#issuecomment-765557995].
> I think it is a bug of Flink and was introduced in FLINK-14230. The 
> [changes|https://github.com/apache/flink/pull/9854/files#diff-0c5fe245445b932fa83fdaf5c4802dbe671b73e8d76f39058c6eaaaffd9639faL577]
>  rely on {{StreamTask.afterInvoke}} and {{OperatorChain.closeOperators}} will 
> only be invoked after *end of input*. But that is not true long before after 
> [FLIP-34: Terminate/Suspend Job with 
> Savepoint|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212].
>  Task could enter state called 
> [*finished*|https://github.com/apache/flink/blob/3a8e06cd16480eacbbf0c10f36b8c79a6f741814/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L467]
>  after synchronous savepoint, that is an expected job suspension and stopping.
> [~sunhaibotb] [~pnowojski] [~roman_khachatryan] Could you help confirm this ?
> For full context, see 
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033]. I have 
> pushed branch 
> [synchronous-savepoint-conflict-with-bounded-end-input-case|https://github.com/kezhuw/flink/commits/synchronous-savepoint-conflict-with-bounded-end-input-case]
>  in my repository. Test case 
> {{SavepointITCase.testStopSavepointWithBoundedInput}} failed due to 
> {{BoundedOneInput.endInput}} called.
> I am also aware of [FLIP-147: Support Checkpoints After Tasks 
> Finished|https://cwiki.apache.org/confluence/x/mw-ZCQ], maybe the three 
> should align on what *finished* means exactly. [~kkl0u] [~chesnay] 
> [~gaoyunhaii]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to