StephanEwen commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r430993238
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
##########
@@ -58,7 +58,17 @@
 		final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots = new ArrayList<>(coordinators.size());
 		for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
-			individualSnapshots.add(triggerCoordinatorCheckpoint(coordinator, checkpointId));
+			final CompletableFuture<CoordinatorSnapshot> checkpointFuture = triggerCoordinatorCheckpoint(coordinator, checkpointId);
+			coordinator.onCallTriggerCheckpoint(checkpointId);
+
+			individualSnapshots.add(checkpointFuture);
+			checkpointFuture.whenComplete((ignored, failure) -> {
+				if (failure != null) {
+					coordinator.abortCurrentTriggering();
+				} else {
+					coordinator.onCheckpointStateFutureComplete(checkpointId);
Review comment:
This sounded like a great catch to me initially. I have not tried to put
this into a test to guard against or fix this specific situation, but I found it
hard to even come up with an implementation where this could potentially happen.
Do you happen to have a code snippet that would produce the situation you
were thinking of here?
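One way to probe the ordering in the diff is a small stand-alone sketch that relies only on `java.util.concurrent.CompletableFuture` semantics. `RecordingCoordinator` and `run` are hypothetical stand-ins invented for this illustration, not Flink's `OperatorCoordinatorCheckpointContext`; they just record the order in which the hooks from the diff fire. Assuming the loop runs on a single thread, the sketch shows that because `whenComplete` is registered only after `onCallTriggerCheckpoint` returns, the completion hook cannot run before it, whether the snapshot future is already complete, completes later on another thread, or fails.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TriggerOrderingSketch {

    // Hypothetical stand-in for the coordinator context (not Flink code):
    // it only records which hook was called, in order.
    static class RecordingCoordinator {
        final List<String> events = Collections.synchronizedList(new ArrayList<>());

        void onCallTriggerCheckpoint(long checkpointId) { events.add("trigger-" + checkpointId); }

        void onCheckpointStateFutureComplete(long checkpointId) { events.add("complete-" + checkpointId); }

        void abortCurrentTriggering() { events.add("abort"); }
    }

    // Mirrors the loop body from the diff for one coordinator. The passed-in future
    // plays the role of triggerCoordinatorCheckpoint(...)'s result, i.e. it already
    // exists (and may already be complete) before onCallTriggerCheckpoint is called.
    static List<String> run(CompletableFuture<byte[]> snapshotFuture, long checkpointId) {
        RecordingCoordinator coordinator = new RecordingCoordinator();
        coordinator.onCallTriggerCheckpoint(checkpointId);

        // On an already-completed future, whenComplete runs the action immediately in
        // this thread; otherwise the completing thread runs it later. In both cases it
        // is registered only after onCallTriggerCheckpoint has returned.
        CompletableFuture<byte[]> done = snapshotFuture.whenComplete((ignored, failure) -> {
            if (failure != null) {
                coordinator.abortCurrentTriggering();
            } else {
                coordinator.onCheckpointStateFutureComplete(checkpointId);
            }
        });

        // Wait for the callback to finish; map a failed snapshot to null so join() does not throw.
        done.handle((result, failure) -> null).join();
        return new ArrayList<>(coordinator.events);
    }

    public static void main(String[] args) {
        // Snapshot completed synchronously, before the trigger hook was called.
        System.out.println(run(CompletableFuture.completedFuture(new byte[0]), 1L));
        // Snapshot completed asynchronously on another thread.
        System.out.println(run(CompletableFuture.supplyAsync(() -> new byte[0]), 2L));
        // Snapshot failed.
        CompletableFuture<byte[]> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("snapshot failed"));
        System.out.println(run(failed, 3L));
    }
}
```

Running `main` prints `[trigger-1, complete-1]`, `[trigger-2, complete-2]` and `[trigger-3, abort]`. A reordering would have to come from something this sketch does not model, for example a coordinator that invokes these hooks itself from another thread.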
----------------------------------------------------------------
This is an automated message from the Apache Git Service.