pnowojski commented on a change in pull request #11347: [FLINK-14971][checkpointing] Make all the non-IO operations in CheckpointCoordinator single-threaded
URL: https://github.com/apache/flink/pull/11347#discussion_r390894422
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##########
 @@ -1018,61 +1019,64 @@ else if (checkpoint != null) {
         * <p>Important: This method should only be called in the checkpoint lock scope.
         *
         * @param pendingCheckpoint to complete
-        * @throws CheckpointException if the completion failed
         */
-       private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
-               final long checkpointId = pendingCheckpoint.getCheckpointId();
-               final CompletedCheckpoint completedCheckpoint;
-
+       private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) {
                // As a first step to complete the checkpoint, we register its state with the registry
                Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
                sharedStateRegistry.registerAll(operatorStates.values());
 
-               try {
-                       try {
-                               completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
-                               failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
-                       }
-                       catch (Exception e1) {
-                               // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
-                               if (!pendingCheckpoint.isDiscarded()) {
-                                       abortPendingCheckpoint(
-                                               pendingCheckpoint,
-                                               new CheckpointException(
-                                                       CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
+               final CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = pendingCheckpoint.finalizeCheckpoint();
+               completedCheckpointFuture.thenApplyAsync((completedCheckpoint) -> {
+                       synchronized (lock) {
+                               if (shutdown) {
+                                       return null;
+                               }
+                               // the pending checkpoint must be discarded after the finalization
+                               Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
+                               try {
+                                       completedCheckpointStore.addCheckpoint(completedCheckpoint);
+                                       return completedCheckpoint;
+                               } catch (Throwable t) {
+                                       try {
+                                               completedCheckpoint.discardOnFailedStoring();
+                                       } catch (Exception e) {
+                                               LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+                                       }
+                                       throw new CompletionException(t);
                                }
-
-                               throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.',
-                                       CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
                        }
-
-                       // the pending checkpoint must be discarded after the finalization
-                       Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
-
-                       try {
-                               completedCheckpointStore.addCheckpoint(completedCheckpoint);
-                       } catch (Exception exception) {
-                               // we failed to store the completed checkpoint. Let's clean up
-                               executor.execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       completedCheckpoint.discardOnFailedStoring();
-                                               } catch (Throwable t) {
-                                                       LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
-                                               }
+               }, executor)
+               .whenCompleteAsync((completedCheckpoint, throwable) -> {
+                       synchronized (lock) {
+                               if (shutdown) {
+                                       return;
+                               }
+                               if (throwable != null) {
+                                       if (!pendingCheckpoint.isDiscarded()) {
+                                               abortPendingCheckpoint(
+                                                       pendingCheckpoint,
+                                                       new CheckpointException(
+                                                               CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, throwable));
                                        }
-                               });
-
-                               throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.',
-                                       CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
+                               } else {
+                                       onCheckpointSuccess(completedCheckpoint);
+                               }
                        }
-               } finally {
-                       pendingCheckpoints.remove(checkpointId);
+               }, timer);
+       }
 
 Review comment:
   I think we are missing some comments explaining why something is executed on the main thread executor or on the IO thread executor. Take, for example, this chain in CheckpointCoordinator#completePendingCheckpoint:
   1. finalizeCheckpoint first runs on the IO executor
   2. then finalizeCheckpoint asynchronously goes back to the main thread
   3. then completePendingCheckpoint goes to the IO executor again
   4. and finally completePendingCheckpoint asynchronously goes back to the main thread executor
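   To make the four hops above concrete, here is a minimal, self-contained sketch in plain `CompletableFuture` terms. It assumes two single-threaded executors named `main-thread` and `io-thread` stand in for the coordinator's main thread executor and IO executor; the class name, the stage labels, and the placeholder checkpoint ID `42L` are hypothetical, and this is not the actual Flink code. Each stage records the thread it ran on, so the trace alternates io, main, io, main.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the current chain; not Flink code.
final class FourHopFlow {

    // Single-threaded executor whose only thread has a recognizable name.
    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static String where() {
        return Thread.currentThread().getName();
    }

    // Runs the four hops from the review comment and records where each stage ran.
    static List<String> run() {
        ExecutorService mainThread = namedExecutor("main-thread");
        ExecutorService ioThread = namedExecutor("io-thread");
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                // 1. finalizeCheckpoint: blocking work on the IO executor
                .supplyAsync(() -> {
                    trace.add("finalize-io@" + where());
                    return 42L; // placeholder checkpoint ID
                }, ioThread)
                // 2. finalizeCheckpoint: hop back to the main thread
                .thenApplyAsync(id -> {
                    trace.add("finalize-main@" + where());
                    return id;
                }, mainThread)
                // 3. completePendingCheckpoint: hop to the IO executor again
                .thenApplyAsync(id -> {
                    trace.add("complete-io@" + where());
                    return id;
                }, ioThread)
                // 4. completePendingCheckpoint: hop back to the main thread once more
                .thenAcceptAsync(id -> trace.add("complete-main@" + where()), mainThread)
                .join();
        } finally {
            mainThread.shutdown();
            ioThread.shutdown();
        }
        return trace;
    }
}
```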
   
   First, there are those missing comments: why is something executed in one executor and not the other? Secondly, does it have to go back and forth like this? Could we simplify the code to a simpler control flow, `main thread -> io thread -> main thread`?
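   A minimal sketch of what the simpler `main thread -> io thread -> main thread` flow could look like, under the same hypothetical assumptions (two named single-threaded executors, a placeholder checkpoint ID, invented stage labels; not Flink's API). The assumption here is that all blocking work (finalizing the metadata and adding to the completed checkpoint store) is done in a single IO stage, and that both the success and the failure path are handled in one stage back on the main thread:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the proposed chain; not Flink code.
final class ThreeHopFlow {

    // Single-threaded executor whose only thread has a recognizable name.
    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static String where() {
        return Thread.currentThread().getName();
    }

    // main -> io -> main. When failStore is true the IO stage throws, to show
    // that the failure is still handled back on the main thread.
    static List<String> run(boolean failStore) {
        ExecutorService mainThread = namedExecutor("main-thread");
        ExecutorService ioThread = namedExecutor("io-thread");
        List<String> trace = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                // main thread: e.g. register shared state, gather what the IO work needs
                .supplyAsync(() -> {
                    trace.add("prepare@" + where());
                    return 42L; // placeholder checkpoint ID
                }, mainThread)
                // IO thread: all blocking work (finalize + store) in a single stage
                .thenApplyAsync(checkpointId -> {
                    trace.add("finalizeAndStore@" + where());
                    if (failStore) {
                        throw new IllegalStateException("could not store checkpoint " + checkpointId);
                    }
                    return checkpointId;
                }, ioThread)
                // main thread: the only stage that reacts to the outcome (success or abort)
                .whenCompleteAsync((checkpointId, throwable) ->
                        trace.add((throwable == null ? "success@" : "abort@") + where()),
                    mainThread)
                // swallow the failure so join() returns normally; it was handled above
                .handle((ignored, throwable) -> null)
                .join();
        } finally {
            mainThread.shutdown();
            ioThread.shutdown();
        }
        return trace;
    }
}
```

   Keeping the success and abort handling in one main-thread callback would mean the coordinator's state is mutated in a single place, which is also a natural spot for the comment explaining why each stage runs on its executor.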

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
