[ https://issues.apache.org/jira/browse/FLINK-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-14375:
----------------------------
    Description: 
The DefaultScheduler triggers a failover whenever it is notified that a task has
become FAILED. However, when multiple tasks in the same region fail together, it
triggers multiple failovers. The later failovers are useless: they lead to
concurrent failovers and increase the restart attempt count.

I think the root cause of this issue is that some state changes which never
actually happen are notified to the DefaultScheduler.
In the case above, a FAILED state update from the TM turns a CANCELING vertex
into CANCELED, so the actual state transition is to CANCELED. However, it is
FAILED that gets notified to the DefaultScheduler.

The same cause can lead to another issue: a FINISHED state update may arrive
from the TM while a vertex is CANCELING. The vertex becomes CANCELED, but the
FINISHED state change is still notified to the DefaultScheduler, which may then
trigger scheduling of downstream tasks.
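To make the two problematic transitions concrete, the sketch below models how a reported state combines with the state a vertex is in when the report arrives. It is a hypothetical, simplified model, not Flink's Execution class: StateTranslation, its nested State enum, effectiveState and shouldTriggerFailover are illustrative names, and it assumes that updates for an already terminal vertex are ignored.

{code:java}
import java.util.EnumSet;

// Hypothetical, simplified model of the states discussed in FLINK-14375.
// This is NOT Flink's Execution class; it only mirrors the CANCELING rules described above.
class StateTranslation {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    private static final EnumSet<State> TERMINAL =
            EnumSet.of(State.FINISHED, State.CANCELED, State.FAILED);

    // Returns the state a vertex actually reaches when the TM reports 'reported'
    // while the vertex is in state 'current'.
    static State effectiveState(State current, State reported) {
        if (TERMINAL.contains(current)) {
            // Assumption: updates for an already terminal vertex are ignored.
            return current;
        }
        if (current == State.CANCELING && TERMINAL.contains(reported)) {
            // A vertex that is being canceled completes its cancellation,
            // whether the task reports FAILED, FINISHED or CANCELED.
            return State.CANCELED;
        }
        return reported;
    }

    // The scheduler should only react to a failure that actually happened.
    static boolean shouldTriggerFailover(State current, State reported) {
        return effectiveState(current, reported) == State.FAILED;
    }

    public static void main(String[] args) {
        System.out.println(effectiveState(State.CANCELING, State.FAILED));        // CANCELED
        System.out.println(effectiveState(State.CANCELING, State.FINISHED));      // CANCELED
        System.out.println(effectiveState(State.RUNNING, State.FAILED));          // FAILED
        System.out.println(shouldTriggerFailover(State.CANCELING, State.FAILED)); // false
    }
}
{code}

In this model, neither the late FAILED nor the late FINISHED report changes what the scheduler should see: both resolve to CANCELED.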

I'd propose translating each state update into the state the vertex actually
reached before notifying the DefaultScheduler, namely by changing SchedulerBase
as follows:


{code:java}
        @Override
        public final boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
                final Optional<ExecutionVertexID> executionVertexId = getExecutionVertexId(taskExecutionState.getID());
                if (executionVertexId.isPresent()) {
                        // Apply the reported update to the ExecutionGraph first. The vertex may end up
                        // in a different state than the reported one (e.g. CANCELING + FAILED -> CANCELED).
                        executionGraph.updateState(taskExecutionState);
                        // Notify the scheduler of the state the vertex actually reached,
                        // not of the raw state reported by the TM.
                        updateTaskExecutionStateInternal(executionVertexId.get(), new TaskExecutionState(
                                taskExecutionState.getJobID(),
                                taskExecutionState.getID(),
                                getExecutionVertex(executionVertexId.get()).getExecutionState(),
                                taskExecutionState.getError(userCodeLoader)
                        ));
                        return true;
                }
                return false;
        }
{code}
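The effect of this change on the scenario described at the top of the issue can be illustrated with a small, hypothetical simulation; FailoverSimulation and countFailovers are illustrative names, not Flink APIs. It assumes that a failover immediately moves every other RUNNING vertex of the region to CANCELING and that each failover costs one restart attempt.

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Flink code: several vertices of one failover region fail together.
class FailoverSimulation {

    enum State { RUNNING, CANCELING, CANCELED, FAILED }

    // Returns how many failovers (and thus restart attempts) a sequence of FAILED reports causes.
    // translate == false: the scheduler is notified of the raw reported state (current behavior).
    // translate == true:  the scheduler is notified of the state the vertex actually reached (proposal).
    static int countFailovers(List<String> region, List<String> failedReports, boolean translate) {
        Map<String, State> states = new LinkedHashMap<>();
        for (String vertex : region) {
            states.put(vertex, State.RUNNING);
        }
        int failovers = 0;
        for (String vertex : failedReports) {
            // Apply the report to the "execution graph": a CANCELING vertex becomes CANCELED.
            State actual = states.get(vertex) == State.CANCELING ? State.CANCELED : State.FAILED;
            states.put(vertex, actual);
            State notified = translate ? actual : State.FAILED;
            if (notified == State.FAILED) {
                failovers++;
                // Assumption: the failover cancels every other vertex of the region that is still running.
                for (Map.Entry<String, State> entry : states.entrySet()) {
                    if (entry.getValue() == State.RUNNING) {
                        entry.setValue(State.CANCELING);
                    }
                }
            }
        }
        return failovers;
    }

    public static void main(String[] args) {
        List<String> region = List.of("A", "B");
        List<String> reports = List.of("A", "B"); // A and B fail at (almost) the same time
        System.out.println("raw notifications: " + countFailovers(region, reports, false));       // 2
        System.out.println("translated notifications: " + countFailovers(region, reports, true)); // 1
    }
}
{code}

With raw notifications, the second, non-effective FAILED report triggers a second failover and consumes another restart attempt; with translated notifications it is delivered as CANCELED and does not trigger a failover.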



> Avoid to trigger failover on a non-effective task failure notification
> ----------------------------------------------------------------------
>
>                 Key: FLINK-14375
>                 URL: https://issues.apache.org/jira/browse/FLINK-14375
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
