akalash commented on a change in pull request #18852:
URL: https://github.com/apache/flink/pull/18852#discussion_r814014000



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -918,8 +927,8 @@ private void onTriggerFailure(
                     getCheckpointException(
                             
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
 
+            int numUnsuccessful = 
numUnsuccessfulCheckpointsTriggers.incrementAndGet();

Review comment:
       I actually have no idea why we use this variable for counting the 
unsuccessful checkpoints. I think we can remove it and we can move logging to 
`CheckpointFailureManager`. As you can see the `CheckpointFailureManager` 
already have `continuousFailureCounter` which is used for the decision of a 
failing job. I think it will be more correct information considering that 
`CheckpointFailureManager#continuousFailureCounter` and 
`CheckpointCoordinator#numUnsuccessfulCheckpointsTriggers` can have different 
values.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -931,11 +940,17 @@ private void onTriggerFailure(
                     abortPendingCheckpoint(checkpoint, cause);
                 }
             } else {
-                LOG.info(
-                        "Failed to trigger checkpoint for job {} because {}",
+                LOG.warn(
+                        "Failed to trigger checkpoint for job {}. ({} 
consecutive failed attempts so far)",
                         job,
-                        throwable.getMessage());
+                        numUnsuccessful,
+                        throwable);
 
+                if (checkpoint == null
+                        && CheckpointFailureReason.IO_EXCEPTION
+                                == cause.getCheckpointFailureReason()) {
+                    statsTracker.reportFailedCheckpointsWithoutInProgress();

Review comment:
       I don't really agree with this condition. I mean if flink fails when the 
pending checkpoint has been created already then we update statistics 
independently on the reason for fail(even if the consecutive fail counter won't 
be increased). But if we have not created the pending checkpoint then we check 
for IO_EXCEPTION. In my opinion, it is not consistent behavior.
   
   And in general, I don't really like how we use `CheckpointStatsCounts`. 
Right now we increase the checkpoint in progress count and the total checkpoint 
number only when the pending checkpoint has been created. But if we fail before 
the pending checkpoint we need to call the separated method(as it was created 
in this PR) to handle this situation. 
   Maybe it is better to have logic like that:
   - increment the number of inProgressCheckpoints and the total checkpoints as 
the first step before even checkpoint id has been created.
   - add the PendingCheckpoint to stats tracker as soon as it would be 
created(right now it happens at the same time with previous step after the 
PendingCheckpoint is created)
   - decrement the total number of checkpoints and checkpoints in progress as 
the last step of the checkpoint(the same as now)
   - decrement checkpoints in progress but increment failed checkpoint number 
if the checkpoint fails(the same as now for PendingCheckpoint but use it for 
non existing PendingCheckpoint as well)
   
   Important notice that the number of failed checkpoint in statistic and 
continuousFailureCounter in CheckpointFailureManager are totally different 
numbers. The first one can be grown all the time while the second one can be 
always zero if the reason of fail doesn't consider as critical.
   
   @1996fanrui , @pnowojski , WDYT?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -105,12 +105,12 @@
     /** The checkpoint properties. */
     private final CheckpointProperties props;
 
-    /** Target storage location to persist the checkpoint metadata to. */
-    private final CheckpointStorageLocation targetLocation;
-
     /** The promise to fulfill once the checkpoint has been completed. */
     private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
+    /** Target storage location to persist the checkpoint metadata to. */
+    private CheckpointStorageLocation targetLocation;

Review comment:
       Since it is not final anymore we need to carefully think about the 
thread model here. I mean as I understand this field can be used from several 
threads. So perhaps it should be volatile or we should use it only under the 
lock.(I am not sure about that but it should be checked)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -565,20 +563,37 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                     request.props,
                                                     checkpointInfo.f0,
                                                     request.isPeriodic,
-                                                    
checkpointInfo.f1.checkpointId,
-                                                    
checkpointInfo.f1.checkpointStorageLocation,
+                                                    checkpointInfo.f1,
                                                     
request.getOnCompletionFuture()),
                                     timer);
 
             final CompletableFuture<?> coordinatorCheckpointsComplete =
-                    pendingCheckpointCompletableFuture.thenComposeAsync(
-                            (pendingCheckpoint) ->
-                                    OperatorCoordinatorCheckpoints
-                                            
.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
-                                                    coordinatorsToCheckpoint,
-                                                    pendingCheckpoint,
-                                                    timer),
-                            timer);
+                    pendingCheckpointCompletableFuture
+                            .thenApplyAsync(
+                                    pendingCheckpoint -> {
+                                        try {
+                                            CheckpointStorageLocation 
checkpointStorageLocation =
+                                                    
initializeCheckpointLocation(
+                                                            
pendingCheckpoint.getCheckpointID(),
+                                                            request.props,
+                                                            
request.externalSavepointLocation,
+                                                            
initializeBaseLocations);
+                                            
pendingCheckpoint.setCheckpointTargetLocation(
+                                                    checkpointStorageLocation);
+                                            return pendingCheckpoint;
+                                        } catch (Throwable e) {
+                                            throw new CompletionException(e);
+                                        }
+                                    },
+                                    executor)
+                            .thenComposeAsync(
+                                    (pendingCheckpoint) ->
+                                            OperatorCoordinatorCheckpoints
+                                                    
.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+                                                            
coordinatorsToCheckpoint,
+                                                            pendingCheckpoint,
+                                                            timer),
+                                    timer);

Review comment:
       I think that it is better to extract changes of moving the checkpoint 
storage location to a separate commit(the same PR/branch but the second 
commit). I think it will be better since the logging/statistic fixes and these 
changes resolve the different problems. But it is better to fix only one 
problem in one commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to