dmvk commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r805592620
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
operatorId, request);
}
+ /** Transition to different state when failure occurs. Stays in the same
state by default. */
+ abstract void onFailure(Throwable cause);
+
+ /**
+ * Transition to different state when the execution graph reaches a
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the
execution graph reached
+ */
+ abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ failureCollection.add(new GlobalFailure(cause));
+ onFailure(cause);
+ }
+
/**
* Updates the execution graph with the given task execution state
transition.
*
* @param taskExecutionStateTransition taskExecutionStateTransition to
update the ExecutionGraph
* with
* @return {@code true} if the update was successful; otherwise {@code
false}
*/
- abstract boolean updateTaskExecutionState(
- TaskExecutionStateTransition taskExecutionStateTransition);
+ boolean updateTaskExecutionState(TaskExecutionStateTransition
taskExecutionStateTransition) {
+ if (taskExecutionStateTransition.getExecutionState() !=
ExecutionState.FAILED) {
+ return
getExecutionGraph().updateState(taskExecutionStateTransition);
+ }
- /**
- * Callback which is called once the execution graph reaches a globally
terminal state.
- *
- * @param globallyTerminalState globally terminal state which the
execution graph reached
- */
- abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+ // We need to collect the ExecutionVertexID before updating the state,
because the Execution
+ // is de-registered afterwards.
+ // We need to use an optional here, because this method can be called
even after the
+ // Execution is de-registered.
+ Optional<ExecutionVertexID> idOpt =
+
executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+ final boolean successfulUpdate =
+ getExecutionGraph().updateState(taskExecutionStateTransition);
+ if (!successfulUpdate) {
+ return false;
+ }
+
+ Throwable cause =
extractErrorOrUseDefault(taskExecutionStateTransition);
+
+ checkState(idOpt.isPresent());
+ ExecutionVertexID id = idOpt.get();
+
+ if (getNonEmptyExecution(id).getFailureInfo().isPresent()) {
+ failureCollection.add(new LocalFailure(cause, id));
+ }
+ onFailure(cause);
+ return true;
Review comment:
It seems you've been faster to fix this :) My suggestion would have been
something along these lines:
```
boolean updateTaskExecutionState(TaskExecutionStateTransition
taskExecutionStateTransition) {
final Optional<ExecutionVertexID> maybeExecutionVertexId =
executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
final boolean successfulUpdate =
getExecutionGraph().updateState(taskExecutionStateTransition);
if (successfulUpdate) {
// We're sure that the executionVertexId has been found, because
we've been able to
// update the execution graph.
final ExecutionVertexID executionVertexId =
maybeExecutionVertexId.orElseThrow(NoSuchElementException::new);
final ExecutionState desiredState =
taskExecutionStateTransition.getExecutionState();
final ExecutionState newState =
executionGraph
.findExecution(executionVertexId)
.map(Execution::getState)
.orElseThrow(NoSuchElementException::new);
// We only want a notification for the actual transition into
the FAILED state.
if (desiredState == ExecutionState.FAILED && desiredState ==
newState) {
final Throwable cause =
extractErrorOrUseDefault(taskExecutionStateTransition);
if
(getNonEmptyExecution(executionVertexId).getFailureInfo().isPresent()) {
failureCollection.add(new LocalFailure(cause,
executionVertexId));
}
onFailure(cause);
return true;
}
}
return false;
}
```
Two notes:
- This de-duplicates the call to the state update
- More importantly, this also checks whether we've actually reached the
desired state, to align with the behavior of the DefaultScheduler (see
SchedulerBase#isNotifiable)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -306,22 +331,88 @@ void deliverOperatorEventToCoordinator(
operatorId, request);
}
+ /** Transition to different state when failure occurs. Stays in the same
state by default. */
+ abstract void onFailure(Throwable cause);
+
+ /**
+ * Transition to different state when the execution graph reaches a
globally terminal state.
+ *
+ * @param globallyTerminalState globally terminal state which the
execution graph reached
+ */
+ abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ failureCollection.add(new GlobalFailure(cause));
+ onFailure(cause);
+ }
+
/**
* Updates the execution graph with the given task execution state
transition.
*
* @param taskExecutionStateTransition taskExecutionStateTransition to
update the ExecutionGraph
* with
* @return {@code true} if the update was successful; otherwise {@code
false}
*/
- abstract boolean updateTaskExecutionState(
- TaskExecutionStateTransition taskExecutionStateTransition);
+ boolean updateTaskExecutionState(TaskExecutionStateTransition
taskExecutionStateTransition) {
+ if (taskExecutionStateTransition.getExecutionState() !=
ExecutionState.FAILED) {
+ return
getExecutionGraph().updateState(taskExecutionStateTransition);
+ }
- /**
- * Callback which is called once the execution graph reaches a globally
terminal state.
- *
- * @param globallyTerminalState globally terminal state which the
execution graph reached
- */
- abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+ // We need to collect the ExecutionVertexID before updating the state,
because the Execution
+ // is de-registered afterwards.
+ // We need to use an optional here, because this method can be called
even after the
+ // Execution is de-registered.
+ Optional<ExecutionVertexID> idOpt =
+
executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
+ final boolean successfulUpdate =
+ getExecutionGraph().updateState(taskExecutionStateTransition);
+ if (!successfulUpdate) {
+ return false;
+ }
+
+ Throwable cause =
extractErrorOrUseDefault(taskExecutionStateTransition);
+
+ checkState(idOpt.isPresent());
+ ExecutionVertexID id = idOpt.get();
+
+ if (getNonEmptyExecution(id).getFailureInfo().isPresent()) {
+ failureCollection.add(new LocalFailure(cause, id));
+ }
+ onFailure(cause);
+ return true;
Review comment:
It seems you've been faster to fix this :) My suggestion would have been
something along these lines:
```java
boolean updateTaskExecutionState(TaskExecutionStateTransition
taskExecutionStateTransition) {
final Optional<ExecutionVertexID> maybeExecutionVertexId =
executionGraph.findExecutionVertexId(taskExecutionStateTransition.getID());
final boolean successfulUpdate =
getExecutionGraph().updateState(taskExecutionStateTransition);
if (successfulUpdate) {
// We're sure that the executionVertexId has been found, because
we've been able to
// update the execution graph.
final ExecutionVertexID executionVertexId =
maybeExecutionVertexId.orElseThrow(NoSuchElementException::new);
final ExecutionState desiredState =
taskExecutionStateTransition.getExecutionState();
final ExecutionState newState =
executionGraph
.findExecution(executionVertexId)
.map(Execution::getState)
.orElseThrow(NoSuchElementException::new);
// We only want a notification for the actual transition into
the FAILED state.
if (desiredState == ExecutionState.FAILED && desiredState ==
newState) {
final Throwable cause =
extractErrorOrUseDefault(taskExecutionStateTransition);
if
(getNonEmptyExecution(executionVertexId).getFailureInfo().isPresent()) {
failureCollection.add(new LocalFailure(cause,
executionVertexId));
}
onFailure(cause);
return true;
}
}
return false;
}
```
Two notes:
- This de-duplicates the call to the state update
- More importantly, this also checks whether we've actually reached the
desired state, to align with the behavior of the DefaultScheduler (see
SchedulerBase#isNotifiable)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]