This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 18feec2931d2366f37069bf33cb9c6c6a4fa28d6
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jan 4 16:16:58 2021 +0100

    [hotfix] Factor CompletedCheckpointStore and CheckpointIDCounter factories out
    
    Moved the factories of the CompletedCheckpointStore and the CheckpointIDCounter to
    SchedulerUtils in order to make them reusable.
---
 .../executiongraph/ExecutionGraphBuilder.java      |  37 -------
 .../flink/runtime/scheduler/SchedulerBase.java     |  50 ++-------
 .../flink/runtime/scheduler/SchedulerUtils.java    | 115 +++++++++++++++++++++
 .../SchedulerUtilsTest.java}                       |   8 +-
 4 files changed, 129 insertions(+), 81 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 86c26e0..81bb33b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -29,7 +28,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
@@ -308,41 +306,6 @@ public class ExecutionGraphBuilder {
         return jobGraph.getCheckpointingSettings() != null;
     }
 
-    public static CheckpointIDCounter createCheckpointIdCounter(
-            CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws 
Exception {
-        return recoveryFactory.createCheckpointIDCounter(jobId);
-    }
-
-    public static CompletedCheckpointStore createCompletedCheckpointStore(
-            Configuration jobManagerConfig,
-            ClassLoader classLoader,
-            CheckpointRecoveryFactory recoveryFactory,
-            Logger log,
-            JobID jobId)
-            throws Exception {
-        CompletedCheckpointStore completedCheckpoints;
-        int maxNumberOfCheckpointsToRetain =
-                
jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
-
-        if (maxNumberOfCheckpointsToRetain <= 0) {
-            // warning and use 1 as the default value if the setting in
-            // state.checkpoints.max-retained-checkpoints is not greater than 
0.
-            log.warn(
-                    "The setting for '{} : {}' is invalid. Using default value 
of {}",
-                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
-                    maxNumberOfCheckpointsToRetain,
-                    
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
-
-            maxNumberOfCheckpointsToRetain =
-                    
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
-        }
-
-        completedCheckpoints =
-                recoveryFactory.createCheckpointStore(
-                        jobId, maxNumberOfCheckpointsToRetain, classLoader);
-        return completedCheckpoints;
-    }
-
     private static List<ExecutionJobVertex> idToVertex(
             List<JobVertexID> jobVertices, ExecutionGraph executionGraph)
             throws IllegalArgumentException {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index cf93cdc..6390932 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -39,8 +39,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import 
org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -219,8 +217,16 @@ public abstract class SchedulerBase implements SchedulerNG 
{
         this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
 
         this.checkpointsCleaner = new CheckpointsCleaner();
-        this.completedCheckpointStore = createCompletedCheckpointStore();
-        this.checkpointIdCounter = createCheckpointIdCounter();
+        this.completedCheckpointStore =
+                
SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
+                        jobGraph,
+                        jobMasterConfiguration,
+                        userCodeLoader,
+                        checkpointRecoveryFactory,
+                        log);
+        this.checkpointIdCounter =
+                
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
+                        jobGraph, checkpointRecoveryFactory);
 
         this.executionGraph =
                 createAndRestoreExecutionGraph(
@@ -277,42 +283,6 @@ public abstract class SchedulerBase implements SchedulerNG 
{
         }
     }
 
-    private CompletedCheckpointStore createCompletedCheckpointStore() throws 
JobExecutionException {
-        final JobID jobId = jobGraph.getJobID();
-        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
-            try {
-                return ExecutionGraphBuilder.createCompletedCheckpointStore(
-                        jobMasterConfiguration,
-                        userCodeLoader,
-                        checkpointRecoveryFactory,
-                        log,
-                        jobId);
-            } catch (Exception e) {
-                throw new JobExecutionException(
-                        jobId,
-                        "Failed to initialize high-availability completed 
checkpoint store",
-                        e);
-            }
-        } else {
-            return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE;
-        }
-    }
-
-    private CheckpointIDCounter createCheckpointIdCounter() throws 
JobExecutionException {
-        final JobID jobId = jobGraph.getJobID();
-        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
-            try {
-                return ExecutionGraphBuilder.createCheckpointIdCounter(
-                        checkpointRecoveryFactory, jobId);
-            } catch (Exception e) {
-                throw new JobExecutionException(
-                        jobId, "Failed to initialize high-availability 
checkpoint id counter", e);
-            }
-        } else {
-            return DeactivatedCheckpointIDCounter.INSTANCE;
-        }
-    }
-
     private ExecutionGraph createAndRestoreExecutionGraph(
             JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
             CompletedCheckpointStore completedCheckpointStore,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
new file mode 100644
index 0000000..b379a50
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import 
org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+
+/** Utils class for Flink's scheduler implementations. */
+public final class SchedulerUtils {
+
+    private SchedulerUtils() {
+        throw new UnsupportedOperationException(
+                "Instantiation of SchedulerUtils is not supported.");
+    }
+
+    public static CompletedCheckpointStore 
createCompletedCheckpointStoreIfCheckpointingIsEnabled(
+            JobGraph jobGraph,
+            Configuration configuration,
+            ClassLoader userCodeLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Logger log)
+            throws JobExecutionException {
+        final JobID jobId = jobGraph.getJobID();
+        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
+            try {
+                return createCompletedCheckpointStore(
+                        configuration, userCodeLoader, 
checkpointRecoveryFactory, log, jobId);
+            } catch (Exception e) {
+                throw new JobExecutionException(
+                        jobId,
+                        "Failed to initialize high-availability completed 
checkpoint store",
+                        e);
+            }
+        } else {
+            return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE;
+        }
+    }
+
+    @VisibleForTesting
+    static CompletedCheckpointStore createCompletedCheckpointStore(
+            Configuration jobManagerConfig,
+            ClassLoader classLoader,
+            CheckpointRecoveryFactory recoveryFactory,
+            Logger log,
+            JobID jobId)
+            throws Exception {
+        int maxNumberOfCheckpointsToRetain =
+                
jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
+
+        if (maxNumberOfCheckpointsToRetain <= 0) {
+            // warning and use 1 as the default value if the setting in
+            // state.checkpoints.max-retained-checkpoints is not greater than 
0.
+            log.warn(
+                    "The setting for '{} : {}' is invalid. Using default value 
of {}",
+                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
+                    maxNumberOfCheckpointsToRetain,
+                    
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
+
+            maxNumberOfCheckpointsToRetain =
+                    
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
+        }
+
+        return recoveryFactory.createCheckpointStore(
+                jobId, maxNumberOfCheckpointsToRetain, classLoader);
+    }
+
+    public static CheckpointIDCounter 
createCheckpointIDCounterIfCheckpointingIsEnabled(
+            JobGraph jobGraph, CheckpointRecoveryFactory 
checkpointRecoveryFactory)
+            throws JobExecutionException {
+        final JobID jobId = jobGraph.getJobID();
+        if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
+            try {
+                return createCheckpointIdCounter(checkpointRecoveryFactory, 
jobId);
+            } catch (Exception e) {
+                throw new JobExecutionException(
+                        jobId, "Failed to initialize high-availability 
checkpoint id counter", e);
+            }
+        } else {
+            return DeactivatedCheckpointIDCounter.INSTANCE;
+        }
+    }
+
+    private static CheckpointIDCounter createCheckpointIdCounter(
+            CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws 
Exception {
+        return recoveryFactory.createCheckpointIDCounter(jobId);
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
similarity index 89%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 8ed09ba..23073aa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.executiongraph;
+package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -29,8 +29,8 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-/** Tests for the {@link ExecutionGraphBuilder}. */
-public class ExecutionGraphBuilderTest extends TestLogger {
+/** Tests for the {@link SchedulerUtils} utilities. */
+public class SchedulerUtilsTest extends TestLogger {
 
     @Test
     public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
@@ -41,7 +41,7 @@ public class ExecutionGraphBuilderTest extends TestLogger {
                 CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 
maxNumberOfCheckpointsToRetain);
 
         final CompletedCheckpointStore completedCheckpointStore =
-                ExecutionGraphBuilder.createCompletedCheckpointStore(
+                SchedulerUtils.createCompletedCheckpointStore(
                         jobManagerConfig,
                         getClass().getClassLoader(),
                         new StandaloneCheckpointRecoveryFactory(),

Reply via email to