This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d8ecc6cd856d2d27fbafe0a9e2b911055ecb08d
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Dec 9 16:33:17 2021 +0100

    [refactor] Extract finalizeCheckpoint method
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 57 ++++++++++++----------
 1 file changed, 32 insertions(+), 25 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b71deda..7d9bda9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1210,30 +1210,7 @@ public class CheckpointCoordinator {
         sharedStateRegistry.registerAll(operatorStates.values());
 
         try {
-            try {
-                completedCheckpoint =
-                        pendingCheckpoint.finalizeCheckpoint(
-                                checkpointsCleaner,
-                                this::scheduleTriggerRequest,
-                                executor,
-                                getStatsCallback(pendingCheckpoint));
-
-                
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
-            } catch (Exception e1) {
-                // abort the current pending checkpoint if we fails to 
finalize the pending
-                // checkpoint.
-                if (!pendingCheckpoint.isDisposed()) {
-                    abortPendingCheckpoint(
-                            pendingCheckpoint,
-                            new CheckpointException(
-                                    
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
-                }
-
-                throw new CheckpointException(
-                        "Could not finalize the pending checkpoint " + 
checkpointId + '.',
-                        CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
-                        e1);
-            }
+            completedCheckpoint = finalizeCheckpoint(pendingCheckpoint);
 
             // the pending checkpoint must be discarded after the finalization
             Preconditions.checkState(pendingCheckpoint.isDisposed() && 
completedCheckpoint != null);
@@ -1286,6 +1263,37 @@ public class CheckpointCoordinator {
                 extractIdIfDiscardedOnSubsumed(lastSubsumed));
     }
 
+    private CompletedCheckpoint finalizeCheckpoint(PendingCheckpoint 
pendingCheckpoint)
+            throws CheckpointException {
+        try {
+            final CompletedCheckpoint completedCheckpoint =
+                    pendingCheckpoint.finalizeCheckpoint(
+                            checkpointsCleaner,
+                            this::scheduleTriggerRequest,
+                            executor,
+                            getStatsCallback(pendingCheckpoint));
+
+            
failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
+            return completedCheckpoint;
+        } catch (Exception e1) {
+            // abort the current pending checkpoint if we fails to finalize 
the pending
+            // checkpoint.
+            if (!pendingCheckpoint.isDisposed()) {
+                abortPendingCheckpoint(
+                        pendingCheckpoint,
+                        new CheckpointException(
+                                
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1));
+            }
+
+            throw new CheckpointException(
+                    "Could not finalize the pending checkpoint "
+                            + pendingCheckpoint.getCheckpointID()
+                            + '.',
+                    CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE,
+                    e1);
+        }
+    }
+
     private long extractIdIfDiscardedOnSubsumed(CompletedCheckpoint 
lastSubsumed) {
         final long lastSubsumedCheckpointId;
         if (lastSubsumed != null && 
lastSubsumed.getProperties().discardOnSubsumed()) {
@@ -1343,7 +1351,6 @@ public class CheckpointCoordinator {
             long completedCheckpointId,
             long completedTimestamp,
             long lastSubsumedCheckpointId) {
-
         // commit tasks
         for (ExecutionVertex ev : tasksToCommit) {
             Execution ee = ev.getCurrentExecutionAttempt();

Reply via email to