[
https://issues.apache.org/jira/browse/FLINK-21030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17277248#comment-17277248
]
Till Rohrmann edited comment on FLINK-21030 at 2/2/21, 4:21 PM:
----------------------------------------------------------------
I think the problem can and should be solved in the scope of the
{{SchedulerBase}}/{{DefaultScheduler}}. The underlying problem is that calling
{{SchedulerBase.stopWithSavepoint}} should move the scheduler into a
"stop-with-savepoint" state in which it reacts differently to the incoming
signals. In fact, stop-with-savepoint is a two-stage operation which needs
different behaviour depending on the stage:
1. Taking a savepoint
2. Stopping the job gracefully (waiting until the notify-checkpoint-complete
messages have been sent and the {{Tasks}} have shut themselves down)
If a failure occurs in the first stage, then we should fail the
stop-with-savepoint operation and recover the job normally. If a failure occurs
in the second stage, then it is possible that parts of the {{ExecutionGraph}}
have already shut down. Hence, any task failure needs to trigger a global
failover where we restart the whole job.
At the moment, the second stage is not treated properly, which results in the
reported problem.
The easiest solution I can think of is to predicate the stop-with-savepoint
operation on the state of the {{Executions}}. Only if all {{Executions}} reach
the {{FINISHED}} state can the job reach the {{FINISHED}} state. Hence, if one
observes an {{Execution}} which reaches a different terminal state, then one
knows that the savepoint operation has failed. Depending on whether one is in
stage 1 or stage 2, one either needs to do nothing or trigger a global job
failover.
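A minimal sketch of this check, assuming simplified stand-ins for Flink's {{Execution}}/{{ExecutionState}} types (the class and helper names below are made up for illustration and are not the actual scheduler API):
{code:java}
import java.util.Collection;

// Simplified stand-ins for Flink's Execution/ExecutionState; only the states
// relevant to the argument are modelled here.
enum ExecutionState { RUNNING, FINISHED, CANCELED, FAILED }

interface ExecutionView {
    ExecutionState getState();
}

enum StopWithSavepointStage { TAKING_SAVEPOINT, WAITING_FOR_GRACEFUL_SHUTDOWN }

final class StopWithSavepointCheck {

    // The job may only be declared FINISHED if every Execution reached FINISHED.
    static boolean allExecutionsFinished(Collection<? extends ExecutionView> executions) {
        return executions.stream()
                .allMatch(e -> e.getState() == ExecutionState.FINISHED);
    }

    // An Execution reaching a terminal state other than FINISHED means the
    // stop-with-savepoint operation failed. In stage 1 nothing special has to
    // happen (normal recovery runs); in stage 2 parts of the ExecutionGraph may
    // already be shut down, so the whole job has to be restarted.
    static boolean requiresGlobalFailover(StopWithSavepointStage stage) {
        return stage == StopWithSavepointStage.WAITING_FOR_GRACEFUL_SHUTDOWN;
    }
}
{code}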
Alternatively, one could make the state of stop-with-savepoint more explicit by
letting the {{SchedulerBase}} store a {{StopWithSavepointOperation}}. This
operation would then influence what {{DefaultScheduler.handleTaskFailure}} and
{{DefaultScheduler.handleGlobalFailure}} do. If, for example, we are in the
second stage, then a single task failure should trigger a global failover.
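A rough sketch of how such a {{StopWithSavepointOperation}} could be consulted by the failure handlers; again, all names and signatures below are illustrative stand-ins, not Flink's actual {{DefaultScheduler}} API:
{code:java}
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Illustrative stand-in for the proposed StopWithSavepointOperation.
final class StopWithSavepointOperation {
    enum Stage { TAKING_SAVEPOINT, WAITING_FOR_GRACEFUL_SHUTDOWN }

    private volatile Stage stage = Stage.TAKING_SAVEPOINT;
    private final CompletableFuture<String> savepointPath = new CompletableFuture<>();

    // Called once the savepoint has been taken; from now on we only wait for
    // the Tasks to shut themselves down gracefully.
    void savepointCompleted(String path) {
        stage = Stage.WAITING_FOR_GRACEFUL_SHUTDOWN;
        savepointPath.complete(path);
    }

    Stage getStage() {
        return stage;
    }
}

// Hypothetical scheduler excerpt showing how the stored operation would change
// the reaction to a task failure.
final class SchedulerSketch {
    private Optional<StopWithSavepointOperation> stopWithSavepoint = Optional.empty();

    void handleTaskFailure(Throwable cause) {
        boolean inGracefulShutdown = stopWithSavepoint
                .map(op -> op.getStage() == StopWithSavepointOperation.Stage.WAITING_FOR_GRACEFUL_SHUTDOWN)
                .orElse(false);

        if (inGracefulShutdown) {
            // Stage 2: parts of the ExecutionGraph may already be gone,
            // so a single task failure must escalate to a global failover.
            handleGlobalFailure(cause);
        } else {
            // Stage 1 or no stop-with-savepoint in flight: regular (regional) failover.
            restartAffectedRegion(cause);
        }
    }

    void handleGlobalFailure(Throwable cause) {
        stopWithSavepoint = Optional.empty();
        // ... restart the whole job
    }

    private void restartAffectedRegion(Throwable cause) {
        // ... regional failover
    }
}
{code}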
cc [~zhuzh]
> Broken job restart for job with disjoint graph
> ----------------------------------------------
>
> Key: FLINK-21030
> URL: https://issues.apache.org/jira/browse/FLINK-21030
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.11.2
> Reporter: Theo Diefenthal
> Assignee: Matthias
> Priority: Blocker
> Fix For: 1.11.4, 1.12.2, 1.13.0
>
>
> Building on top of bugs:
> https://issues.apache.org/jira/browse/FLINK-21028
> and https://issues.apache.org/jira/browse/FLINK-21029 :
> I tried to stop a Flink application on YARN via savepoint, which didn't
> succeed due to a possible bug/race condition in shutdown (Bug 21028). For
> some reason, Flink attempted to restart the pipeline after the failure in
> shutdown (21029). The bug here:
> As I mentioned: My jobgraph is disjoint and the pipelines are fully isolated.
> Let's say the original error occurred in a single task of pipeline1. Flink then
> restarted the entire pipeline1, but pipeline2 was shut down successfully and
> switched its state to FINISHED.
> My job was thus in kind of an invalid state after the attempt to stop it:
> one of the two pipelines was running, the other was FINISHED. I guess this is
> a bug in the restart behavior: only the connected component of the graph that
> contains the failure is restarted, while the other components aren't...