Myasuka commented on a change in pull request #8322: [FLINK-12364] Introduce a
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r294053711
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1329,10 +1331,13 @@ private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Th
         LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);
-        if (cause == null || cause instanceof CheckpointDeclineException) {
-            pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
+        if (cause == null) {
+            failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED);
+        } else if (cause instanceof CheckpointException) {
+            CheckpointException exception = (CheckpointException) cause;
+            failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause);
         } else {
-            pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause);
+            failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause);
Review comment:
I noticed that you did not touch the `AbstractStreamOperator` part.
With your PR, if an operator fails to complete the snapshot in
`#snapshotState(long, long, CheckpointOptions, CheckpointStreamFactory)`,
it only declines that checkpoint and reports the failure as a plain
`Exception`. In other words, `CheckpointFailureManager` would only process
the failed checkpoint at this line, and as a result
`continuousFailureCounter` in `CheckpointFailureManager` is not increased.
Is this reasonable?
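
To make the concern concrete, here is a minimal, self-contained sketch of the routing in question. It is not Flink code: `Reason`, `SketchCheckpointException`, `classify`, and `SketchFailureManager` are hypothetical stand-ins. The rule that `JOB_FAILURE` leaves the counter untouched is an assumption made only to illustrate the question, not confirmed behavior of `CheckpointFailureManager`.

```java
public class FailureRoutingSketch {

    /** Hypothetical stand-in for CheckpointFailureReason. */
    enum Reason { CHECKPOINT_DECLINED, CHECKPOINT_EXPIRED, JOB_FAILURE }

    /** Hypothetical stand-in for CheckpointException: carries a failure reason. */
    static class SketchCheckpointException extends Exception {
        private final Reason reason;

        SketchCheckpointException(Reason reason) {
            super(reason.name());
            this.reason = reason;
        }

        Reason getReason() {
            return reason;
        }
    }

    /** Mirrors the three branches of the diff: null, checkpoint exception, anything else. */
    static Reason classify(Throwable cause) {
        if (cause == null) {
            return Reason.CHECKPOINT_DECLINED;
        } else if (cause instanceof SketchCheckpointException) {
            return ((SketchCheckpointException) cause).getReason();
        } else {
            return Reason.JOB_FAILURE;
        }
    }

    /** Hypothetical failure manager. ASSUMPTION: JOB_FAILURE is not counted. */
    static class SketchFailureManager {
        private int continuousFailureCounter;

        void handle(Reason reason) {
            if (reason == Reason.CHECKPOINT_DECLINED || reason == Reason.CHECKPOINT_EXPIRED) {
                continuousFailureCounter++;
            }
        }

        int getCounter() {
            return continuousFailureCounter;
        }
    }

    public static void main(String[] args) {
        SketchFailureManager manager = new SketchFailureManager();

        // A snapshot failure that surfaces as a plain Exception falls into the last branch.
        manager.handle(classify(new Exception("snapshotState failed")));
        System.out.println("after plain Exception: " + manager.getCounter());

        // The same failure wrapped in a checkpoint exception keeps its own reason.
        manager.handle(classify(new SketchCheckpointException(Reason.CHECKPOINT_DECLINED)));
        System.out.println("after checkpoint exception: " + manager.getCounter());
    }
}
```

Under these assumptions, the plain `Exception` is classified as `JOB_FAILURE` and the counter stays unchanged, while the wrapped failure increments it. Whether the operator path should wrap its failure this way is the open question.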