akalash commented on a change in pull request #18852:
URL: https://github.com/apache/flink/pull/18852#discussion_r815807364



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -80,25 +86,49 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
      *     strategy can be used.
      * @param exception the checkpoint exception.
      * @param executionAttemptID the execution attempt id, as a safe guard.
+     * @param job JobID
+     * @param statsCallback the pending checkpoint statistics
      */
     public void handleCheckpointException(
             @Nullable PendingCheckpoint pendingCheckpoint,
             CheckpointProperties checkpointProperties,
             CheckpointException exception,
-            @Nullable ExecutionAttemptID executionAttemptID) {
+            @Nullable ExecutionAttemptID executionAttemptID,
+            JobID job,
+            @Nullable PendingCheckpointStats statsCallback) {
+        long checkpointId =
+                pendingCheckpoint == null
+                        ? UNKNOWN_CHECKPOINT_ID
+                        : pendingCheckpoint.getCheckpointID();
+        updateCheckpointStatsTracker(pendingCheckpoint, statsCallback, exception);
+
+        LOG.warn(
+                "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+                checkpointId,
+                job,
+                continuousFailureCounter.get(),
+                exception);
         if (isJobManagerFailure(exception, executionAttemptID)) {
-            handleJobLevelCheckpointException(
-                    checkpointProperties,
-                    exception,
-                    pendingCheckpoint == null
-                            ? UNKNOWN_CHECKPOINT_ID
-                            : pendingCheckpoint.getCheckpointID());
+            handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
         } else {
             handleTaskLevelCheckpointException(
                     checkNotNull(pendingCheckpoint), exception, checkNotNull(executionAttemptID));
         }
     }
 
+    private void updateCheckpointStatsTracker(

Review comment:
       Please take a careful look at this method. I see that it is a direct copy-paste, but you can optimize it. At the very least, you don't need `pendingCheckpoint` here, since you already have `statsCallback`. The Javadoc is also no longer useful, the `if` condition is wrong, etc.
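A minimal sketch of the simplification suggested above. It assumes the tracker update only needs to report the failure through the stats callback, so no `PendingCheckpoint` parameter is required. `FailedCheckpointStats`, `reportFailedCheckpoint`, and `RecordingStats` are hypothetical stand-ins and are not Flink's actual `PendingCheckpointStats` API:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the stats callback; NOT Flink's real PendingCheckpointStats API. */
interface FailedCheckpointStats {
    void reportFailedCheckpoint(long failureTimestamp, Throwable cause);
}

final class StatsUpdateSketch {

    /**
     * Reports the failure to the stats callback, if one is present.
     * Only the callback is needed; there is no PendingCheckpoint parameter.
     */
    static void updateCheckpointStatsTracker(
            FailedCheckpointStats statsCallback, // may be null (assumption: no stats tracked)
            Throwable exception,
            long failureTimestamp) {
        if (statsCallback != null) {
            statsCallback.reportFailedCheckpoint(failureTimestamp, exception);
        }
    }

    /** Hypothetical recording implementation, used only to demonstrate the call. */
    static final class RecordingStats implements FailedCheckpointStats {
        final List<Throwable> failures = new ArrayList<>();

        @Override
        public void reportFailedCheckpoint(long failureTimestamp, Throwable cause) {
            failures.add(cause);
        }
    }

    public static void main(String[] args) {
        RecordingStats stats = new RecordingStats();
        updateCheckpointStatsTracker(stats, new RuntimeException("declined"), System.currentTimeMillis());
        // A null callback is simply ignored.
        updateCheckpointStatsTracker(null, new RuntimeException("ignored"), System.currentTimeMillis());
        System.out.println(stats.failures.size()); // prints 1
    }
}
```

With a single null check on the callback, the method no longer has to inspect `pendingCheckpoint`.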

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
##########
@@ -80,25 +86,49 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
      *     strategy can be used.
      * @param exception the checkpoint exception.
      * @param executionAttemptID the execution attempt id, as a safe guard.
+     * @param job JobID
+     * @param statsCallback the pending checkpoint statistics
      */
     public void handleCheckpointException(
             @Nullable PendingCheckpoint pendingCheckpoint,
             CheckpointProperties checkpointProperties,
             CheckpointException exception,
-            @Nullable ExecutionAttemptID executionAttemptID) {
+            @Nullable ExecutionAttemptID executionAttemptID,
+            JobID job,
+            @Nullable PendingCheckpointStats statsCallback) {
+        long checkpointId =
+                pendingCheckpoint == null
+                        ? UNKNOWN_CHECKPOINT_ID
+                        : pendingCheckpoint.getCheckpointID();
+        updateCheckpointStatsTracker(pendingCheckpoint, statsCallback, exception);
+
+        LOG.warn(
+                "Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+                checkpointId,

Review comment:
       Perhaps it would be more helpful for the user to print something meaningful, such as 'UNKNOWN', instead of -1 (`UNKNOWN_CHECKPOINT_ID`).
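One way to act on this suggestion is to render the sentinel as a readable label before passing it to the log call. The helper name `describeCheckpointId` is hypothetical, and `UNKNOWN_CHECKPOINT_ID` is assumed to be `-1`, as the comment states:

```java
final class CheckpointIdFormat {

    /** Sentinel used when there is no PendingCheckpoint; -1 per the review comment (assumption). */
    static final long UNKNOWN_CHECKPOINT_ID = -1L;

    /** Hypothetical helper: renders a checkpoint id for log messages, hiding the sentinel value. */
    static String describeCheckpointId(long checkpointId) {
        return checkpointId == UNKNOWN_CHECKPOINT_ID ? "UNKNOWN" : Long.toString(checkpointId);
    }

    public static void main(String[] args) {
        System.out.println(describeCheckpointId(UNKNOWN_CHECKPOINT_ID)); // prints UNKNOWN
        System.out.println(describeCheckpointId(42L)); // prints 42
    }
}
```

The result would then replace the raw `checkpointId` argument in the `LOG.warn(...)` call. The numeric id is still used everywhere else.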

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -565,20 +563,37 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                     request.props,
                                                     checkpointInfo.f0,
                                                     request.isPeriodic,
-                                                    checkpointInfo.f1.checkpointId,
-                                                    checkpointInfo.f1.checkpointStorageLocation,
+                                                    checkpointInfo.f1,
                                                     request.getOnCompletionFuture()),
                                     timer);
 
             final CompletableFuture<?> coordinatorCheckpointsComplete =
-                    pendingCheckpointCompletableFuture.thenComposeAsync(
-                            (pendingCheckpoint) ->
-                                    OperatorCoordinatorCheckpoints
-                                            .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
-                                                    coordinatorsToCheckpoint,
-                                                    pendingCheckpoint,
-                                                    timer),
-                            timer);
+                    pendingCheckpointCompletableFuture
+                            .thenApplyAsync(
+                                    pendingCheckpoint -> {
+                                        try {
+                                            CheckpointStorageLocation checkpointStorageLocation =
+                                                    initializeCheckpointLocation(
+                                                            pendingCheckpoint.getCheckpointID(),
+                                                            request.props,
+                                                            request.externalSavepointLocation,
+                                                            initializeBaseLocations);
+                                            pendingCheckpoint.setCheckpointTargetLocation(

Review comment:
       As far as I can see, given our threading model here, it is better to avoid the `volatile` fields inside `PendingCheckpoint`. You can move the setting of the storage location under the lock and onto the `timer` thread: move `pendingCheckpoint.setCheckpointTargetLocation` into the next `thenComposeAsync` (or create a new small stage for it) and wrap the assignment of `targetLocation` in `synchronized (lock)`. Then you can remove `volatile` from both fields (`targetLocation` and `disposed`).
   I know it is not entirely obvious, but in parallel to your PR there is a task to move all of these calls to the JobManager thread. That will make all execution here single-threaded and remove the need for synchronization altogether. That is why it doesn't make sense to add a new `volatile` in this PR.
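A minimal sketch of the suggested threading change. It assumes a single-threaded `timer` executor and a coordinator-level `lock` object. `PendingCheckpointStub`, `Located`, `ioExecutor`, and the location string format are hypothetical, and `initializeCheckpointLocation` here is only a stand-in for the real method. It uses the "new small stage" option: a `thenApplyAsync` on `timer` publishes the location.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TargetLocationSketch {

    /** Hypothetical stand-in for PendingCheckpoint; the fields are deliberately NOT volatile. */
    static final class PendingCheckpointStub {
        private final long checkpointId;
        private String targetLocation; // guarded by the coordinator's lock
        private boolean disposed; // guarded by the coordinator's lock (unused in this sketch)

        PendingCheckpointStub(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        long getCheckpointID() {
            return checkpointId;
        }

        void setCheckpointTargetLocation(String location) {
            this.targetLocation = location;
        }

        String getCheckpointTargetLocation() {
            return targetLocation;
        }
    }

    /** Hypothetical holder pairing a pending checkpoint with its freshly initialized location. */
    private static final class Located {
        final PendingCheckpointStub pendingCheckpoint;
        final String location;

        Located(PendingCheckpointStub pendingCheckpoint, String location) {
            this.pendingCheckpoint = pendingCheckpoint;
            this.location = location;
        }
    }

    /** Coordinator-level lock; every read/write of the guarded fields must hold it. */
    private final Object lock = new Object();

    /** Stand-in for initializeCheckpointLocation(...); the real one may perform I/O. */
    private static String initializeCheckpointLocation(long checkpointId) {
        return "file:///checkpoints/chk-" + checkpointId;
    }

    CompletableFuture<PendingCheckpointStub> attachTargetLocation(
            CompletableFuture<PendingCheckpointStub> pendingFuture,
            ExecutorService ioExecutor,
            ExecutorService timer) {
        return pendingFuture
                // Step 1: initialize the storage location off the timer thread.
                .thenApplyAsync(
                        pc -> new Located(pc, initializeCheckpointLocation(pc.getCheckpointID())),
                        ioExecutor)
                // Step 2: publish it on the timer thread while holding the lock.
                .thenApplyAsync(
                        located -> {
                            synchronized (lock) {
                                located.pendingCheckpoint.setCheckpointTargetLocation(located.location);
                            }
                            return located.pendingCheckpoint;
                        },
                        timer);
    }

    static String runDemo() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            PendingCheckpointStub pc =
                    new TargetLocationSketch()
                            .attachTargetLocation(
                                    CompletableFuture.completedFuture(new PendingCheckpointStub(42L)),
                                    io,
                                    timer)
                            // join() establishes happens-before, so reading the plain field is safe here.
                            .join();
            return pc.getCheckpointTargetLocation();
        } finally {
            io.shutdown();
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints file:///checkpoints/chk-42
    }
}
```

Because the write happens on `timer` under `lock`, any reader that also holds `lock` sees it without `volatile`. Once everything runs on the single JobManager thread, the `synchronized` block itself could be dropped.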




-- 
This is an automated message from the Apache Git Service.