This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac6ebf25535c5447c7e9c030a9c83b5e433091fb
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Dec 9 16:28:27 2021 +0100

    [refactor] Extract addCompletedCheckpointToStore method
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 81 +++++++++++++---------
 1 file changed, 48 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6846f42..b71deda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1202,14 +1202,13 @@ public class CheckpointCoordinator {
             throws CheckpointException {
         final long checkpointId = pendingCheckpoint.getCheckpointId();
         final CompletedCheckpoint completedCheckpoint;
+        final CompletedCheckpoint lastSubsumed;
 
         // As a first step to complete the checkpoint, we register its state with the registry
         Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
         SharedStateRegistry sharedStateRegistry = completedCheckpointStore.getSharedStateRegistry();
         sharedStateRegistry.registerAll(operatorStates.values());
 
-        long lastSubsumedCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
-
         try {
             try {
                 completedCheckpoint =
@@ -1239,36 +1238,11 @@ public class CheckpointCoordinator {
             // the pending checkpoint must be discarded after the finalization
             Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null);
 
-            try {
-                CompletedCheckpoint lastSubsumed =
-                        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
-                                completedCheckpoint,
-                                checkpointsCleaner,
-                                this::scheduleTriggerRequest);
-                if (lastSubsumed != null && lastSubsumed.getProperties().discardOnSubsumed()) {
-                    lastSubsumedCheckpointId = lastSubsumed.getCheckpointID();
-                }
-            } catch (Exception exception) {
-                if (exception instanceof PossibleInconsistentStateException) {
-                    LOG.warn(
-                            "An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.",
-                            completedCheckpoint.getCheckpointID(),
-                            completedCheckpoint.getExternalPointer());
-                } else {
-                    // we failed to store the completed checkpoint. Let's clean up
-                    checkpointsCleaner.cleanCheckpointOnFailedStoring(
-                            completedCheckpoint, executor);
-                }
-
-                sendAbortedMessages(
-                        pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
-                        checkpointId,
-                        pendingCheckpoint.getCheckpointTimestamp());
-                throw new CheckpointException(
-                        "Could not complete the pending checkpoint " + checkpointId + '.',
-                        CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
-                        exception);
-            }
+            lastSubsumed =
+                    addCompletedCheckpointToStoreAndSubsumeOldest(
+                            checkpointId,
+                            completedCheckpoint,
+                            pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo());
         } finally {
             pendingCheckpoints.remove(checkpointId);
             scheduleTriggerRequest();
@@ -1309,7 +1283,47 @@ public class CheckpointCoordinator {
                 pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
                 checkpointId,
                 completedCheckpoint.getTimestamp(),
-                lastSubsumedCheckpointId);
+                extractIdIfDiscardedOnSubsumed(lastSubsumed));
+    }
+
+    private long extractIdIfDiscardedOnSubsumed(CompletedCheckpoint lastSubsumed) {
+        final long lastSubsumedCheckpointId;
+        if (lastSubsumed != null && lastSubsumed.getProperties().discardOnSubsumed()) {
+            lastSubsumedCheckpointId = lastSubsumed.getCheckpointID();
+        } else {
+            lastSubsumedCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+        }
+        return lastSubsumedCheckpointId;
+    }
+
+    private CompletedCheckpoint addCompletedCheckpointToStoreAndSubsumeOldest(
+            long checkpointId,
+            CompletedCheckpoint completedCheckpoint,
+            List<ExecutionVertex> tasksToAbort)
+            throws CheckpointException {
+        try {
+            return completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
+                    completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
+        } catch (Exception exception) {
+            if (exception instanceof PossibleInconsistentStateException) {
+                LOG.warn(
+                        "An error occurred while writing checkpoint {} to the underlying metadata"
+                                + " store. Flink was not able to determine whether the metadata was"
+                                + " successfully persisted. The corresponding state located at '{}'"
+                                + " won't be discarded and needs to be cleaned up manually.",
+                        completedCheckpoint.getCheckpointID(),
+                        completedCheckpoint.getExternalPointer());
+            } else {
+                // we failed to store the completed checkpoint. Let's clean up
+                checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
+            }
+
+            sendAbortedMessages(tasksToAbort, checkpointId, completedCheckpoint.getTimestamp());
+            throw new CheckpointException(
+                    "Could not complete the pending checkpoint " + checkpointId + '.',
+                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
+                    exception);
+        }
     }
 
     void scheduleTriggerRequest() {
@@ -1329,6 +1343,7 @@ public class CheckpointCoordinator {
             long completedCheckpointId,
             long completedTimestamp,
             long lastSubsumedCheckpointId) {
+
         // commit tasks
         for (ExecutionVertex ev : tasksToCommit) {
             Execution ee = ev.getCurrentExecutionAttempt();

Reply via email to