[ https://issues.apache.org/jira/browse/FLINK-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Yao reassigned FLINK-14375:
--------------------------------

    Assignee: Zhu Zhu

> Avoid to trigger failover on a non-effective task failure notification
> ----------------------------------------------------------------------
>
>                 Key: FLINK-14375
>                 URL: https://issues.apache.org/jira/browse/FLINK-14375
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0
>
>
> The DefaultScheduler triggers a failover whenever a task is reported as
> FAILED. However, when multiple tasks in the same region fail together, it
> triggers multiple failovers. The failovers triggered after the first one are
> useless: they lead to concurrent failovers and increase the restart attempt
> count.
> I think the root cause of this issue is that some state changes notified to
> the DefaultScheduler do not reflect the actual state transition.
> In the case above, the first failover has already moved the other vertices of
> the region to CANCELING. A FAILED state update from the TaskManager (TM) then
> turns a CANCELING vertex into CANCELED, so the actual state transition is to
> CANCELED. Yet a FAILED state is notified to the DefaultScheduler.
> The same mismatch can cause another issue: a FINISHED state update may arrive
> from the TM while a vertex is CANCELING. The vertex becomes CANCELED, but its
> FINISHED state change is still notified to the DefaultScheduler, which may
> trigger scheduling of downstream tasks.
> I'd propose translating a state update into the state the vertex actually
> reached before notifying the DefaultScheduler, namely by making the following
> change in SchedulerBase:
> {code:java}
>       @Override
>       public final boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
>               final Optional<ExecutionVertexID> executionVertexId =
>                       getExecutionVertexId(taskExecutionState.getID());
>               if (executionVertexId.isPresent()) {
>                       // Let the ExecutionGraph apply the update first. The vertex may end up
>                       // in a different state than the reported one (e.g. a FAILED report on
>                       // a CANCELING vertex yields CANCELED).
>                       executionGraph.updateState(taskExecutionState);
>                       // Notify the scheduler of the state the vertex actually reached,
>                       // not the raw state reported by the TM.
>                       updateTaskExecutionStateInternal(executionVertexId.get(), new TaskExecutionState(
>                               taskExecutionState.getJobID(),
>                               taskExecutionState.getID(),
>                               getExecutionVertex(executionVertexId.get()).getExecutionState(),
>                               taskExecutionState.getError(userCodeLoader)
>                       ));
>                       return true;
>               }
>               return false;
>       }
> {code}
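To make the proposed translation concrete, here is a minimal, self-contained Java sketch. It is not Flink code: `EffectiveStateSketch`, `VertexState`, `effectiveState` and the toy `Scheduler` are hypothetical names, and the model assumes only the rule described in the issue, namely that a vertex which is already CANCELING settles into CANCELED whether the TM reports FAILED, FINISHED or CANCELED. Because the toy scheduler reacts to the effective state rather than the reported one, it triggers a failover only for a genuine FAILED and schedules downstream tasks only for a genuine FINISHED.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink's API: models translating a state reported by
// the TaskManager into the state the vertex actually reaches.
public class EffectiveStateSketch {

    // Simplified stand-in for a subset of Flink's ExecutionState values.
    enum VertexState { RUNNING, CANCELING, CANCELED, FINISHED, FAILED }

    // Assumption: a CANCELING vertex can only settle into CANCELED; any other
    // combination is passed through unchanged (a simplification).
    static VertexState effectiveState(VertexState current, VertexState reported) {
        boolean terminalReport = reported == VertexState.FAILED
                || reported == VertexState.FINISHED
                || reported == VertexState.CANCELED;
        if (current == VertexState.CANCELING && terminalReport) {
            return VertexState.CANCELED;
        }
        return reported;
    }

    // Toy scheduler that reacts only to effective state transitions.
    static class Scheduler {
        int failovers = 0;
        int downstreamSchedulings = 0;
        final List<String> log = new ArrayList<>();

        void onStateUpdate(String vertex, VertexState current, VertexState reported) {
            VertexState effective = effectiveState(current, reported);
            log.add(vertex + ": reported " + reported + ", effective " + effective);
            if (effective == VertexState.FAILED) {
                failovers++;              // genuine failure: restart the region
            } else if (effective == VertexState.FINISHED) {
                downstreamSchedulings++;  // genuine finish: consumers may be scheduled
            }
        }
    }

    public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        // Task A fails; the region failover moves its sibling B to CANCELING.
        scheduler.onStateUpdate("A", VertexState.RUNNING, VertexState.FAILED);
        // B's own FAILED report arrives while B is already CANCELING.
        scheduler.onStateUpdate("B", VertexState.CANCELING, VertexState.FAILED);
        // Task C reports FINISHED while it is being canceled.
        scheduler.onStateUpdate("C", VertexState.CANCELING, VertexState.FINISHED);
        scheduler.log.forEach(System.out::println);
        System.out.println("failovers = " + scheduler.failovers
                + ", downstream schedulings = " + scheduler.downstreamSchedulings);
    }
}
```

Running `main` counts a single failover for the two failures in the same region and no downstream scheduling for the FINISHED report on a CANCELING vertex. Combinations other than "CANCELING plus a terminal report" are passed through as reported, which is a deliberate simplification of the real ExecutionGraph state machine.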



--
This message was sent by Atlassian Jira
(v8.3.4#803005)