StephanEwen commented on pull request #16432:
URL: https://github.com/apache/flink/pull/16432#issuecomment-877346128
To re-summarize the failure cause:
A failed RPC (failed result future) leads to the failure of the checkpoint
and triggers a task failure. Then the tracking structures are reset, assuming
that the failure is taken care of.
However, because the task failure is processed asynchronously (enqueued in
the scheduler mailbox), subsequent checkpoints may complete before the task
failure is handled. That violates the assumption that no further checkpoint
may complete once the RPC loss has been detected.
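To make the ordering concrete, here is a small, deterministic sketch of the race. It is not Flink code. The `ArrayDeque` stands in for the scheduler mailbox, and `jobFailed` and `trackedEventFutures` are hypothetical stand-ins for the job state and the tracked event futures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, single-threaded model of the race; not Flink code. */
final class FailoverRaceSketch {

    static List<String> run() {
        // Stand-in for the scheduler mailbox: tasks run only when drained.
        Queue<Runnable> mailbox = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        boolean[] jobFailed = {false};
        List<CompletableFuture<?>> trackedEventFutures = new ArrayList<>();

        // An event is sent and its future is tracked; then the RPC is lost.
        CompletableFuture<Void> lostEvent = new CompletableFuture<>();
        trackedEventFutures.add(lostEvent);
        lostEvent.completeExceptionally(new RuntimeException("RPC lost"));

        // Checkpoint 1 notices the failed future and is aborted.
        boolean anyFailed =
                trackedEventFutures.stream().anyMatch(CompletableFuture::isCompletedExceptionally);
        if (anyFailed) {
            log.add("checkpoint 1 aborted");
            // The task failure is only enqueued, not processed yet.
            mailbox.add(() -> {
                jobFailed[0] = true;
                log.add("task failover");
            });
            // Tracking state is reset, assuming the failure is taken care of.
            trackedEventFutures.clear();
        }

        // Checkpoint 2 is confirmed before the mailbox has processed the failover.
        if (!jobFailed[0] && trackedEventFutures.isEmpty()) {
            log.add("checkpoint 2 completed");
        }

        // Process the mailbox.
        Runnable task;
        while ((task = mailbox.poll()) != null) {
            task.run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running `main` prints `[checkpoint 1 aborted, checkpoint 2 completed, task failover]`. Checkpoint 2 is confirmed in the window between detecting the RPC loss and processing the failover.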
I don't fully understand how this PR fixes that. It looks like it does three
things: it changes where failure notifications are set, it stops clearing the
set of tracked futures in `completeCheckpointOnceEventsAreDone()` (so that the
failed future remains there and blocks further checkpoints), and it relies on
clearing that set in a different place. This part in particular looks suspect
to me. Registering the future in the tracker asynchronously means it might not
even be tracked yet when we decide whether the checkpoint can be confirmed:
```java
sendingExecutor.execute(
        () -> {
            nonSuccessFuturesTrack.trackFuture(result);
            sender.sendEvent(sendAction, result);
        });
```
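Here is a tiny, hypothetical model of that concern. A queue-backed `Executor` stands in for `sendingExecutor`: its tasks only run when the queue is drained. A plain `HashSet` stands in for the tracker. The queue makes the unlucky interleaving deterministic. In the real code it would be a timing-dependent race between threads.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical model of registering a future asynchronously; not Flink code. */
final class AsyncTrackingSketch {

    /**
     * Returns true if the checkpoint decision ran while the tracker was still empty,
     * even though the future does get tracked afterwards.
     */
    static boolean decisionMissedTrackedFuture() {
        Queue<Runnable> queued = new ArrayDeque<>();
        // Stand-in for sendingExecutor: tasks run only when the queue is drained.
        Executor sendingExecutor = queued::add;
        Set<CompletableFuture<?>> tracker = new HashSet<>();

        CompletableFuture<Void> result = new CompletableFuture<>();
        // Mirrors the snippet above: tracking is deferred to the executor.
        sendingExecutor.execute(() -> tracker.add(result));

        // The checkpoint decision happens before the registration task has run.
        boolean sawEmptyTracker = tracker.isEmpty();

        // Only now does the registration take effect.
        Runnable task;
        while ((task = queued.poll()) != null) {
            task.run();
        }
        return sawEmptyTracker && tracker.contains(result);
    }
}
```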
### Alternative Solution
I think what @tillrohrmann suggested is the right direction. What we really
need to guarantee is that by the time the event-sending future that we track
fails (due to RPC loss), we have already marked the job as failed, so no other
checkpoint can be triggered. So my proposal is to rewrite the following
section of the code:
*Existing Code*
Note how the `handleAsync` on the result future means that the subtask
failover is triggered only after the future has already completed and has
unblocked checkpointing.
```java
final Callable<CompletableFuture<Acknowledge>> sendAction =
        subtaskAccess.createEventSendAction(serializedEvent);

final CompletableFuture<Acknowledge> result = new CompletableFuture<>();
FutureUtils.assertNoException(
        result.handleAsync(
                (success, failure) -> {
                    if (failure != null && subtaskAccess.isStillRunning()) {
                        String msg =
                                String.format(
                                        EVENT_LOSS_ERROR_MESSAGE,
                                        evt,
                                        subtaskAccess.subtaskName());
                        subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                    }
                    return null;
                },
                sendingExecutor));

sendingExecutor.execute(() -> sender.sendEvent(sendAction, result));
return result;
```
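The ordering can be illustrated with plain `CompletableFuture`s. This is a sketch, not the actual Flink classes. The queue-backed executor stands in for `sendingExecutor`, and the `failoverTriggered` flag stands in for `triggerTaskFailover(...)`. Once `result` fails, anyone looking at `result` already sees it as done, while the failover callback is still waiting in the executor's queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of the existing ordering; not Flink code. */
final class ExistingOrderingSketch {

    static String run() {
        Queue<Runnable> queued = new ArrayDeque<>();
        Executor sendingExecutor = queued::add; // stand-in for sendingExecutor
        AtomicBoolean failoverTriggered = new AtomicBoolean(false);

        // The future that gets tracked and returned to the caller.
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Failover is a dependent action of 'result', scheduled on the executor.
        result.handleAsync(
                (success, failure) -> {
                    if (failure != null) {
                        failoverTriggered.set(true); // stand-in for triggerTaskFailover(...)
                    }
                    return null;
                },
                sendingExecutor);

        // The RPC is lost.
        result.completeExceptionally(new RuntimeException("RPC lost"));

        // What checkpoint logic observes at this point.
        String observed =
                "resultDone=" + result.isDone() + ", failoverTriggered=" + failoverTriggered.get();

        // The failover only happens once the executor gets to it.
        Runnable task;
        while ((task = queued.poll()) != null) {
            task.run();
        }
        return observed;
    }
}
```

`run()` returns `resultDone=true, failoverTriggered=false`. This is exactly the window in which a checkpoint can be confirmed.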
*Changed Code*
Here, the future that is in the tracker is only complete once the subtask is
marked as failed.
```java
final CompletableFuture<Acknowledge> sendResult = new CompletableFuture<>();
sendingExecutor.execute(() -> sender.sendEvent(sendAction, sendResult));

final CompletableFuture<Acknowledge> result =
        sendResult.handleAsync(
                (success, failure) -> {
                    if (failure != null && subtaskAccess.isStillRunning()) {
                        String msg =
                                String.format(
                                        EVENT_LOSS_ERROR_MESSAGE,
                                        evt,
                                        subtaskAccess.subtaskName());
                        subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
                    }
                    return success;
                },
                sendingExecutor);

incompleteFuturesTracker.trackFutureWhileIncomplete(result);
return result;
```
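Here is the same kind of sketch for the changed code. It again uses a hypothetical queue-backed executor, with a flag in place of the real failover. The derived `result` only completes after the `handleAsync` function has run. Any observer of `result` therefore sees the failover as already triggered.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of the proposed ordering; not Flink code. */
final class ChangedOrderingSketch {

    static String run() {
        Queue<Runnable> queued = new ArrayDeque<>();
        Executor sendingExecutor = queued::add; // stand-in for sendingExecutor
        AtomicBoolean failoverTriggered = new AtomicBoolean(false);
        AtomicBoolean failoverSeenOnCompletion = new AtomicBoolean(false);

        CompletableFuture<Void> sendResult = new CompletableFuture<>(); // raw RPC result
        // Derived future: this is the one that would be tracked.
        CompletableFuture<Void> result =
                sendResult.handleAsync(
                        (success, failure) -> {
                            if (failure != null) {
                                failoverTriggered.set(true); // stand-in for triggerTaskFailover(...)
                            }
                            return success;
                        },
                        sendingExecutor);
        // An observer of the tracked future records what it sees on completion.
        result.whenComplete(
                (success, failure) -> failoverSeenOnCompletion.set(failoverTriggered.get()));

        // The RPC is lost; the tracked future is not done yet.
        sendResult.completeExceptionally(new RuntimeException("RPC lost"));
        boolean trackedDoneBeforeFailover = result.isDone();

        Runnable task;
        while ((task = queued.poll()) != null) {
            task.run();
        }
        return "trackedDoneBeforeFailover=" + trackedDoneBeforeFailover
                + ", failoverSeenOnCompletion=" + failoverSeenOnCompletion.get();
    }
}
```

`run()` returns `trackedDoneBeforeFailover=false, failoverSeenOnCompletion=true`.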
We do need to pass the incomplete-futures tracker into
`SubtaskGatewayImpl`, but I think that is the only other change we need.
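For reference, here is a minimal sketch of what tracking "while incomplete" could look like. `SimpleIncompleteFuturesTracker` and `SubtaskGatewaySketch` are hypothetical names for illustration, loosely modeled on the `trackFutureWhileIncomplete(...)` call above. This is not Flink's actual tracker or `SubtaskGatewayImpl`. It only illustrates the constructor-injection idea.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified tracker; not Flink's actual implementation. */
final class SimpleIncompleteFuturesTracker {
    private final Object lock = new Object();
    private final Set<CompletableFuture<?>> incomplete = new HashSet<>();

    /** Tracks the future until it completes (normally or exceptionally). */
    void trackFutureWhileIncomplete(CompletableFuture<?> future) {
        synchronized (lock) {
            if (future.isDone()) {
                return; // nothing to wait for
            }
            incomplete.add(future);
        }
        // Registered outside the lock; if the future completed in the meantime,
        // the callback runs immediately on this thread and removes it again.
        future.whenComplete((value, failure) -> {
            synchronized (lock) {
                incomplete.remove(future);
            }
        });
    }

    /** Snapshot of the futures that are still in flight. */
    List<CompletableFuture<?>> getCurrentIncomplete() {
        synchronized (lock) {
            return new ArrayList<>(incomplete);
        }
    }
}

/** Hypothetical gateway showing only the constructor injection of the tracker. */
final class SubtaskGatewaySketch {
    private final SimpleIncompleteFuturesTracker incompleteFuturesTracker;

    SubtaskGatewaySketch(SimpleIncompleteFuturesTracker incompleteFuturesTracker) {
        this.incompleteFuturesTracker = incompleteFuturesTracker;
    }

    /** Tracks the (failover-aware) result future and hands it back to the caller. */
    <T> CompletableFuture<T> track(CompletableFuture<T> result) {
        incompleteFuturesTracker.trackFutureWhileIncomplete(result);
        return result;
    }
}
```

Under the proposal, the tracked `result` only completes after the failover was triggered. Dropping it from the tracker on completion should therefore be safe, because the job is already marked as failed by then.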
In particular, I would suggest undoing the language changes, because they are
not correct.
--
This is an automated message from the Apache Git Service.