This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac6ebf25535c5447c7e9c030a9c83b5e433091fb Author: Dawid Wysakowicz <[email protected]> AuthorDate: Thu Dec 9 16:28:27 2021 +0100 [refactor] Extract addCompletedCheckpointToStore method --- .../runtime/checkpoint/CheckpointCoordinator.java | 81 +++++++++++++--------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6846f42..b71deda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1202,14 +1202,13 @@ public class CheckpointCoordinator { throws CheckpointException { final long checkpointId = pendingCheckpoint.getCheckpointId(); final CompletedCheckpoint completedCheckpoint; + final CompletedCheckpoint lastSubsumed; // As a first step to complete the checkpoint, we register its state with the registry Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates(); SharedStateRegistry sharedStateRegistry = completedCheckpointStore.getSharedStateRegistry(); sharedStateRegistry.registerAll(operatorStates.values()); - long lastSubsumedCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID; - try { try { completedCheckpoint = @@ -1239,36 +1238,11 @@ public class CheckpointCoordinator { // the pending checkpoint must be discarded after the finalization Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null); - try { - CompletedCheckpoint lastSubsumed = - completedCheckpointStore.addCheckpointAndSubsumeOldestOne( - completedCheckpoint, - checkpointsCleaner, - this::scheduleTriggerRequest); - if (lastSubsumed != null && lastSubsumed.getProperties().discardOnSubsumed()) { - lastSubsumedCheckpointId = lastSubsumed.getCheckpointID(); - } - } catch (Exception exception) { - if (exception instanceof PossibleInconsistentStateException) { - LOG.warn( - "An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.", - completedCheckpoint.getCheckpointID(), - completedCheckpoint.getExternalPointer()); - } else { - // we failed to store the completed checkpoint. Let's clean up - checkpointsCleaner.cleanCheckpointOnFailedStoring( - completedCheckpoint, executor); - } - - sendAbortedMessages( - pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), - checkpointId, - pendingCheckpoint.getCheckpointTimestamp()); - throw new CheckpointException( - "Could not complete the pending checkpoint " + checkpointId + '.', - CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, - exception); - } + lastSubsumed = + addCompletedCheckpointToStoreAndSubsumeOldest( + checkpointId, + completedCheckpoint, + pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo()); } finally { pendingCheckpoints.remove(checkpointId); scheduleTriggerRequest(); @@ -1309,7 +1283,47 @@ public class CheckpointCoordinator { pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), checkpointId, completedCheckpoint.getTimestamp(), - lastSubsumedCheckpointId); + extractIdIfDiscardedOnSubsumed(lastSubsumed)); + } + + private long extractIdIfDiscardedOnSubsumed(CompletedCheckpoint lastSubsumed) { + final long lastSubsumedCheckpointId; + if (lastSubsumed != null && lastSubsumed.getProperties().discardOnSubsumed()) { + lastSubsumedCheckpointId = lastSubsumed.getCheckpointID(); + } else { + lastSubsumedCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID; + } + return lastSubsumedCheckpointId; + } + + private CompletedCheckpoint addCompletedCheckpointToStoreAndSubsumeOldest( + long checkpointId, + CompletedCheckpoint completedCheckpoint, + List<ExecutionVertex> tasksToAbort) + throws CheckpointException { + try { + return completedCheckpointStore.addCheckpointAndSubsumeOldestOne( + completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest); + } catch (Exception exception) { + if (exception instanceof PossibleInconsistentStateException) { + LOG.warn( + "An error occurred while writing checkpoint {} to the underlying metadata" + + " store. Flink was not able to determine whether the metadata was" + + " successfully persisted. The corresponding state located at '{}'" + + " won't be discarded and needs to be cleaned up manually.", + completedCheckpoint.getCheckpointID(), + completedCheckpoint.getExternalPointer()); + } else { + // we failed to store the completed checkpoint. Let's clean up + checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor); + } + + sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp()); + throw new CheckpointException( + "Could not complete the pending checkpoint " + checkpointId + '.', + CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, + exception); + } } void scheduleTriggerRequest() { @@ -1329,6 +1343,7 @@ public class CheckpointCoordinator { long completedCheckpointId, long completedTimestamp, long lastSubsumedCheckpointId) { + // commit tasks for (ExecutionVertex ev : tasksToCommit) { Execution ee = ev.getCurrentExecutionAttempt();
