This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 01e4745283f1031e02aba21b2f91ebcbb87d3acd
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon Jan 4 14:50:16 2021 +0100

    [FLINK-20846] Move CompletedCheckpointStore creation out of 
ExecutionGraphBuilder.buildGraph
---
 ...ctivatedCheckpointCompletedCheckpointStore.java | 76 ++++++++++++++++++++++
 .../executiongraph/ExecutionGraphBuilder.java      | 16 +----
 .../flink/runtime/scheduler/SchedulerBase.java     | 12 ++++
 .../executiongraph/ExecutionGraphBuilderTest.java  | 55 ++++++++++++++++
 .../ExecutionGraphDeploymentTest.java              | 17 -----
 .../TestingExecutionGraphBuilder.java              | 11 ++++
 6 files changed, 157 insertions(+), 30 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
new file mode 100644
index 0000000..1a51a89
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.List;
+
+/**
+ * This class represents a {@link CompletedCheckpointStore} if checkpointing 
has been disabled.
+ * Consequently, no component should use methods other than {@link #shutdown}.
+ */
+public enum DeactivatedCheckpointCompletedCheckpointStore implements 
CompletedCheckpointStore {
+    INSTANCE;
+
+    @Override
+    public void recover() throws Exception {
+        throw unsupportedOperationException();
+    }
+
+    @Override
+    public void addCheckpoint(
+            CompletedCheckpoint checkpoint,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanup)
+            throws Exception {
+        throw unsupportedOperationException();
+    }
+
+    @Override
+    public void shutdown(
+            JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, 
Runnable postCleanup)
+            throws Exception {}
+
+    @Override
+    public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+        throw unsupportedOperationException();
+    }
+
+    @Override
+    public int getNumberOfRetainedCheckpoints() {
+        throw unsupportedOperationException();
+    }
+
+    @Override
+    public int getMaxNumberOfRetainedCheckpoints() {
+        throw unsupportedOperationException();
+    }
+
+    @Override
+    public boolean requiresExternalizedCheckpoints() {
+        throw unsupportedOperationException();
+    }
+
+    private UnsupportedOperationException unsupportedOperationException() {
+        return new UnsupportedOperationException(
+                String.format(
+                        "The %s cannot store completed checkpoints.", 
getClass().getSimpleName()));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 0e70f7b..bd0f526 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -80,6 +80,7 @@ public class ExecutionGraphBuilder {
             SlotProvider slotProvider,
             ClassLoader classLoader,
             CheckpointRecoveryFactory recoveryFactory,
+            CompletedCheckpointStore completedCheckpointStore,
             CheckpointIDCounter checkpointIdCounter,
             Time rpcTimeout,
             MetricGroup metrics,
@@ -209,17 +210,6 @@ public class ExecutionGraphBuilder {
             List<ExecutionJobVertex> confirmVertices =
                     idToVertex(snapshotSettings.getVerticesToConfirm(), 
executionGraph);
 
-            final CompletedCheckpointStore completedCheckpoints;
-
-            try {
-                completedCheckpoints =
-                        createCompletedCheckpointStore(
-                                jobManagerConfig, classLoader, 
recoveryFactory, log, jobId);
-            } catch (Exception e) {
-                throw new JobExecutionException(
-                        jobId, "Failed to initialize high-availability 
checkpoint handler", e);
-            }
-
             // Maximum number of remembered checkpoints
             int historySize = 
jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 
@@ -298,7 +288,7 @@ public class ExecutionGraphBuilder {
                     confirmVertices,
                     hooks,
                     checkpointIdCounter,
-                    completedCheckpoints,
+                    completedCheckpointStore,
                     rootBackend,
                     checkpointStatsTracker);
         }
@@ -321,7 +311,7 @@ public class ExecutionGraphBuilder {
         return recoveryFactory.createCheckpointIDCounter(jobId);
     }
 
-    private static CompletedCheckpointStore createCompletedCheckpointStore(
+    public static CompletedCheckpointStore createCompletedCheckpointStore(
             Configuration jobManagerConfig,
             ClassLoader classLoader,
             CheckpointRecoveryFactory recoveryFactory,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 2feb12a..9dc60ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -37,6 +37,8 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import 
org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -280,18 +282,27 @@ public abstract class SchedulerBase implements 
SchedulerNG {
 
         final JobID jobId = jobGraph.getJobID();
         final CheckpointIDCounter checkpointIdCounter;
+        final CompletedCheckpointStore completedCheckpointStore;
 
         if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
             try {
                 checkpointIdCounter =
                         ExecutionGraphBuilder.createCheckpointIdCounter(
                                 checkpointRecoveryFactory, jobId);
+                completedCheckpointStore =
+                        ExecutionGraphBuilder.createCompletedCheckpointStore(
+                                jobMasterConfiguration,
+                                userCodeLoader,
+                                checkpointRecoveryFactory,
+                                log,
+                                jobId);
             } catch (Exception e) {
                 throw new JobExecutionException(
                         jobId, "Failed to initialize high-availability 
checkpoint handler", e);
             }
         } else {
             checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE;
+            completedCheckpointStore = 
DeactivatedCheckpointCompletedCheckpointStore.INSTANCE;
         }
 
         return ExecutionGraphBuilder.buildGraph(
@@ -302,6 +313,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                 slotProvider,
                 userCodeLoader,
                 checkpointRecoveryFactory,
+                completedCheckpointStore,
                 checkpointIdCounter,
                 rpcTimeout,
                 currentJobManagerJobMetricGroup,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java
new file mode 100644
index 0000000..8ed09ba
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for the {@link ExecutionGraphBuilder}. */
+public class ExecutionGraphBuilderTest extends TestLogger {
+
+    @Test
+    public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
+
+        final int maxNumberOfCheckpointsToRetain = 10;
+        final Configuration jobManagerConfig = new Configuration();
+        jobManagerConfig.setInteger(
+                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 
maxNumberOfCheckpointsToRetain);
+
+        final CompletedCheckpointStore completedCheckpointStore =
+                ExecutionGraphBuilder.createCompletedCheckpointStore(
+                        jobManagerConfig,
+                        getClass().getClassLoader(),
+                        new StandaloneCheckpointRecoveryFactory(),
+                        log,
+                        new JobID());
+
+        assertEquals(
+                maxNumberOfCheckpointsToRetain,
+                completedCheckpointStore.getMaxNumberOfRetainedCheckpoints());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 3ff9e5b..9c6644d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -522,23 +522,6 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                         .getMaxNumberOfRetainedCheckpoints());
     }
 
-    @Test
-    public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
-
-        final int maxNumberOfCheckpointsToRetain = 10;
-        final Configuration jobManagerConfig = new Configuration();
-        jobManagerConfig.setInteger(
-                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 
maxNumberOfCheckpointsToRetain);
-
-        final ExecutionGraph eg = createExecutionGraph(jobManagerConfig);
-
-        assertEquals(
-                maxNumberOfCheckpointsToRetain,
-                eg.getCheckpointCoordinator()
-                        .getCheckpointStore()
-                        .getMaxNumberOfRetainedCheckpoints());
-    }
-
     private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, 
int dop2)
             throws Exception {
         v1.setParallelism(dop1);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
index a7298d4..57d48a2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java
@@ -28,8 +28,10 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
@@ -74,6 +76,8 @@ public class TestingExecutionGraphBuilder {
     private MetricGroup metricGroup = new UnregisteredMetricsGroup();
     private CheckpointRecoveryFactory checkpointRecoveryFactory =
             new StandaloneCheckpointRecoveryFactory();
+    private CompletedCheckpointStore completedCheckpointStore =
+            new StandaloneCompletedCheckpointStore(1);
     private CheckpointIDCounter checkpointIdCounter = new 
StandaloneCheckpointIDCounter();
     private ExecutionDeploymentListener executionDeploymentListener =
             NoOpExecutionDeploymentListener.get();
@@ -148,6 +152,12 @@ public class TestingExecutionGraphBuilder {
         return this;
     }
 
+    public TestingExecutionGraphBuilder setCompletedCheckpointStore(
+            CompletedCheckpointStore completedCheckpointStore) {
+        this.completedCheckpointStore = completedCheckpointStore;
+        return this;
+    }
+
     public TestingExecutionGraphBuilder setCheckpointIdCounter(
             CheckpointIDCounter checkpointIdCounter) {
         this.checkpointIdCounter = checkpointIdCounter;
@@ -175,6 +185,7 @@ public class TestingExecutionGraphBuilder {
                 slotProvider,
                 userClassLoader,
                 checkpointRecoveryFactory,
+                completedCheckpointStore,
                 checkpointIdCounter,
                 rpcTimeout,
                 metricGroup,

Reply via email to