StephanEwen commented on pull request #16432:
URL: https://github.com/apache/flink/pull/16432#issuecomment-877346128


   To re-summarize the failure cause:
   
   A failed RPC (failed result future) leads to the failure of the checkpoint 
and triggers a task failure. Then the tracking structures are reset, assuming 
that the failure is taken care of.
   However, because the task failure is processed asynchronously (enqueued in 
the scheduler mailbox), it is possible that some successive checkpoints 
complete before the task failure is handled. And that violates the assumption 
that once the RPC loss is detected, no further checkpoints may complete.
   
   I don't fully understand how this PR fixes that. It looks like it tries to 
change where failure notifications are set, and in 
`completeCheckpointOnceEventsAreDone()` it no longer clears the set of futures 
(so that the failed future remains in there, blocking further checkpoints), 
relying instead on cleaning it up in a different place. This part in particular 
looks suspect to me: registering the future in the tracker asynchronously might 
mean it isn't even tracked when we decide whether the checkpoint can be confirmed:
   
   ```java
   sendingExecutor.execute(
                   () -> {
                       nonSuccessFuturesTrack.trackFuture(result);
                       sender.sendEvent(sendAction, result);
                   });
   ```
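
To illustrate the concern, here is a minimal, self-contained sketch of that race. It is not Flink code: `ManualExecutor`, the `HashSet`-based tracker and `trackedAtDecisionTime` are hypothetical stand-ins, and the executor is drained by hand so the interleaving is deterministic. It only shows that a registration enqueued on an executor is invisible to anyone who inspects the tracker before that executor runs.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class AsyncRegistrationRace {

    /** Hypothetical stand-in for sendingExecutor: queues tasks until drain() is called. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        void drain() {
            Runnable next;
            while ((next = tasks.poll()) != null) {
                next.run();
            }
        }
    }

    /** Returns how many futures the (hypothetical) tracker holds when the decision is made. */
    static int trackedAtDecisionTime(boolean registerAsynchronously) {
        ManualExecutor sendingExecutor = new ManualExecutor();
        Set<CompletableFuture<?>> tracker = new HashSet<>();
        CompletableFuture<String> result = new CompletableFuture<>();

        if (registerAsynchronously) {
            // Registration is only enqueued here; it has not happened yet.
            sendingExecutor.execute(() -> tracker.add(result));
        } else {
            // Registration happens before the method continues.
            tracker.add(result);
        }

        // The "can this checkpoint be confirmed?" decision looks at the tracker now,
        // before the executor has processed anything.
        int seenByCheckpointDecision = tracker.size();

        sendingExecutor.drain();
        return seenByCheckpointDecision;
    }

    public static void main(String[] args) {
        System.out.println("async registration: " + trackedAtDecisionTime(true));
        System.out.println("sync registration:  " + trackedAtDecisionTime(false));
    }
}
```

Whether the real checkpoint decision can actually run in that window depends on which threads are involved; the sketch only makes the window visible.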
   
   ### Alternative Solution
   
   I think what @tillrohrmann suggested is the right direction: What we really 
need to guarantee is that by the time the event-sending future that we track is 
failed (due to RPC loss), we have already marked the job as failed, so no other 
checkpoint can be triggered. So my proposal is to rewrite the following 
section of the code:
   
   *Existing Code*
   
   Note how the `handleAsync` on the result future means that triggering the 
subtask failure happens only after the future is already complete and has 
unlocked the checkpointing.
   
   ```java
   final Callable<CompletableFuture<Acknowledge>> sendAction =
           subtaskAccess.createEventSendAction(serializedEvent);
   
   final CompletableFuture<Acknowledge> result = new CompletableFuture<>();
   FutureUtils.assertNoException(
           result.handleAsync(
                   (success, failure) -> {
                       if (failure != null && subtaskAccess.isStillRunning()) {
                           String msg =
                                   String.format(
                                           EVENT_LOSS_ERROR_MESSAGE, evt, subtaskAccess.subtaskName());
                           subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                       }
                       return null;
                   },
                   sendingExecutor));
   
   sendingExecutor.execute(() -> sender.sendEvent(sendAction, result));
   return result;
   ```
   
   *Changed Code*
   
   Here, the future that is in the tracker is only complete once the subtask is 
marked as failed.
   
   ```java
   final CompletableFuture<Acknowledge> sendResult = new CompletableFuture<>();
   sendingExecutor.execute(() -> sender.sendEvent(sendAction, sendResult));
   
   final CompletableFuture<Acknowledge> result =
           sendResult.handleAsync(
                   (success, failure) -> {
                       if (failure != null && subtaskAccess.isStillRunning()) {
                           String msg =
                                   String.format(
                                           EVENT_LOSS_ERROR_MESSAGE, evt, subtaskAccess.subtaskName());
                           subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                       }
                       return success;
                   },
                   sendingExecutor);
   
   incompleteFuturesTracker.trackFutureWhileIncomplete(result);
   return result;
   ```
   
   We do need to pass the incomplete futures tracker into the 
`SubtaskGatewayImpl`. But I think that is the only other change we need.
   I would suggest undoing the language changes in particular, because they are 
not correct.
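
The ordering difference between the two shapes can be reproduced outside of Flink with plain `CompletableFuture`s. The following is a sketch under stated assumptions, not the actual implementation: `ManualExecutor` is a hypothetical stand-in for `sendingExecutor` that only runs tasks when drained, and an `AtomicBoolean` stands in for `triggerTaskFailover(...)`. It checks whether the future that would be tracked is already done while the failover is still pending.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

class TrackedFutureOrdering {

    /** Hypothetical stand-in for sendingExecutor: queues tasks until drain() is called. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        void drain() {
            Runnable next;
            while ((next = tasks.poll()) != null) {
                next.run();
            }
        }
    }

    /** Existing shape: the tracked future is the raw send future. */
    static boolean existingShapeDoneBeforeFailover() {
        ManualExecutor executor = new ManualExecutor();
        AtomicBoolean failoverTriggered = new AtomicBoolean(false);

        CompletableFuture<String> result = new CompletableFuture<>();
        result.handleAsync(
                (success, failure) -> {
                    if (failure != null) {
                        failoverTriggered.set(true); // stands in for triggerTaskFailover
                    }
                    return null;
                },
                executor);

        result.completeExceptionally(new RuntimeException("simulated RPC loss"));

        // The handler is still queued, yet the tracked future is already complete.
        return result.isDone() && !failoverTriggered.get();
    }

    /** Changed shape: the tracked future is the one returned by handleAsync. */
    static boolean changedShapeDoneBeforeFailover() {
        ManualExecutor executor = new ManualExecutor();
        AtomicBoolean failoverTriggered = new AtomicBoolean(false);

        CompletableFuture<String> sendResult = new CompletableFuture<>();
        CompletableFuture<String> result =
                sendResult.handleAsync(
                        (success, failure) -> {
                            if (failure != null) {
                                failoverTriggered.set(true);
                            }
                            return success;
                        },
                        executor);

        sendResult.completeExceptionally(new RuntimeException("simulated RPC loss"));

        // The tracked future cannot complete until the handler has run.
        boolean doneWhileHandlerQueued = result.isDone();
        executor.drain();
        boolean doneOnlyAfterFailover = result.isDone() && failoverTriggered.get();
        return doneWhileHandlerQueued || !doneOnlyAfterFailover;
    }

    public static void main(String[] args) {
        System.out.println("existing shape: " + existingShapeDoneBeforeFailover());
        System.out.println("changed shape:  " + changedShapeDoneBeforeFailover());
    }
}
```

In the existing shape the tracked future is done while the failover is still sitting in the executor queue. In the changed shape the tracked future is the one produced by `handleAsync`, so it can only complete after the failure handler has run.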

