[
https://issues.apache.org/jira/browse/FLINK-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gary Yao reassigned FLINK-14375:
--------------------------------
Assignee: Zhu Zhu
> Avoid to trigger failover on a non-effective task failure notification
> ----------------------------------------------------------------------
>
> Key: FLINK-14375
> URL: https://issues.apache.org/jira/browse/FLINK-14375
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Fix For: 1.10.0
>
>
> The DefaultScheduler triggers failover if a task is notified to be FAILED.
> However, in the case the multiple tasks in the same region fail together, it
> will trigger multiple failovers. The later triggered failovers are useless,
> lead to concurrent failovers and will increase the restart attempts count.
> I think the deep reason for this issue is that some fake state changes are
> notified to the DefaultScheduler.
> The case above is a FAILED state change from TM will turn a CANCELING vertex
> to CANCELED, and the actual state transition is to CANCELED. But a FAILED
> state is notified to DefaultScheduler.
> And there can be another possible issue caused by it, that a FINISHED state
> change is notified from TM when a vertex is CANCELING. The vertex will become
> CANCELED, while its FINISHED state change will be notified to
> DefaultScheduler which may trigger downstream task scheduling.
> I'd propose to translate a state update to be correct before notifying it to
> DefaultScheduler, namely do changes in SchedulerBase:
> {code:java}
> @Override
> public final boolean updateTaskExecutionState(final TaskExecutionState
> taskExecutionState) {
> final Optional<ExecutionVertexID> executionVertexId =
> getExecutionVertexId(taskExecutionState.getID());
> if (executionVertexId.isPresent()) {
> executionGraph.updateState(taskExecutionState);
>
> updateTaskExecutionStateInternal(executionVertexId.get(), new
> TaskExecutionState(
> taskExecutionState.getJobID(),
> taskExecutionState.getID(),
>
> getExecutionVertex(executionVertexId.get()).getExecutionState(),
> taskExecutionState.getError(userCodeLoader)
> ));
> return true;
> }
> return false;
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)