This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1bbb8b6141635106cb137820b819fedaa8fdfce6 Author: Till Rohrmann <[email protected]> AuthorDate: Mon Jan 4 13:48:41 2021 +0100 [FLINK-20846] Factor out CompletedCheckpointStore and CheckpointIDCounter creation in ExecutionGraphBuilder.buildGraph --- .../executiongraph/ExecutionGraphBuilder.java | 70 +++++++++++++++------- 1 file changed, 47 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 49a812c..5dc499c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -249,8 +249,8 @@ public class ExecutionGraphBuilder { } // configure the state checkpointing - JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings(); - if (snapshotSettings != null) { + if (isCheckpointingEnabled(jobGraph)) { + JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings(); List<ExecutionJobVertex> triggerVertices = idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph); @@ -260,29 +260,14 @@ public class ExecutionGraphBuilder { List<ExecutionJobVertex> confirmVertices = idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph); - CompletedCheckpointStore completedCheckpoints; - CheckpointIDCounter checkpointIdCounter; - try { - int maxNumberOfCheckpointsToRetain = - jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); - - if (maxNumberOfCheckpointsToRetain <= 0) { - // warning and use 1 as the default value if the setting in - // state.checkpoints.max-retained-checkpoints is not greater than 0. - log.warn( - "The setting for '{} : {}' is invalid. Using default value of {}", - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), - maxNumberOfCheckpointsToRetain, - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); - - maxNumberOfCheckpointsToRetain = - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); - } + final CompletedCheckpointStore completedCheckpoints; + final CheckpointIDCounter checkpointIdCounter; + try { completedCheckpoints = - recoveryFactory.createCheckpointStore( - jobId, maxNumberOfCheckpointsToRetain, classLoader); - checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId); + createCompletedCheckpointStore( + jobManagerConfig, classLoader, recoveryFactory, log, jobId); + checkpointIdCounter = createCheckpointIdCounter(recoveryFactory, jobId); } catch (Exception e) { throw new JobExecutionException( jobId, "Failed to initialize high-availability checkpoint handler", e); @@ -380,6 +365,45 @@ public class ExecutionGraphBuilder { return executionGraph; } + private static boolean isCheckpointingEnabled(JobGraph jobGraph) { + return jobGraph.getCheckpointingSettings() != null; + } + + private static CheckpointIDCounter createCheckpointIdCounter( + CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception { + return recoveryFactory.createCheckpointIDCounter(jobId); + } + + private static CompletedCheckpointStore createCompletedCheckpointStore( + Configuration jobManagerConfig, + ClassLoader classLoader, + CheckpointRecoveryFactory recoveryFactory, + Logger log, + JobID jobId) + throws Exception { + CompletedCheckpointStore completedCheckpoints; + int maxNumberOfCheckpointsToRetain = + jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); + + if (maxNumberOfCheckpointsToRetain <= 0) { + // warning and use 1 as the default value if the setting in + // state.checkpoints.max-retained-checkpoints is not greater than 0. + log.warn( + "The setting for '{} : {}' is invalid. Using default value of {}", + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), + maxNumberOfCheckpointsToRetain, + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); + + maxNumberOfCheckpointsToRetain = + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); + } + + completedCheckpoints = + recoveryFactory.createCheckpointStore( + jobId, maxNumberOfCheckpointsToRetain, classLoader); + return completedCheckpoints; + } + private static List<ExecutionJobVertex> idToVertex( List<JobVertexID> jobVertices, ExecutionGraph executionGraph) throws IllegalArgumentException {
