akalash commented on a change in pull request #18852:
URL: https://github.com/apache/flink/pull/18852#discussion_r814014000
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -918,8 +927,8 @@ private void onTriggerFailure(
getCheckpointException(
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, throwable);
+ int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
Review comment:
I don't see why we need this variable to count unsuccessful checkpoints. I
think we can remove it and move the logging to `CheckpointFailureManager`.
As you can see, `CheckpointFailureManager` already has
`continuousFailureCounter`, which is used to decide whether to fail the job.
Logging that counter would give more accurate information, since
`CheckpointFailureManager#continuousFailureCounter` and
`CheckpointCoordinator#numUnsuccessfulCheckpointsTriggers` can have different
values.
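To illustrate how the two counters can drift apart, here is a minimal sketch.
It is not Flink code: the class, the field names, and in particular the set of
failure reasons that the manager-style counter counts are assumptions made up
for the example.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the two counters; not the actual Flink classes. */
class CounterDivergenceSketch {
    enum Reason { TRIGGER_CHECKPOINT_FAILURE, IO_EXCEPTION }

    // Assumption for illustration only: the failure-manager-style counter
    // counts IO_EXCEPTION and ignores every other reason.
    static final Set<Reason> COUNTED_BY_MANAGER = EnumSet.of(Reason.IO_EXCEPTION);

    // Coordinator-style counter: bumped on every failed trigger attempt.
    final AtomicInteger triggerFailures = new AtomicInteger();

    // Manager-style counter: bumped only for the reasons it counts.
    final AtomicInteger continuousFailures = new AtomicInteger();

    void onTriggerFailure(Reason reason) {
        triggerFailures.incrementAndGet();
        if (COUNTED_BY_MANAGER.contains(reason)) {
            continuousFailures.incrementAndGet();
        }
    }
}
```

Under these assumptions, a log line built from the first counter could report
a number that never influences the decision to fail the job.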
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -931,11 +940,17 @@ private void onTriggerFailure(
abortPendingCheckpoint(checkpoint, cause);
}
} else {
- LOG.info(
- "Failed to trigger checkpoint for job {} because {}",
+ LOG.warn(
+ "Failed to trigger checkpoint for job {}. ({} consecutive failed attempts so far)",
job,
- throwable.getMessage());
+ numUnsuccessful,
+ throwable);
+ if (checkpoint == null
+ && CheckpointFailureReason.IO_EXCEPTION
+ == cause.getCheckpointFailureReason()) {
+ statsTracker.reportFailedCheckpointsWithoutInProgress();
Review comment:
I don't really agree with this condition. If Flink fails after the pending
checkpoint has already been created, we update the statistics regardless of
the failure reason (even if the consecutive failure counter is not
increased). But if the pending checkpoint has not been created yet, we check
for IO_EXCEPTION. In my opinion, this behavior is inconsistent.
And in general, I don't really like how we use `CheckpointStatsCounts`.
Right now we increase the in-progress checkpoint count and the total checkpoint
count only once the pending checkpoint has been created. If we fail before
the pending checkpoint is created, we need to call a separate method (the one
added in this PR) to handle that case.
Maybe it would be better to have logic like this:
- increment the number of inProgressCheckpoints and the total checkpoints as
the first step, before the checkpoint id has even been created.
- add the PendingCheckpoint to the stats tracker as soon as it is created
(right now this happens together with the previous step, after the
PendingCheckpoint is created).
- decrement the total number of checkpoints and the checkpoints in progress as
the last step of the checkpoint (the same as now).
- decrement the checkpoints in progress but increment the failed checkpoint
number if the checkpoint fails (the same as now for PendingCheckpoint, but
applied to a non-existing PendingCheckpoint as well).
Note that the number of failed checkpoints in the statistics and
continuousFailureCounter in CheckpointFailureManager are completely different
numbers. The first can keep growing, while the second can stay at zero if the
failure reason is not considered critical.
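A rough sketch of the counting order I have in mind is below. Everything in it
is hypothetical (class name, method names, the String placeholder for
PendingCheckpoint); it is not the existing `CheckpointStatsCounts` API. What
exactly happens on completion is meant to stay as it is today; the sketch
simply assumes the checkpoint moves from in progress to completed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stats holder; names and behavior are illustrative assumptions. */
class TriggerStatsSketch {
    private long total;
    private long inProgress;
    private long completed;
    private long failed;
    // Placeholder for the per-checkpoint stats of a PendingCheckpoint.
    private final Map<Long, String> pendingById = new HashMap<>();

    /** First step: called before a checkpoint id or PendingCheckpoint exists. */
    synchronized void onTriggerStarted() {
        total++;
        inProgress++;
    }

    /** Called as soon as the PendingCheckpoint has been created. */
    synchronized void onPendingCreated(long checkpointId, String description) {
        pendingById.put(checkpointId, description);
    }

    /** Assumed completion behavior: in progress goes down, completed goes up. */
    synchronized void onCompleted(long checkpointId) {
        pendingById.remove(checkpointId);
        inProgress--;
        completed++;
    }

    /** Same failure path with or without a PendingCheckpoint (null id if none). */
    synchronized void onFailed(Long checkpointId) {
        if (checkpointId != null) {
            pendingById.remove(checkpointId);
        }
        inProgress--;
        failed++;
    }

    synchronized long getTotal() { return total; }
    synchronized long getInProgress() { return inProgress; }
    synchronized long getCompleted() { return completed; }
    synchronized long getFailed() { return failed; }
}
```

With this shape, a failure before the PendingCheckpoint exists goes through
`onFailed(null)` and no reason-specific condition or extra reporting method is
needed.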
@1996fanrui, @pnowojski, WDYT?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -105,12 +105,12 @@
/** The checkpoint properties. */
private final CheckpointProperties props;
- /** Target storage location to persist the checkpoint metadata to. */
- private final CheckpointStorageLocation targetLocation;
-
/** The promise to fulfill once the checkpoint has been completed. */
private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
+ /** Target storage location to persist the checkpoint metadata to. */
+ private CheckpointStorageLocation targetLocation;
Review comment:
Since this field is no longer final, we need to think carefully about the
threading model here. As I understand it, this field can be accessed from
several threads, so perhaps it should be volatile, or we should only access it
under the lock. (I am not sure about this, but it should be checked.)
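As a minimal sketch of what I mean, assuming the field is written once from
one thread and read from others: the class below is hypothetical, uses a
String in place of `CheckpointStorageLocation`, and its lock object is only a
stand-in for whatever lock the real class already has.

```java
import java.util.Objects;

/** Hypothetical holder showing a set-once field that is safe to read across threads. */
class PendingLocationSketch {
    // Stand-in for the existing lock; an assumption for this example.
    private final Object lock = new Object();

    // volatile makes the write visible to readers that do not take the lock.
    private volatile String targetLocation;

    /** Sets the location exactly once; the lock guards the check-then-set. */
    void setTargetLocation(String location) {
        Objects.requireNonNull(location, "location");
        synchronized (lock) {
            if (targetLocation != null) {
                throw new IllegalStateException("target location already set");
            }
            targetLocation = location;
        }
    }

    /** Lock-free read; returns null until the location has been set. */
    String getTargetLocation() {
        return targetLocation;
    }
}
```

If every access already happens under the lock, volatile is not needed;
which of the two applies here is exactly what should be checked.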
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -565,20 +563,37 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.props,
checkpointInfo.f0,
request.isPeriodic,
- checkpointInfo.f1.checkpointId,
- checkpointInfo.f1.checkpointStorageLocation,
+ checkpointInfo.f1,
request.getOnCompletionFuture()),
timer);
final CompletableFuture<?> coordinatorCheckpointsComplete =
- pendingCheckpointCompletableFuture.thenComposeAsync(
- (pendingCheckpoint) ->
- OperatorCoordinatorCheckpoints
- .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
- coordinatorsToCheckpoint,
- pendingCheckpoint,
- timer),
- timer);
+ pendingCheckpointCompletableFuture
+ .thenApplyAsync(
+ pendingCheckpoint -> {
+ try {
+ CheckpointStorageLocation checkpointStorageLocation =
+ initializeCheckpointLocation(
+ pendingCheckpoint.getCheckpointID(),
+ request.props,
+ request.externalSavepointLocation,
+ initializeBaseLocations);
+ pendingCheckpoint.setCheckpointTargetLocation(
+ checkpointStorageLocation);
+ return pendingCheckpoint;
+ } catch (Throwable e) {
+ throw new CompletionException(e);
+ }
+ },
+ executor)
+ .thenComposeAsync(
+ (pendingCheckpoint) ->
+ OperatorCoordinatorCheckpoints
+ .triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
+ coordinatorsToCheckpoint,
+ pendingCheckpoint,
+ timer),
+ timer);
Review comment:
I think it would be better to extract the changes that move the checkpoint
storage location into a separate commit (same PR/branch, but a second commit).
The logging/statistics fixes and these changes solve different problems, and
it is better to fix only one problem per commit.