This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 18feec2931d2366f37069bf33cb9c6c6a4fa28d6 Author: Till Rohrmann <[email protected]> AuthorDate: Mon Jan 4 16:16:58 2021 +0100 [hotfix] Factor CompletedCheckpointStore and CheckpointIDCounter factories out Moved the factories of the CompletedCheckpointStore and the CheckpointIDCounter to SchedulerUtils in order to make them reusable. --- .../executiongraph/ExecutionGraphBuilder.java | 37 ------- .../flink/runtime/scheduler/SchedulerBase.java | 50 ++------- .../flink/runtime/scheduler/SchedulerUtils.java | 115 +++++++++++++++++++++ .../SchedulerUtilsTest.java} | 8 +- 4 files changed, 129 insertions(+), 81 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 86c26e0..81bb33b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; @@ -29,7 +28,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -308,41 +306,6 @@ public class ExecutionGraphBuilder { return jobGraph.getCheckpointingSettings() != null; } - public static CheckpointIDCounter createCheckpointIdCounter( - CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception { - return recoveryFactory.createCheckpointIDCounter(jobId); - } - - public static CompletedCheckpointStore createCompletedCheckpointStore( - Configuration jobManagerConfig, - ClassLoader classLoader, - CheckpointRecoveryFactory recoveryFactory, - Logger log, - JobID jobId) - throws Exception { - CompletedCheckpointStore completedCheckpoints; - int maxNumberOfCheckpointsToRetain = - jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); - - if (maxNumberOfCheckpointsToRetain <= 0) { - // warning and use 1 as the default value if the setting in - // state.checkpoints.max-retained-checkpoints is not greater than 0. - log.warn( - "The setting for '{} : {}' is invalid. Using default value of {}", - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), - maxNumberOfCheckpointsToRetain, - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); - - maxNumberOfCheckpointsToRetain = - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); - } - - completedCheckpoints = - recoveryFactory.createCheckpointStore( - jobId, maxNumberOfCheckpointsToRetain, classLoader); - return completedCheckpoints; - } - private static List<ExecutionJobVertex> idToVertex( List<JobVertexID> jobVertices, ExecutionGraph executionGraph) throws IllegalArgumentException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index cf93cdc..6390932 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -39,8 +39,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -219,8 +217,16 @@ public abstract class SchedulerBase implements SchedulerNG { this.executionVertexVersioner = checkNotNull(executionVertexVersioner); this.checkpointsCleaner = new CheckpointsCleaner(); - this.completedCheckpointStore = createCompletedCheckpointStore(); - this.checkpointIdCounter = createCheckpointIdCounter(); + this.completedCheckpointStore = + SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled( + jobGraph, + jobMasterConfiguration, + userCodeLoader, + checkpointRecoveryFactory, + log); + this.checkpointIdCounter = + SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled( + jobGraph, checkpointRecoveryFactory); this.executionGraph = createAndRestoreExecutionGraph( @@ -277,42 +283,6 @@ public abstract class SchedulerBase implements SchedulerNG { } } - private CompletedCheckpointStore createCompletedCheckpointStore() throws JobExecutionException { - final JobID jobId = jobGraph.getJobID(); - if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { - try { - return ExecutionGraphBuilder.createCompletedCheckpointStore( - jobMasterConfiguration, - userCodeLoader, - checkpointRecoveryFactory, - log, - jobId); - } catch (Exception e) { - throw new JobExecutionException( - jobId, - "Failed to initialize high-availability completed checkpoint store", - e); - } - } else { - return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE; - } - } - - private CheckpointIDCounter createCheckpointIdCounter() throws JobExecutionException { - final JobID jobId = jobGraph.getJobID(); - if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { - try { - return ExecutionGraphBuilder.createCheckpointIdCounter( - checkpointRecoveryFactory, jobId); - } catch (Exception e) { - throw new JobExecutionException( - jobId, "Failed to initialize high-availability checkpoint id counter", e); - } - } else { - return DeactivatedCheckpointIDCounter.INSTANCE; - } - } - private ExecutionGraph createAndRestoreExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, CompletedCheckpointStore completedCheckpointStore, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java new file mode 100644 index 0000000..b379a50 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.slf4j.Logger; + +/** Utils class for Flink's scheduler implementations. */ +public final class SchedulerUtils { + + private SchedulerUtils() { + throw new UnsupportedOperationException( + "Instantiation of SchedulerUtils is not supported."); + } + + public static CompletedCheckpointStore createCompletedCheckpointStoreIfCheckpointingIsEnabled( + JobGraph jobGraph, + Configuration configuration, + ClassLoader userCodeLoader, + CheckpointRecoveryFactory checkpointRecoveryFactory, + Logger log) + throws JobExecutionException { + final JobID jobId = jobGraph.getJobID(); + if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { + try { + return createCompletedCheckpointStore( + configuration, userCodeLoader, checkpointRecoveryFactory, log, jobId); + } catch (Exception e) { + throw new JobExecutionException( + jobId, + "Failed to initialize high-availability completed checkpoint store", + e); + } + } else { + return DeactivatedCheckpointCompletedCheckpointStore.INSTANCE; + } + } + + @VisibleForTesting + static CompletedCheckpointStore createCompletedCheckpointStore( + Configuration jobManagerConfig, + ClassLoader classLoader, + CheckpointRecoveryFactory recoveryFactory, + Logger log, + JobID jobId) + throws Exception { + int maxNumberOfCheckpointsToRetain = + jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS); + + if (maxNumberOfCheckpointsToRetain <= 0) { + // warning and use 1 as the default value if the setting in + // state.checkpoints.max-retained-checkpoints is not greater than 0. + log.warn( + "The setting for '{} : {}' is invalid. Using default value of {}", + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(), + maxNumberOfCheckpointsToRetain, + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()); + + maxNumberOfCheckpointsToRetain = + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue(); + } + + return recoveryFactory.createCheckpointStore( + jobId, maxNumberOfCheckpointsToRetain, classLoader); + } + + public static CheckpointIDCounter createCheckpointIDCounterIfCheckpointingIsEnabled( + JobGraph jobGraph, CheckpointRecoveryFactory checkpointRecoveryFactory) + throws JobExecutionException { + final JobID jobId = jobGraph.getJobID(); + if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { + try { + return createCheckpointIdCounter(checkpointRecoveryFactory, jobId); + } catch (Exception e) { + throw new JobExecutionException( + jobId, "Failed to initialize high-availability checkpoint id counter", e); + } + } else { + return DeactivatedCheckpointIDCounter.INSTANCE; + } + } + + private static CheckpointIDCounter createCheckpointIdCounter( + CheckpointRecoveryFactory recoveryFactory, JobID jobId) throws Exception { + return recoveryFactory.createCheckpointIDCounter(jobId); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java similarity index 89% rename from flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java index 8ed09ba..23073aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.executiongraph; +package org.apache.flink.runtime.scheduler; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CheckpointingOptions; @@ -29,8 +29,8 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; -/** Tests for the {@link ExecutionGraphBuilder}. */ -public class ExecutionGraphBuilderTest extends TestLogger { +/** Tests for the {@link SchedulerUtils} utilities. */ +public class SchedulerUtilsTest extends TestLogger { @Test public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception { @@ -41,7 +41,7 @@ public class ExecutionGraphBuilderTest extends TestLogger { CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, maxNumberOfCheckpointsToRetain); final CompletedCheckpointStore completedCheckpointStore = - ExecutionGraphBuilder.createCompletedCheckpointStore( + SchedulerUtils.createCompletedCheckpointStore( jobManagerConfig, getClass().getClassLoader(), new StandaloneCheckpointRecoveryFactory(),
