This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f28db07 [FLINK-21735][coordination] Harden JobMaster#updateTaskExecutionState()
f28db07 is described below
commit f28db07305a6574b32d95d6563102e5cba6279fa
Author: SteNicholas <[email protected]>
AuthorDate: Sun Mar 14 16:30:53 2021 +0800
[FLINK-21735][coordination] Harden JobMaster#updateTaskExecutionState()
This closes #15196.
---
.../apache/flink/runtime/jobmaster/JobMaster.java | 27 ++++++++++++++--------
1 file changed, 18 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1c0f1b9..16b1856 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -392,17 +392,26 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
@Override
public CompletableFuture<Acknowledge> updateTaskExecutionState(
final TaskExecutionState taskExecutionState) {
- checkNotNull(taskExecutionState, "taskExecutionState");
+ FlinkException taskExecutionException;
+ try {
+ checkNotNull(taskExecutionState, "taskExecutionState");
- if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
- return CompletableFuture.completedFuture(Acknowledge.get());
- } else {
- return FutureUtils.completedExceptionally(
- new ExecutionGraphException(
- "The execution attempt "
- + taskExecutionState.getID()
- + " was not found."));
+ if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ } else {
+ taskExecutionException =
+ new ExecutionGraphException(
+ "The execution attempt "
+ + taskExecutionState.getID()
+ + " was not found.");
+ }
+ } catch (Exception e) {
+ taskExecutionException =
+ new JobMasterException(
+ "Could not update the state of task execution for JobMaster.", e);
+ handleJobMasterError(taskExecutionException);
}
+ return FutureUtils.completedExceptionally(taskExecutionException);
}
@Override