rmetzger commented on a change in pull request #14948:
URL: https://github.com/apache/flink/pull/14948#discussion_r588323345



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -430,14 +430,30 @@ protected void onStart() throws JobMasterException {
             final TaskExecutionState taskExecutionState) {
         checkNotNull(taskExecutionState, "taskExecutionState");
 
-        if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {
-            return CompletableFuture.completedFuture(Acknowledge.get());
-        } else {
-            return FutureUtils.completedExceptionally(
-                    new ExecutionGraphException(
-                            "The execution attempt "
-                                    + taskExecutionState.getID()
-                                    + " was not found."));
+        try {
+            if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review comment:
       While working on the change, I accidentally tried starting the 
checkpoint scheduler on a job with no checkpointing enabled in 
`StopWithSavepoint.updateTaskExecutionState()` (which is not the right place to 
do this). The condition for "no checkpointing enabled" is that the 
checkpointing interval is set to Long.MAX_VALUE.
   If you try to start the checkpoint scheduler with that value, you end up 
with an integer overflow here: 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1792
   
   The exception is not caught anywhere, so the RPC system sends it back to the 
caller. However, in my case, the caller was notifying the JobMaster about a 
failure. The caller (TaskExecutor.updateTaskExecutionState()) is catching the 
exception coming back from the RPC call, but ignores it if the task is not 
anymore in the taskSlotTable. I assume that it is not in the `taskSlotTable` 
anymore because the task failed.
   
   Most likely we don't have this case anywhere in our production code, but I 
spend quite some time figuring out this issue, because the exception from the 
CheckpointCoordinator doesn't get logged anywhere and is completely swallowed.
   I'm not sure if it is necessary to have such defenses for these rare cases 
in the code, but since I experimented with this fix I left it in with a comment.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to