pnowojski commented on a change in pull request #11347:
[FLINK-14971][checkpointing] Make all the non-IO operations in
CheckpointCoordinator single-threaded
URL: https://github.com/apache/flink/pull/11347#discussion_r390894422
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1018,61 +1019,64 @@ else if (checkpoint != null) {
* <p>Important: This method should only be called in the checkpoint
lock scope.
*
* @param pendingCheckpoint to complete
- * @throws CheckpointException if the completion failed
*/
- private void completePendingCheckpoint(PendingCheckpoint
pendingCheckpoint) throws CheckpointException {
- final long checkpointId = pendingCheckpoint.getCheckpointId();
- final CompletedCheckpoint completedCheckpoint;
-
+ private void completePendingCheckpoint(PendingCheckpoint
pendingCheckpoint) {
// As a first step to complete the checkpoint, we register its
state with the registry
Map<OperatorID, OperatorState> operatorStates =
pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());
- try {
- try {
- completedCheckpoint =
pendingCheckpoint.finalizeCheckpoint();
-
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
- }
- catch (Exception e1) {
- // abort the current pending checkpoint if we
fails to finalize the pending checkpoint.
- if (!pendingCheckpoint.isDiscarded()) {
- abortPendingCheckpoint(
- pendingCheckpoint,
- new CheckpointException(
-
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
+ final CompletableFuture<CompletedCheckpoint>
completedCheckpointFuture = pendingCheckpoint.finalizeCheckpoint();
+ completedCheckpointFuture.thenApplyAsync((completedCheckpoint)
-> {
+ synchronized (lock) {
+ if (shutdown) {
+ return null;
+ }
+ // the pending checkpoint must be discarded
after the finalization
+
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint
!= null);
+ try {
+
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+ return completedCheckpoint;
+ } catch (Throwable t) {
+ try {
+
completedCheckpoint.discardOnFailedStoring();
+ } catch (Exception e) {
+ LOG.warn("Could not properly
discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+ }
+ throw new CompletionException(t);
}
-
- throw new CheckpointException("Could not
finalize the pending checkpoint " + checkpointId + '.',
-
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
}
-
- // the pending checkpoint must be discarded after the
finalization
-
Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint
!= null);
-
- try {
-
completedCheckpointStore.addCheckpoint(completedCheckpoint);
- } catch (Exception exception) {
- // we failed to store the completed checkpoint.
Let's clean up
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
-
completedCheckpoint.discardOnFailedStoring();
- } catch (Throwable t) {
- LOG.warn("Could not
properly discard completed checkpoint {}.",
completedCheckpoint.getCheckpointID(), t);
- }
+ }, executor)
+ .whenCompleteAsync((completedCheckpoint, throwable) -> {
+ synchronized (lock) {
+ if (shutdown) {
+ return;
+ }
+ if (throwable != null) {
+ if (!pendingCheckpoint.isDiscarded()) {
+ abortPendingCheckpoint(
+ pendingCheckpoint,
+ new CheckpointException(
+
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, throwable));
}
- });
-
- throw new CheckpointException("Could not
complete the pending checkpoint " + checkpointId + '.',
-
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
+ } else {
+
onCheckpointSuccess(completedCheckpoint);
+ }
}
- } finally {
- pendingCheckpoints.remove(checkpointId);
+ }, timer);
+ }
Review comment:
I think we are missing comments that explain why each step is executed on the
main thread executor or on the io executor. For example, this chain in
CheckpointCoordinator#completePendingCheckpoint:
1. finalizeCheckpoint goes first on io executor
2. then finalizeCheckpoint asynchronously goes back to the main thread
3. then completePendingCheckpoint goes again to io executor
4. and finally completePendingCheckpoint asynchronously goes back to the main
thread executor
The first issue is those missing comments: why is something executed on one
executor and not the other? Secondly, does it have to go back and forth like
this? Could we simplify the code to a simpler control flow, `main thread -> io
thread -> main thread`?
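To make the suggestion concrete, here is a minimal sketch of a `main thread -> io
thread -> main thread` chain, with a comment on each stage saying why it runs
where it does. This is not the coordinator's code: `ExecutorHopSketch`,
`mainThreadExecutor`, `ioExecutor` and the placeholder values are hypothetical
names chosen for illustration. Only the `CompletableFuture` and `Executors`
calls are standard JDK API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorHopSketch {

    /** Runs one main -> io -> main chain and returns the thread name seen at each stage. */
    public static List<String> run() {
        // Hypothetical stand-ins for the coordinator's main-thread and IO executors.
        ExecutorService mainThreadExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        List<String> hops = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // Main thread: cheap, non-blocking bookkeeping on coordinator state.
                    .supplyAsync(() -> {
                        hops.add(Thread.currentThread().getName());
                        return 42L; // placeholder checkpoint id (assumption)
                    }, mainThreadExecutor)
                    // IO thread: all potentially blocking work grouped into one stage.
                    .thenApplyAsync(id -> {
                        hops.add(Thread.currentThread().getName());
                        return "completed-" + id; // placeholder result (assumption)
                    }, ioExecutor)
                    // Main thread: react to success or failure in a single place.
                    .whenCompleteAsync((result, failure) ->
                            hops.add(Thread.currentThread().getName()),
                            mainThreadExecutor)
                    .join();
        } finally {
            mainThreadExecutor.shutdown();
            ioExecutor.shutdown();
        }
        return hops;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With every blocking call grouped into the single io stage, the state mutations
stay on one thread and each hop carries its own justification in a comment.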