zentol commented on a change in pull request #18689:
URL: https://github.com/apache/flink/pull/18689#discussion_r808533949



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java
##########
@@ -306,22 +327,87 @@ void deliverOperatorEventToCoordinator(
                 operatorId, request);
     }
 
+    /** Transition to different state when failure occurs. Stays in the same state by default. */
+    abstract void onFailure(Throwable cause);
+
+    <T extends StateTransitions.ToRestarting & StateTransitions.ToFailing> void restartOrFail(
+            FailureResult failureResult, T context) {
+        if (failureResult.canRestart()) {
+            getLogger().info("Restarting job.", failureResult.getFailureCause());
+            context.goToRestarting(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    failureResult.getBackoffTime(),
+                    getFailures());
+        } else {
+            getLogger().info("Failing job.", failureResult.getFailureCause());
+            context.goToFailing(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    failureResult.getFailureCause(),
+                    getFailures());
+        }
+    }
+
+    /**
+     * Transition to different state when the execution graph reaches a globally terminal state.
+     *
+     * @param globallyTerminalState globally terminal state which the execution graph reached
+     */
+    abstract void onGloballyTerminalState(JobStatus globallyTerminalState);
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        failureCollection.add(ExceptionHistoryEntry.createGlobal(cause));
+        onFailure(cause);
+    }
+
     /**
      * Updates the execution graph with the given task execution state transition.
      *
      * @param taskExecutionStateTransition taskExecutionStateTransition to update the ExecutionGraph
      *     with
      * @return {@code true} if the update was successful; otherwise {@code false}
      */
-    abstract boolean updateTaskExecutionState(
-            TaskExecutionStateTransition taskExecutionStateTransition);
+    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionStateTransition) {
+        // collect before updateState, as updateState may deregister the execution
+        final Optional<AccessExecution> maybeExecution =
+                executionGraph.findExecution(taskExecutionStateTransition.getID());
+        final Optional<String> maybeTaskName =
+                executionGraph.findVertexWithAttempt(taskExecutionStateTransition.getID());
+
+        final ExecutionState desiredState = taskExecutionStateTransition.getExecutionState();
+        boolean successfulUpdate = getExecutionGraph().updateState(taskExecutionStateTransition);
+        if (successfulUpdate && desiredState == ExecutionState.FAILED) {
+            final AccessExecution execution =
+                    maybeExecution.orElseThrow(NoSuchElementException::new);
+            final String taskName = maybeTaskName.orElseThrow(NoSuchElementException::new);
+            final ExecutionState currentState = execution.getState();
+            if (currentState == desiredState) {
+                failureCollection.add(ExceptionHistoryEntry.create(execution, taskName));

Review comment:
       I wonder whether this collection should be bounded. The exception
history as a whole is bounded, so leaving this collection unbounded seems to
run counter to that idea.
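
One possible shape for a bound, as a rough sketch only: `BoundedFailureBuffer` below is a hypothetical helper and not an existing Flink class, and the capacity is an assumed parameter that would presumably follow whatever limit the exception history already applies. It drops the oldest entry once the limit is reached, so the number of retained entries never exceeds the limit regardless of how many failures are recorded.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink): keeps at most maxSize entries,
 * evicting the oldest entry when a new one arrives at capacity.
 */
final class BoundedFailureBuffer<T> {
    private final int maxSize;
    private final ArrayDeque<T> entries = new ArrayDeque<>();

    BoundedFailureBuffer(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    /** Adds an entry, discarding the oldest one if the buffer is full. */
    void add(T entry) {
        if (entries.size() == maxSize) {
            entries.removeFirst();
        }
        entries.addLast(entry);
    }

    /** Returns a copy of the retained entries, oldest first. */
    List<T> snapshot() {
        return new ArrayList<>(entries);
    }

    int size() {
        return entries.size();
    }
}
```

With a limit of 2, adding three entries would leave only the last two. Whether dropping the oldest or the newest failure is preferable here is a separate design question that this sketch does not settle.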




