ifndef-SleePy commented on a change in pull request #10332: 
[FLINK-13905][checkpointing] Separate checkpoint triggering into several 
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r367778946
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##########
 @@ -576,85 +648,181 @@ public boolean isShutdown() {
                        checkpoint.setStatsCallback(callback);
                }
 
-               // schedule the timer that will clean up the expired checkpoints
-               final Runnable canceller = () -> {
-                       synchronized (lock) {
-                               // only do the work if the checkpoint is not 
discarded anyways
-                               // note that checkpoint completion discards the 
pending checkpoint object
-                               if (!checkpoint.isDiscarded()) {
-                                       LOG.info("Checkpoint {} of job {} 
expired before completing.", checkpointID, job);
-
-                                       abortPendingCheckpoint(
-                                               checkpoint,
-                                               new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
-                               }
-                       }
-               };
+               synchronized (lock) {
 
-               try {
-                       // re-acquire the coordinator-wide lock
-                       synchronized (lock) {
-                               preCheckBeforeTriggeringCheckpoint(isPeriodic, 
props.forceCheckpoint());
+                       pendingCheckpoints.put(checkpointID, checkpoint);
 
-                               LOG.info("Triggering checkpoint {} @ {} for job 
{}.", checkpointID, timestamp, job);
+                       ScheduledFuture<?> cancellerHandle = timer.schedule(
+                               new CheckpointCanceller(checkpoint),
+                               checkpointTimeout, TimeUnit.MILLISECONDS);
 
-                               pendingCheckpoints.put(checkpointID, 
checkpoint);
+                       if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+                               // checkpoint is already disposed!
+                               cancellerHandle.cancel(false);
+                       }
+               }
 
-                               ScheduledFuture<?> cancellerHandle = 
timer.schedule(
-                                               canceller,
-                                               checkpointTimeout, 
TimeUnit.MILLISECONDS);
+               LOG.info("Triggering checkpoint {} @ {} for job {}.", 
checkpointID, timestamp, job);
+               return checkpoint;
+       }
 
-                               if 
(!checkpoint.setCancellerHandle(cancellerHandle)) {
-                                       // checkpoint is already disposed!
-                                       cancellerHandle.cancel(false);
-                               }
+       /**
+        * Snapshot master hook states asynchronously.
+        *
+        * @param checkpoint the pending checkpoint
+        * @return the future represents master hook states are finished or not
+        */
+       private CompletableFuture<Void> snapshotMasterState(PendingCheckpoint 
checkpoint) {
+               if (masterHooks.isEmpty()) {
+                       return CompletableFuture.completedFuture(null);
+               }
 
-                               // TODO, asynchronously snapshots master hook 
without waiting here
-                               for (MasterTriggerRestoreHook<?> masterHook : 
masterHooks.values()) {
-                                       final MasterState masterState =
-                                               
MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
-                                                       .get(checkpointTimeout, 
TimeUnit.MILLISECONDS);
-                                       
checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
-                               }
-                               
Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
-                       }
-                       // end of lock scope
+               final long checkpointID = checkpoint.getCheckpointId();
+               final long timestamp = checkpoint.getCheckpointTimestamp();
 
-                       final CheckpointOptions checkpointOptions = new 
CheckpointOptions(
-                                       props.getCheckpointType(),
-                                       
checkpointStorageLocation.getLocationReference());
+               final CompletableFuture<Void> masterStateCompletableFuture = 
new CompletableFuture<>();
+               for (MasterTriggerRestoreHook<?> masterHook : 
masterHooks.values()) {
+                       MasterHooks
+                               .triggerHook(masterHook, checkpointID, 
timestamp, executor)
+                               .whenCompleteAsync((masterState, throwable) -> {
+                                       try {
+                                               synchronized (lock) {
+                                                       if 
(masterStateCompletableFuture.isDone()) {
+                                                               return;
+                                                       }
+                                                       if 
(checkpoint.isDiscarded()) {
+                                                               throw new 
IllegalStateException(
+                                                                       
"Checkpoint " + checkpointID + " has been discarded");
+                                                       }
+                                                       if (throwable == null) {
+                                                               
checkpoint.acknowledgeMasterState(
+                                                                       
masterHook.getIdentifier(), masterState);
+                                                               if 
(checkpoint.areMasterStatesFullyAcknowledged()) {
+                                                                       
masterStateCompletableFuture.complete(null);
+                                                               }
+                                                       } else {
+                                                               
masterStateCompletableFuture.completeExceptionally(throwable);
+                                                       }
+                                               }
+                                       } catch (Throwable t) {
+                                               
masterStateCompletableFuture.completeExceptionally(t);
+                                       }
+                               }, timer);
+               }
+               return masterStateCompletableFuture;
+       }
 
-                       // send the messages to the tasks that trigger their 
checkpoint
-                       for (Execution execution: executions) {
-                               if (props.isSynchronous()) {
-                                       
execution.triggerSynchronousSavepoint(checkpointID, timestamp, 
checkpointOptions, advanceToEndOfTime);
-                               } else {
-                                       
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
-                               }
+       /**
+        * Snapshot task state.
+        *
+        * @param timestamp the timestamp of this checkpoint reques
+        * @param checkpointID the checkpoint id
+        * @param checkpointStorageLocation the checkpoint location
+        * @param props the checkpoint properties
+        * @param executions the executions which should be triggered
+        * @param advanceToEndOfTime Flag indicating if the source should 
inject a {@code MAX_WATERMARK}
+        *                               in the pipeline to fire any registered 
event-time timers.
+        */
+       private void snapshotTaskState(
+               long timestamp,
+               long checkpointID,
+               CheckpointStorageLocation checkpointStorageLocation,
+               CheckpointProperties props,
+               Execution[] executions,
+               boolean advanceToEndOfTime) {
+
+               final CheckpointOptions checkpointOptions = new 
CheckpointOptions(
+                       props.getCheckpointType(),
+                       checkpointStorageLocation.getLocationReference());
+
+               // send the messages to the tasks that trigger their checkpoint
+               for (Execution execution: executions) {
+                       if (props.isSynchronous()) {
+                               
execution.triggerSynchronousSavepoint(checkpointID, timestamp, 
checkpointOptions, advanceToEndOfTime);
+                       } else {
+                               execution.triggerCheckpoint(checkpointID, 
timestamp, checkpointOptions);
                        }
-
-                       numUnsuccessfulCheckpointsTriggers.set(0);
-                       return checkpoint.getCompletionFuture();
                }
-               catch (Throwable t) {
-                       int numUnsuccessful = 
numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-                       LOG.warn("Failed to trigger checkpoint {} for job {}. 
({} consecutive failed attempts so far)",
-                                       checkpointID, job, numUnsuccessful, t);
+       }
 
-                       synchronized (lock) {
-                               if (!checkpoint.isDiscarded()) {
-                                       abortPendingCheckpoint(
-                                               checkpoint,
-                                               new CheckpointException(
-                                                       
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t));
+       /**
+        * Trigger request is successful.
+        * NOTE, it must be invoked if trigger request is successful.
+        */
+       private void onTriggerSuccess() {
+               isTriggering = false;
+               numUnsuccessfulCheckpointsTriggers.set(0);
+               checkQueuedCheckpointTriggerRequest();
+       }
+
+       /**
+        * The trigger request is failed prematurely without a proper 
initialization.
+        * There is no resource to release, but the completion promise needs to 
fail manually here.
+        *
+        * @param onCompletionPromise the completion promise of the 
checkpoint/savepoint
+        * @param throwable the reason of trigger failure
+        */
+       private void onTriggerFailure(
+               CompletableFuture<CompletedCheckpoint> onCompletionPromise, 
Throwable throwable) {
+               final CheckpointException checkpointException =
+                       
getCheckpointException(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, 
throwable);
+               onCompletionPromise.completeExceptionally(checkpointException);
+               onTriggerFailure((PendingCheckpoint) null, checkpointException);
+       }
+
+       /**
+        * The trigger request is failed.
+        * NOTE, it must be invoked if trigger request is failed.
+        *
+        * @param checkpoint the pending checkpoint which is failed. It could 
be null if it's failed
+        *                   prematurely without a proper initialization.
+        * @param throwable the reason of trigger failure
+        */
+       private void onTriggerFailure(@Nullable PendingCheckpoint checkpoint, 
Throwable throwable) {
+               try {
 
 Review comment:
   Yes, you are right, we need some checking here. As mentioned above, it's not 
easy (or at least a bit hacky) to check the current thread for a 
`ScheduledExecutor`. Since the timer thread will be abandoned once we move all 
of this code into the main thread, I think we should not spend too much time on 
this. We could do that in the main thread.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to