zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1185941867


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+        // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
 
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   How about doing the failure enrichment in 
`SchedulerNG#updateTaskExecutionState(TaskExecutionStateTransition)` instead? 
   The common logic could then be factored out and reused by both `SchedulerBase` 
and `AdaptiveScheduler`.
   
   I would prefer not to spread references to `JobMaster` any further, so that it 
does not become coupled with scheduler internals. `InternalFailuresListener` is 
itself only a workaround: it exists to notify the schedulers of failures that are 
triggered directly on `Execution`s, and it should be removed in the future.
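
   To make the suggestion concrete, here is a rough sketch of what such shared 
logic might look like. It is only an illustration under stated assumptions: 
`StateUpdate` and `FailureEnricher` are hypothetical names, not existing Flink 
classes, and the labeler function stands in for whatever the PR's 
`labelFailure` does.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

/** Hypothetical stand-in for a task state update; not Flink's real class. */
final class StateUpdate {
    final String state;
    final Throwable cause;
    final Map<String, String> labels;

    StateUpdate(String state, Throwable cause, Map<String, String> labels) {
        this.state = state;
        this.cause = cause;
        this.labels = labels;
    }

    /** Returns a copy of this update carrying the given failure labels. */
    StateUpdate withLabels(Map<String, String> newLabels) {
        return new StateUpdate(state, cause, newLabels);
    }
}

/** Hypothetical helper that each scheduler implementation could own and reuse. */
final class FailureEnricher {
    private final Function<Throwable, CompletableFuture<Map<String, String>>> labeler;
    private final Executor mainThreadExecutor;

    FailureEnricher(
            Function<Throwable, CompletableFuture<Map<String, String>>> labeler,
            Executor mainThreadExecutor) {
        this.labeler = labeler;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    /** Applies the update, labeling it first only when it reports a failure. */
    CompletableFuture<Boolean> enrichThenApply(
            StateUpdate update, Function<StateUpdate, Boolean> applyUpdate) {
        if (!"FAILED".equals(update.state)) {
            // Non-failure updates are applied directly on the calling thread.
            return CompletableFuture.completedFuture(applyUpdate.apply(update));
        }
        // Label first (possibly I/O heavy), then apply on the main thread executor.
        return labeler
                .apply(update.cause)
                .thenApplyAsync(
                        labels -> applyUpdate.apply(update.withLabels(labels)),
                        mainThreadExecutor);
    }
}
```

   With something along these lines, each scheduler would pass its own update 
logic as `applyUpdate`, and `JobMaster` would not need to know about labeling 
at all.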


