This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 01e4745283f1031e02aba21b2f91ebcbb87d3acd Author: Till Rohrmann <[email protected]> AuthorDate: Mon Jan 4 14:50:16 2021 +0100 [FLINK-20846] Move CompletedCheckpointStore creation out of ExecutionGraphBuilder.buildGraph --- ...ctivatedCheckpointCompletedCheckpointStore.java | 76 ++++++++++++++++++++++ .../executiongraph/ExecutionGraphBuilder.java | 16 +---- .../flink/runtime/scheduler/SchedulerBase.java | 12 ++++ .../executiongraph/ExecutionGraphBuilderTest.java | 55 ++++++++++++++++ .../ExecutionGraphDeploymentTest.java | 17 ----- .../TestingExecutionGraphBuilder.java | 11 ++++ 6 files changed, 157 insertions(+), 30 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java new file mode 100644 index 0000000..1a51a89 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobStatus; + +import java.util.List; + +/** + * This class represents a {@link CompletedCheckpointStore} if checkpointing has been disabled. + * Consequently, no component should use methods other than {@link #shutdown}. + */ +public enum DeactivatedCheckpointCompletedCheckpointStore implements CompletedCheckpointStore { + INSTANCE; + + @Override + public void recover() throws Exception { + throw unsupportedOperationException(); + } + + @Override + public void addCheckpoint( + CompletedCheckpoint checkpoint, + CheckpointsCleaner checkpointsCleaner, + Runnable postCleanup) + throws Exception { + throw unsupportedOperationException(); + } + + @Override + public void shutdown( + JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) + throws Exception {} + + @Override + public List<CompletedCheckpoint> getAllCheckpoints() throws Exception { + throw unsupportedOperationException(); + } + + @Override + public int getNumberOfRetainedCheckpoints() { + throw unsupportedOperationException(); + } + + @Override + public int getMaxNumberOfRetainedCheckpoints() { + throw unsupportedOperationException(); + } + + @Override + public boolean requiresExternalizedCheckpoints() { + throw unsupportedOperationException(); + } + + private UnsupportedOperationException unsupportedOperationException() { + return new UnsupportedOperationException( + String.format( + "The %s cannot store completed checkpoints.", getClass().getSimpleName())); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 0e70f7b..bd0f526 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -80,6 +80,7 @@ public class ExecutionGraphBuilder { SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, + CompletedCheckpointStore completedCheckpointStore, CheckpointIDCounter checkpointIdCounter, Time rpcTimeout, MetricGroup metrics, @@ -209,17 +210,6 @@ public class ExecutionGraphBuilder { List<ExecutionJobVertex> confirmVertices = idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph); - final CompletedCheckpointStore completedCheckpoints; - - try { - completedCheckpoints = - createCompletedCheckpointStore( - jobManagerConfig, classLoader, recoveryFactory, log, jobId); - } catch (Exception e) { - throw new JobExecutionException( - jobId, "Failed to initialize high-availability checkpoint handler", e); - } - // Maximum number of remembered checkpoints int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); @@ -298,7 +288,7 @@ public class ExecutionGraphBuilder { confirmVertices, hooks, checkpointIdCounter, - completedCheckpoints, + completedCheckpointStore, rootBackend, checkpointStatsTracker); } @@ -321,7 +311,7 @@ public class ExecutionGraphBuilder { return recoveryFactory.createCheckpointIDCounter(jobId); } - private static CompletedCheckpointStore createCompletedCheckpointStore( + public static CompletedCheckpointStore createCompletedCheckpointStore( Configuration jobManagerConfig, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 2feb12a..9dc60ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -37,6 +37,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; 
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.DeactivatedCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.client.JobExecutionException; @@ -280,18 +282,27 @@ public abstract class SchedulerBase implements SchedulerNG { final JobID jobId = jobGraph.getJobID(); final CheckpointIDCounter checkpointIdCounter; + final CompletedCheckpointStore completedCheckpointStore; if (ExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) { try { checkpointIdCounter = ExecutionGraphBuilder.createCheckpointIdCounter( checkpointRecoveryFactory, jobId); + completedCheckpointStore = + ExecutionGraphBuilder.createCompletedCheckpointStore( + jobMasterConfiguration, + userCodeLoader, + checkpointRecoveryFactory, + log, + jobId); } catch (Exception e) { throw new JobExecutionException( jobId, "Failed to initialize high-availability checkpoint handler", e); } } else { checkpointIdCounter = DeactivatedCheckpointIDCounter.INSTANCE; + completedCheckpointStore = DeactivatedCheckpointCompletedCheckpointStore.INSTANCE; } return ExecutionGraphBuilder.buildGraph( @@ -302,6 +313,7 @@ public abstract class SchedulerBase implements SchedulerNG { slotProvider, userCodeLoader, checkpointRecoveryFactory, + completedCheckpointStore, checkpointIdCounter, rpcTimeout, currentJobManagerJobMetricGroup, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java new file mode 100644 index 0000000..8ed09ba --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilderTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Tests for the {@link ExecutionGraphBuilder}. 
*/ +public class ExecutionGraphBuilderTest extends TestLogger { + + @Test + public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception { + // Checks that createCompletedCheckpointStore honors CheckpointingOptions.MAX_RETAINED_CHECKPOINTS. + final int maxNumberOfCheckpointsToRetain = 10; + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setInteger( + CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, maxNumberOfCheckpointsToRetain); + + final CompletedCheckpointStore completedCheckpointStore = + ExecutionGraphBuilder.createCompletedCheckpointStore( + jobManagerConfig, + getClass().getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + log, + new JobID()); + + assertEquals( + maxNumberOfCheckpointsToRetain, + completedCheckpointStore.getMaxNumberOfRetainedCheckpoints()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 3ff9e5b..9c6644d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -522,23 +522,6 @@ public class ExecutionGraphDeploymentTest extends TestLogger { .getMaxNumberOfRetainedCheckpoints()); } - @Test - public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception { - - final int maxNumberOfCheckpointsToRetain = 10; - final Configuration jobManagerConfig = new Configuration(); - jobManagerConfig.setInteger( - CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, maxNumberOfCheckpointsToRetain); - - final ExecutionGraph eg = createExecutionGraph(jobManagerConfig); - - assertEquals( - maxNumberOfCheckpointsToRetain, - eg.getCheckpointCoordinator() - .getCheckpointStore() - .getMaxNumberOfRetainedCheckpoints()); - } - private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception { v1.setParallelism(dop1); diff
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java index a7298d4..57d48a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java @@ -28,8 +28,10 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; @@ -74,6 +76,8 @@ public class TestingExecutionGraphBuilder { private MetricGroup metricGroup = new UnregisteredMetricsGroup(); private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); + private CompletedCheckpointStore completedCheckpointStore = + new StandaloneCompletedCheckpointStore(1); private CheckpointIDCounter checkpointIdCounter = new StandaloneCheckpointIDCounter(); private ExecutionDeploymentListener executionDeploymentListener = NoOpExecutionDeploymentListener.get(); @@ -148,6 +152,12 @@ public class TestingExecutionGraphBuilder { return this; } + public TestingExecutionGraphBuilder setCompletedCheckpointStore( + CompletedCheckpointStore completedCheckpointStore) { + 
this.completedCheckpointStore = completedCheckpointStore; + return this; + } + public TestingExecutionGraphBuilder setCheckpointIdCounter( CheckpointIDCounter checkpointIdCounter) { this.checkpointIdCounter = checkpointIdCounter; @@ -175,6 +185,7 @@ public class TestingExecutionGraphBuilder { slotProvider, userClassLoader, checkpointRecoveryFactory, + completedCheckpointStore, checkpointIdCounter, rpcTimeout, metricGroup,
