This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1bbb8b6141635106cb137820b819fedaa8fdfce6
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jan 4 13:48:41 2021 +0100

    [FLINK-20846] Factor out CompletedCheckpointStore and CheckpointIDCounter creation in ExecutionGraphBuilder.buildGraph
---
 .../executiongraph/ExecutionGraphBuilder.java      | 70 +++++++++++++++-------
 1 file changed, 47 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 49a812c..5dc499c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -249,8 +249,8 @@ public class ExecutionGraphBuilder {
         }
 
         // configure the state checkpointing
-        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
-        if (snapshotSettings != null) {
+        if (isCheckpointingEnabled(jobGraph)) {
+            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
             List<ExecutionJobVertex> triggerVertices =
                     idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
 
@@ -260,29 +260,14 @@ public class ExecutionGraphBuilder {
             List<ExecutionJobVertex> confirmVertices =
                     idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
 
-            CompletedCheckpointStore completedCheckpoints;
-            CheckpointIDCounter checkpointIdCounter;
-            try {
-                int maxNumberOfCheckpointsToRetain =
-                        jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
-
-                if (maxNumberOfCheckpointsToRetain <= 0) {
-                    // warning and use 1 as the default value if the setting in
-                    // state.checkpoints.max-retained-checkpoints is not greater than 0.
-                    log.warn(
-                            "The setting for '{} : {}' is invalid. Using default value of {}",
-                            CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
-                            maxNumberOfCheckpointsToRetain,
-                            CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
-
-                    maxNumberOfCheckpointsToRetain =
-                            CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
-                }
+            final CompletedCheckpointStore completedCheckpoints;
+            final CheckpointIDCounter checkpointIdCounter;
 
+            try {
                 completedCheckpoints =
-                        recoveryFactory.createCheckpointStore(
-                                jobId, maxNumberOfCheckpointsToRetain, classLoader);
-                checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
+                        createCompletedCheckpointStore(
+                                jobManagerConfig, classLoader, recoveryFactory, log, jobId);
+                checkpointIdCounter = createCheckpointIdCounter(recoveryFactory, jobId);
             } catch (Exception e) {
                 throw new JobExecutionException(
                         jobId, "Failed to initialize high-availability checkpoint handler", e);
@@ -380,6 +365,45 @@ public class ExecutionGraphBuilder {
         return executionGraph;
     }
 
+    private static boolean isCheckpointingEnabled(JobGraph jobGraph) {
+        return jobGraph.getCheckpointingSettings() != null;
+    }
+
+    private static CheckpointIDCounter createCheckpointIdCounter(
+            CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception {
+        return recoveryFactory.createCheckpointIDCounter(jobId);
+    }
+
+    private static CompletedCheckpointStore createCompletedCheckpointStore(
+            Configuration jobManagerConfig,
+            ClassLoader classLoader,
+            CheckpointRecoveryFactory recoveryFactory,
+            Logger log,
+            JobID jobId)
+            throws Exception {
+        CompletedCheckpointStore completedCheckpoints;
+        int maxNumberOfCheckpointsToRetain =
+                jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
+
+        if (maxNumberOfCheckpointsToRetain <= 0) {
+            // warning and use 1 as the default value if the setting in
+            // state.checkpoints.max-retained-checkpoints is not greater than 0.
+            log.warn(
+                    "The setting for '{} : {}' is invalid. Using default value of {}",
+                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
+                    maxNumberOfCheckpointsToRetain,
+                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+
+            maxNumberOfCheckpointsToRetain =
+                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
+        }
+
+        completedCheckpoints =
+                recoveryFactory.createCheckpointStore(
+                        jobId, maxNumberOfCheckpointsToRetain, classLoader);
+        return completedCheckpoints;
+    }
+
     private static List<ExecutionJobVertex> idToVertex(
             List<JobVertexID> jobVertices, ExecutionGraph executionGraph)
             throws IllegalArgumentException {

Reply via email to