gaoyunhaii commented on a change in pull request #16493:
URL: https://github.com/apache/flink/pull/16493#discussion_r670483114



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
         }
     }
 
+    private void triggerCheckpointRequest(
+            CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+        if (checkpoint.isDisposed()) {
+            onTriggerFailure(
+                    checkpoint,
+                    new CheckpointException(
+                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                            checkpoint.getFailureCause()));
+        } else {
+            triggerTasks(request, timestamp, checkpoint)
+                    .exceptionally(

Review comment:
       I think we might need to execute the callback in the `timer` thread 
explicitly, since `onTriggerFailure()` needs to run in the `timer` thread 
while the `acks` would be completed in the Akka thread.
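
A minimal sketch of this idea, assuming a single-threaded executor stands in for the coordinator's `timer` thread. The class name, method name, and thread name are hypothetical and not taken from the PR. Passing the executor to `handleAsync` makes the failure callback run on that executor regardless of which thread completes the future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TimerThreadCallbackSketch {

    /**
     * Fails a future from a foreign thread and returns the name of the
     * thread that actually ran the failure callback.
     */
    static String runFailureCallback() {
        // Hypothetical stand-in for the coordinator's single-threaded timer executor.
        ExecutorService timer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "timer"));
        try {
            CompletableFuture<Void> acks = new CompletableFuture<>();

            // The explicit executor argument forces the callback onto the timer
            // thread, no matter which thread completes `acks`.
            CompletableFuture<String> callbackThread =
                    acks.handleAsync(
                            (ignored, failure) -> Thread.currentThread().getName(),
                            timer);

            // Simulate the RPC layer failing the future from another thread.
            CompletableFuture.runAsync(
                    () -> acks.completeExceptionally(new RuntimeException("trigger failed")));

            return callbackThread.join();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFailureCallback()); // prints "timer"
    }
}
```

With plain `exceptionally(...)` and no executor, the callback would instead run on whichever thread completes the future (or on the calling thread if the future is already complete).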

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
         }
     }
 
+    private void triggerCheckpointRequest(
+            CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+        if (checkpoint.isDisposed()) {
+            onTriggerFailure(
+                    checkpoint,
+                    new CheckpointException(
+                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                            checkpoint.getFailureCause()));
+        } else {
+            triggerTasks(request, timestamp, checkpoint)
+                    .exceptionally(
+                            failure -> {
+                                onTriggerFailure(
+                                        checkpoint,
+                                        new CheckpointException(
+                                                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                                                failure));
+                                return null;
+                            });
+
+            coordinatorsToCheckpoint.forEach(
+                    (ctx) -> ctx.afterSourceBarrierInjection(checkpoint.getCheckpointID()));
+            // It is possible that the tasks has finished
+            // checkpointing at this point.
+            // So we need to complete this pending checkpoint.
+            if (maybeCompleteCheckpoint(checkpoint)) {
+                onTriggerSuccess();
+            }
+        }
+    }
+
+    private CompletableFuture<Void> triggerTasks(
+            CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+        // no exception, no discarding, everything is OK
+        final long checkpointId = checkpoint.getCheckpointId();

Review comment:
       nit: better to use `getCheckpointID()`, since `getCheckpointId()` is 
deprecated.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
         }
     }
 
+    private void triggerCheckpointRequest(
+            CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+        if (checkpoint.isDisposed()) {
+            onTriggerFailure(
+                    checkpoint,
+                    new CheckpointException(
+                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                            checkpoint.getFailureCause()));
+        } else {
+            triggerTasks(request, timestamp, checkpoint)
+                    .exceptionally(
+                            failure -> {
+                                onTriggerFailure(
+                                        checkpoint,
+                                        new CheckpointException(
+                                                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                                                failure));
+                                return null;
+                            });
+
+            coordinatorsToCheckpoint.forEach(
+                    (ctx) -> ctx.afterSourceBarrierInjection(checkpoint.getCheckpointID()));
+            // It is possible that the tasks has finished
+            // checkpointing at this point.
+            // So we need to complete this pending checkpoint.
+            if (maybeCompleteCheckpoint(checkpoint)) {
+                onTriggerSuccess();
+            }
+        }
+    }
+
+    private CompletableFuture<Void> triggerTasks(
+            CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+        // no exception, no discarding, everything is OK
+        final long checkpointId = checkpoint.getCheckpointId();
+
+        final CheckpointOptions checkpointOptions =
+                CheckpointOptions.forConfig(
+                        request.props.getCheckpointType(),
+                        checkpoint.getCheckpointStorageLocation().getLocationReference(),
+                        isExactlyOnceMode,
+                        unalignedCheckpointsEnabled,
+                        alignedCheckpointTimeout);
+
+        // send the messages to the tasks that trigger their checkpoint

Review comment:
       nit: `that trigger` -> `to trigger`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
         }
     }
 
+    private void triggerCheckpointRequest(
+            CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+        if (checkpoint.isDisposed()) {
+            onTriggerFailure(
+                    checkpoint,
+                    new CheckpointException(
+                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                            checkpoint.getFailureCause()));
+        } else {
+            triggerTasks(request, timestamp, checkpoint)
+                    .exceptionally(
+                            failure -> {
+                                onTriggerFailure(
+                                        checkpoint,
+                                        new CheckpointException(
+                                                CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                                                failure));
+                                return null;
+                            });
+
+            coordinatorsToCheckpoint.forEach(
+                    (ctx) -> ctx.afterSourceBarrierInjection(checkpoint.getCheckpointID()));
+            // It is possible that the tasks has finished
+            // checkpointing at this point.
+            // So we need to complete this pending checkpoint.
+            if (maybeCompleteCheckpoint(checkpoint)) {
+                onTriggerSuccess();

Review comment:
       There might be a problem with `onTriggerSuccess` here: we might first 
call `onTriggerSuccess` here, and later find that the trigger failed and call 
`onTriggerFailure` in the callback. This could cause problems: it would set 
`isTriggering = false` and schedule the next trigger task in the middle of the 
current trigger (which consists of several separate Runnables), and the two 
calls might also conflict semantically. Perhaps we could change to one of the 
following?
   1. Also call `onTriggerSuccess()` in the callback that runs once triggering 
the tasks has finished. But if the trigger actually fails with a timeout, the 
trigger result check would be held up by the RPC timeout (10s by default), and 
during this period we could not trigger new checkpoints.
   2. Consider the checkpoint successfully triggered once this line has 
executed, and if we later find that triggering the tasks failed, only call 
`abortPendingCheckpoint`. If the callback is also executed in the timer thread, 
there would be no concurrency issue, since `abortPendingCheckpoint` would 
always be submitted after the current method is done.
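
The ordering argument behind option 2 can be sketched as follows. This is only an illustration under stated assumptions: a single-threaded executor stands in for the timer thread, and the event strings merely borrow the method names from the discussion; nothing here is the coordinator's actual code. Because the failure callback is submitted to the same single-threaded executor that is running the current method, it can only run after that method returns, even when the trigger future has already failed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TriggerOrderingSketch {

    /** Returns the order in which the two hypothetical events were recorded. */
    static List<String> run() {
        // Hypothetical stand-in for the single-threaded timer executor.
        ExecutorService timer = Executors.newSingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> abortDone = new CompletableFuture<>();
        try {
            timer.execute(
                    () -> {
                        // The trigger result has already failed when the callback is attached.
                        CompletableFuture<Void> triggerResult = new CompletableFuture<>();
                        triggerResult.completeExceptionally(new RuntimeException("rpc timeout"));

                        // The callback is queued on the timer executor, so it cannot
                        // interleave with the method that is currently running there.
                        triggerResult.whenCompleteAsync(
                                (ignored, failure) -> {
                                    if (failure != null) {
                                        events.add("abortPendingCheckpoint");
                                    }
                                    abortDone.complete(null);
                                },
                                timer);

                        // Remainder of the "current method".
                        events.add("onTriggerSuccess");
                    });
            abortDone.join();
            return new ArrayList<>(events);
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onTriggerSuccess, abortPendingCheckpoint]
    }
}
```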

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
         }
     }
 
+    private void triggerCheckpointRequest(
+            CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+        if (checkpoint.isDisposed()) {
+            onTriggerFailure(
+                    checkpoint,
+                    new CheckpointException(
+                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                            checkpoint.getFailureCause()));
+        } else {
+            triggerTasks(request, timestamp, checkpoint)
+                    .exceptionally(
+                            failure -> {
+                                onTriggerFailure(
+                                        checkpoint,
+                                        new CheckpointException(

Review comment:
       nit: perhaps we do not need to wrap the failure if it is already a 
`CheckpointException`? If the task does not exist on the TM side, the failure 
would be a `CheckpointException`.
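
A minimal sketch of wrapping only when needed. `SketchCheckpointException` is a hypothetical stand-in for Flink's `CheckpointException`, and the helper name is made up for illustration. The sketch also strips `CompletionException`, on the assumption that the callback is attached to a dependent `CompletableFuture` stage, where the JDK wraps the original failure:

```java
import java.util.concurrent.CompletionException;

final class WrapOnceSketch {

    /** Hypothetical stand-in for Flink's CheckpointException. */
    static final class SketchCheckpointException extends Exception {
        SketchCheckpointException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Reuses the failure if it already has the target type; wraps it otherwise. */
    static SketchCheckpointException toCheckpointException(Throwable failure) {
        // Dependent CompletableFuture stages report failures wrapped in CompletionException.
        Throwable stripped = failure;
        while (stripped instanceof CompletionException && stripped.getCause() != null) {
            stripped = stripped.getCause();
        }
        if (stripped instanceof SketchCheckpointException) {
            return (SketchCheckpointException) stripped; // already the right type, no extra layer
        }
        return new SketchCheckpointException("Trigger checkpoint failure.", stripped);
    }

    public static void main(String[] args) {
        SketchCheckpointException original =
                new SketchCheckpointException("task not found", null);
        // Prints "true": the original exception is passed through unwrapped.
        System.out.println(
                toCheckpointException(new CompletionException(original)) == original);
    }
}
```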




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.