zhuzhurk commented on code in PR #20222:
URL: https://github.com/apache/flink/pull/20222#discussion_r919710604


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -220,46 +217,57 @@ protected void startSchedulingInternal() {
     }
 
     @Override
-    protected void updateTaskExecutionStateInternal(
-            final ExecutionVertexID executionVertexId,
-            final TaskExecutionStateTransition taskExecutionState) {
+    protected void onTaskExecutionStateUpdate(final Execution execution) {
+        switch (execution.getState()) {
+            case FINISHED:
+                onTaskFinished(execution);
+                break;
+            case FAILED:
+                onTaskFailed(execution);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        String.format(
+                                "State %s should not be notified to 
DefaultScheduler.",
+                                execution.getState()));
+        }
+    }
 
+    protected void onTaskFinished(final Execution execution) {
+        checkState(execution.getState() == ExecutionState.FINISHED);
+
+        final ExecutionVertexID executionVertexId = 
execution.getVertex().getID();
         // once a task finishes, it will release the assigned allocation/slot 
and no longer
         // needs it. Therefore, it should stop reserving the slot so that 
other tasks are
         // possible to use the slot. Ideally, the `stopReserveAllocation` 
should happen
         // along with the release slot process. However, that process is 
hidden in the depth
         // of the ExecutionGraph, so we currently do it in DefaultScheduler 
after that process
         // is done.
-        if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) 
{
-            stopReserveAllocation(executionVertexId);
-        }
+        stopReserveAllocation(executionVertexId);
 
-        schedulingStrategy.onExecutionStateChange(
-                executionVertexId, taskExecutionState.getExecutionState());
-        maybeHandleTaskFailure(taskExecutionState, 
getCurrentExecutionOfVertex(executionVertexId));
+        schedulingStrategy.onExecutionStateChange(executionVertexId, 
ExecutionState.FINISHED);

Review Comment:
   This is a legacy issue and I would avoid to do it in this PR. We actually 
hope it to handle all states but there is a blocker at the moment. 
   More details can be found at FLINK-14233.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to