This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9d8ecc6cd856d2d27fbafe0a9e2b911055ecb08d Author: Dawid Wysakowicz <[email protected]> AuthorDate: Thu Dec 9 16:33:17 2021 +0100 [refactor] Extract finalizeCheckpoint method --- .../runtime/checkpoint/CheckpointCoordinator.java | 57 ++++++++++++---------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index b71deda..7d9bda9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1210,30 +1210,7 @@ public class CheckpointCoordinator { sharedStateRegistry.registerAll(operatorStates.values()); try { - try { - completedCheckpoint = - pendingCheckpoint.finalizeCheckpoint( - checkpointsCleaner, - this::scheduleTriggerRequest, - executor, - getStatsCallback(pendingCheckpoint)); - - failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId()); - } catch (Exception e1) { - // abort the current pending checkpoint if we fails to finalize the pending - // checkpoint. - if (!pendingCheckpoint.isDisposed()) { - abortPendingCheckpoint( - pendingCheckpoint, - new CheckpointException( - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1)); - } - - throw new CheckpointException( - "Could not finalize the pending checkpoint " + checkpointId + '.', - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, - e1); - } + completedCheckpoint = finalizeCheckpoint(pendingCheckpoint); // the pending checkpoint must be discarded after the finalization Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null); @@ -1286,6 +1263,37 @@ public class CheckpointCoordinator { extractIdIfDiscardedOnSubsumed(lastSubsumed)); } + private CompletedCheckpoint finalizeCheckpoint(PendingCheckpoint pendingCheckpoint) + throws CheckpointException { + try { + final CompletedCheckpoint completedCheckpoint = + pendingCheckpoint.finalizeCheckpoint( + checkpointsCleaner, + this::scheduleTriggerRequest, + executor, + getStatsCallback(pendingCheckpoint)); + + failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID()); + return completedCheckpoint; + } catch (Exception e1) { + // abort the current pending checkpoint if we fails to finalize the pending + // checkpoint. + if (!pendingCheckpoint.isDisposed()) { + abortPendingCheckpoint( + pendingCheckpoint, + new CheckpointException( + CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1)); + } + + throw new CheckpointException( + "Could not finalize the pending checkpoint " + + pendingCheckpoint.getCheckpointID() + + '.', + CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, + e1); + } + } + private long extractIdIfDiscardedOnSubsumed(CompletedCheckpoint lastSubsumed) { final long lastSubsumedCheckpointId; if (lastSubsumed != null && lastSubsumed.getProperties().discardOnSubsumed()) { @@ -1343,7 +1351,6 @@ public class CheckpointCoordinator { long completedCheckpointId, long completedTimestamp, long lastSubsumedCheckpointId) { - // commit tasks for (ExecutionVertex ev : tasksToCommit) { Execution ee = ev.getCurrentExecutionAttempt();
