This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f28db07  [FLINK-21735][coordination] Harden 
JobMaster#updateTaskExecutionState()
f28db07 is described below

commit f28db07305a6574b32d95d6563102e5cba6279fa
Author: SteNicholas <[email protected]>
AuthorDate: Sun Mar 14 16:30:53 2021 +0800

    [FLINK-21735][coordination] Harden JobMaster#updateTaskExecutionState()
    
    This closes #15196.
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1c0f1b9..16b1856 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -392,17 +392,26 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        checkNotNull(taskExecutionState, "taskExecutionState");
+        FlinkException taskExecutionException;
+        try {
+            checkNotNull(taskExecutionState, "taskExecutionState");
 
-        if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
-            return CompletableFuture.completedFuture(Acknowledge.get());
-        } else {
-            return FutureUtils.completedExceptionally(
-                    new ExecutionGraphException(
-                            "The execution attempt "
-                                    + taskExecutionState.getID()
-                                    + " was not found."));
+            if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
+                return CompletableFuture.completedFuture(Acknowledge.get());
+            } else {
+                taskExecutionException =
+                        new ExecutionGraphException(
+                                "The execution attempt "
+                                        + taskExecutionState.getID()
+                                        + " was not found.");
+            }
+        } catch (Exception e) {
+            taskExecutionException =
+                    new JobMasterException(
+                            "Could not update the state of task execution for 
JobMaster.", e);
+            handleJobMasterError(taskExecutionException);
         }
+        return FutureUtils.completedExceptionally(taskExecutionException);
     }
 
     @Override

Reply via email to