pgaref commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1185672001


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+        // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
 
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   Btw, `InternalFailuresListener#notifyTaskFailure` is already covered as part of FLINK-31891: when a TM disconnects we need to release the payload slot, and since the failure is not `fromSchedulerNg`, we go through the `internalTaskFailuresListener`:
https://github.com/apache/flink/pull/22511/files#diff-d535f910a10f835962b0637e12014068a9727b2152a84223fd9b1bf9c6c074d6R1665
   
   However, this does not cover the `onMissingDeploymentsOf` case David mentioned above.
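
For reference, the shape of the change in the hunk above can be sketched in isolation. This is only a minimal, standalone illustration and not the PR's code: `FailureUpdateSketch`, `UpdateException`, `doUpdate`, `labelFailure`, and the string-based states are hypothetical stand-ins for `FlinkException`, `doUpdateTaskExecutionState`, and the real labeling step. It shows the two paths: a FAILED update is labeled asynchronously and then applied on a supplied "main thread" executor, with the checked exception wrapped in a `CompletionException`; every other update is applied synchronously on the caller thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FailureUpdateSketch {

    /** Hypothetical checked exception, standing in for something like FlinkException. */
    static class UpdateException extends Exception {
        UpdateException(String message) {
            super(message);
        }
    }

    /** Hypothetical synchronous update; rejects unknown states with a checked exception. */
    static String doUpdate(String state) throws UpdateException {
        if (state.startsWith("UNKNOWN")) {
            throw new UpdateException("cannot apply " + state);
        }
        return "ack:" + state;
    }

    /** Hypothetical labeling step that may be I/O heavy, so it runs off the caller thread. */
    static CompletableFuture<String> labelFailure(String state) {
        return CompletableFuture.supplyAsync(() -> state + "[labeled]");
    }

    static CompletableFuture<String> update(String state, Executor mainThread) {
        if (state.startsWith("FAILED")) {
            // Only the FAILED path detaches: label first, then apply on the given executor.
            return labelFailure(state)
                    .thenApplyAsync(
                            labeled -> {
                                try {
                                    return doUpdate(labeled);
                                } catch (UpdateException e) {
                                    // Lambdas cannot throw checked exceptions, so wrap it.
                                    throw new CompletionException(e);
                                }
                            },
                            mainThread);
        }
        try {
            // All other updates are applied synchronously on the caller thread.
            return CompletableFuture.completedFuture(doUpdate(state));
        } catch (UpdateException e) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println(update("RUNNING", mainThread).join());
            System.out.println(update("FAILED", mainThread).join());
        } finally {
            mainThread.shutdown();
        }
    }
}
```

In this sketch the caller always gets a future back, and a failure in either path surfaces as an exceptionally completed future rather than a thrown exception.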



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
