ifndef-SleePy commented on a change in pull request #11347:
[FLINK-14971][checkpointing] Make all the non-IO operations in
CheckpointCoordinator single-threaded
URL: https://github.com/apache/flink/pull/11347#discussion_r391383507
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1018,61 +1019,64 @@ else if (checkpoint != null) {
* <p>Important: This method should only be called in the checkpoint
lock scope.
*
* @param pendingCheckpoint to complete
- * @throws CheckpointException if the completion failed
*/
- private void completePendingCheckpoint(PendingCheckpoint
pendingCheckpoint) throws CheckpointException {
- final long checkpointId = pendingCheckpoint.getCheckpointId();
- final CompletedCheckpoint completedCheckpoint;
-
+ private void completePendingCheckpoint(PendingCheckpoint
pendingCheckpoint) {
// As a first step to complete the checkpoint, we register its
state with the registry
Map<OperatorID, OperatorState> operatorStates =
pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());
- try {
- try {
- completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint();
-
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
- }
- catch (Exception e1) {
- // abort the current pending checkpoint if we
fails to finalize the pending checkpoint.
- if (!pendingCheckpoint.isDiscarded()) {
- abortPendingCheckpoint(
- pendingCheckpoint,
- new CheckpointException(
-
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
+ final CompletableFuture<CompletedCheckpoint>
completedCheckpointFuture = pendingCheckpoint.finalizeCheckpoint();
+ completedCheckpointFuture.thenApplyAsync((completedCheckpoint)
-> {
+ synchronized (lock) {
+ if (shutdown) {
+ return null;
+ }
+ // the pending checkpoint must be discarded
after the finalization
+
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint
!= null);
+ try {
+
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+ return completedCheckpoint;
+ } catch (Throwable t) {
+ try {
+
completedCheckpoint.discardOnFailedStoring();
+ } catch (Exception e) {
+ LOG.warn("Could not properly
discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+ }
+ throw new CompletionException(t);
}
-
- throw new CheckpointException("Could not
finalize the pending checkpoint " + checkpointId + '.',
-
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}
-
- // the pending checkpoint must be discarded after the
finalization
-
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint
!= null);
-
- try {
-
completedCheckpointStore.addCheckpoint(completedCheckpoint);
- } catch (Exception exception) {
- // we failed to store the completed checkpoint.
Let's clean up
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
completedCheckpoint.discardOnFailedStoring();
- } catch (Throwable t) {
- LOG.warn("Could not
properly discard completed checkpoint {}.",
completedCheckpoint.getCheckpointID(), t);
- }
+ }, executor)
+ .whenCompleteAsync((completedCheckpoint, throwable) -> {
+ synchronized (lock) {
+ if (shutdown) {
+ return;
+ }
+ if (throwable != null) {
+ if (!pendingCheckpoint.isDiscarded()) {
+ abortPendingCheckpoint(
+ pendingCheckpoint,
+ new CheckpointException(
+
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, throwable));
}
- });
-
- throw new CheckpointException("Could not
complete the pending checkpoint " + checkpointId + '.',
-
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
+ } else {
+
onCheckpointSuccess(completedCheckpoint);
+ }
}
- } finally {
- pendingCheckpoints.remove(checkpointId);
+ }, timer);
+ }
Review comment:
It's indeed a bit complicated. If we combined the metadata persistence and the
storing of the completed checkpoint into one operation (in the IO thread), the
logic here would be much easier to understand.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services