[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints
Let the JobMaster respect checkpoints and savepoints. The JobMaster will always first try to restore the latest checkpoint if there is one available. Only if no checkpoint could be restored will it check whether savepoint restore settings have been set and, if so, try to restore from that savepoint. If neither a checkpoint nor savepoint restore settings are available, the job will be started from scratch. This closes #5444. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3a4e8964 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3a4e8964 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3a4e8964 Branch: refs/heads/master Commit: 3a4e89643d7d7642dde9b5644491f261d4d545bd Parents: 9e85bb0 Author: Till Rohrmann <[email protected]> Authored: Tue Feb 13 15:19:18 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Feb 18 10:12:55 2018 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 4 +- .../flink/runtime/jobmaster/JobMaster.java | 23 ++ .../TestingCheckpointRecoveryFactory.java | 46 ++++ .../jobmanager/JobManagerHARecoveryTest.java | 24 +- .../flink/runtime/jobmaster/JobMasterTest.java | 217 ++++++++++++++++--- 5 files changed, 260 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index ed3570a..016defb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -966,7 
+966,7 @@ public class CheckpointCoordinator { * Restores the latest checkpointed state. * * @param tasks Map of job vertices to restore. State for these vertices is - * restored via {@link Execution#setInitialState(TaskStateSnapshot)}. + * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}. * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to * restore from. * @param allowNonRestoredState Allow checkpoint state that cannot be mapped @@ -1065,7 +1065,7 @@ public class CheckpointCoordinator { * mapped to any job vertex in tasks. * @param tasks Map of job vertices to restore. State for these * vertices is restored via - * {@link Execution#setInitialState(TaskStateSnapshot)}. + * {@link Execution#setInitialState(JobManagerTaskRestore)}. * @param userClassLoader The class loader to resolve serialized classes in * legacy savepoint versions. */ http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index dfa4d1c..2a4b881 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -57,6 +57,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; @@ -297,6 
+298,28 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast jobMasterConfiguration.getSlotRequestTimeout(), log); + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + + if (checkpointCoordinator != null) { + // check whether we find a valid checkpoint + if (!checkpointCoordinator.restoreLatestCheckpointedState( + executionGraph.getAllVertices(), + false, + false)) { + + // check whether we can restore from a savepoint + final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings(); + + if (savepointRestoreSettings.restoreSavepoint()) { + checkpointCoordinator.restoreSavepoint( + savepointRestoreSettings.getRestorePath(), + savepointRestoreSettings.allowNonRestoredState(), + executionGraph.getAllVertices(), + userCodeLoader); + } + } + } + // register self as job status change listener executionGraph.registerJobStatusListener(new JobManagerJobStatusListener()); http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java new file mode 100644 index 0000000..7bc0c85 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.common.JobID; + +/** + * Simple {@link CheckpointRecoveryFactory} which is initialized with a + * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}. + */ +public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory { + + private final CompletedCheckpointStore store; + private final CheckpointIDCounter counter; + + public TestingCheckpointRecoveryFactory(CompletedCheckpointStore store, CheckpointIDCounter counter) { + this.store = store; + this.counter = counter; + } + + @Override + public CompletedCheckpointStore createCheckpointStore(JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { + return store; + } + + @Override + public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception { + return counter; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 93cde3a..8f1e12c 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; @@ -177,7 +178,7 @@ public class JobManagerHARecoveryTest extends TestLogger { submittedJobGraphStore.start(null); CompletedCheckpointStore checkpointStore = new RecoverableCompletedCheckpointStore(); CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter(); - CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter); + CheckpointRecoveryFactory checkpointStateFactory = new TestingCheckpointRecoveryFactory(checkpointStore, checkpointCounter); TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService( null, @@ -465,27 +466,6 @@ public class JobManagerHARecoveryTest extends TestLogger { } } - static class MyCheckpointRecoveryFactory implements CheckpointRecoveryFactory { - - private final CompletedCheckpointStore store; - private final CheckpointIDCounter counter; - - public MyCheckpointRecoveryFactory(CompletedCheckpointStore store, CheckpointIDCounter counter) { - this.store = store; - this.counter = counter; - } - - @Override - public CompletedCheckpointStore createCheckpointStore(JobID jobId, int 
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception { - return store; - } - - @Override - public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception { - return counter; - } - } - public static class BlockingInvokable extends AbstractInvokable { private static final OneShotLatch LATCH = new OneShotLatch(); http://git-wip-us.apache.org/repos/asf/flink/blob/3a4e8964/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e142d9c..e401020 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -25,13 +25,25 @@ import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.Checkpoints; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import 
org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -40,6 +52,8 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -57,7 +71,12 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -162,21 +181,7 @@ public class JobMasterTest extends TestLogger { final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration); - final JobMaster jobMaster = new JobMaster( - rpcService, - jobMasterConfiguration, - jmResourceId, - jobGraph, - haServices, - jobManagerSharedServices, - fastHeartbeatServices, - blobServer, - null, - new NoOpOnCompletionActions(), - testingFatalErrorHandler, - JobMasterTest.class.getClassLoader(), - null, - null); + final JobMaster jobMaster = createJobMaster(jobMasterConfiguration, jobGraph, haServices, jobManagerSharedServices); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -237,21 +242,7 @@ public class JobMasterTest extends TestLogger { final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); - final JobMaster jobMaster = new JobMaster( - rpcService, - jobMasterConfiguration, - jmResourceId, - jobGraph, - haServices, - jobManagerSharedServices, - fastHeartbeatServices, - blobServer, - null, - new NoOpOnCompletionActions(), - testingFatalErrorHandler, - JobMasterTest.class.getClassLoader(), - null, - null); + final JobMaster jobMaster = createJobMaster(jobMasterConfiguration, jobGraph, haServices, jobManagerSharedServices); CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); @@ -282,6 +273,152 @@ public class JobMasterTest extends TestLogger { } /** + * Tests that a JobMaster will restore the given JobGraph from its savepoint upon + * initial submission. 
+ */ + @Test + public void testRestoringFromSavepoint() throws Exception { + + // create savepoint data + final long savepointId = 42L; + final File savepointFile = createSavepoint(savepointId); + + // set savepoint settings + final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath( + savepointFile.getAbsolutePath(), + true); + final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings); + + final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); + final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter()); + haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory); + final JobMaster jobMaster = createJobMaster( + JobMasterConfiguration.fromConfiguration(configuration), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build()); + + try { + // starting the JobMaster should have read the savepoint + final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); + + assertThat(savepointCheckpoint, Matchers.notNullValue()); + + assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(savepointId)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + /** + * Tests that an existing checkpoint will have precedence over an savepoint + */ + @Test + public void testCheckpointPrecedesSavepointRecovery() throws Exception { + + // create savepoint data + final long savepointId = 42L; + final File savepointFile = createSavepoint(savepointId); + + // set savepoint settings + final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath("" + + savepointFile.getAbsolutePath(), + true); + final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings); + + final long checkpointId = 1L; + + final 
CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint( + jobGraph.getJobID(), + checkpointId, + 1L, + 1L, + Collections.emptyMap(), + null, + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + new DummyCheckpointStorageLocation()); + + final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); + completedCheckpointStore.addCheckpoint(completedCheckpoint); + final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(completedCheckpointStore, new StandaloneCheckpointIDCounter()); + haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory); + final JobMaster jobMaster = createJobMaster( + JobMasterConfiguration.fromConfiguration(configuration), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build()); + + try { + // starting the JobMaster should have read the savepoint + final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); + + assertThat(savepointCheckpoint, Matchers.notNullValue()); + + assertThat(savepointCheckpoint.getCheckpointID(), Matchers.is(checkpointId)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + private File createSavepoint(long savepointId) throws IOException { + final File savepointFile = temporaryFolder.newFile(); + final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList()); + + try (FileOutputStream fileOutputStream = new FileOutputStream(savepointFile)) { + Checkpoints.storeCheckpointMetadata(savepoint, fileOutputStream); + } + + return savepointFile; + } + + @Nonnull + private JobGraph createJobGraphWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings) { + final JobGraph jobGraph = new JobGraph(); + + // enable checkpointing which is required to resume from a savepoint + final 
CheckpointCoordinatorConfiguration checkpoinCoordinatorConfiguration = new CheckpointCoordinatorConfiguration( + 1000L, + 1000L, + 1000L, + 1, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true); + final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + checkpoinCoordinatorConfiguration, + null); + jobGraph.setSnapshotSettings(checkpointingSettings); + jobGraph.setSavepointRestoreSettings(savepointRestoreSettings); + + return jobGraph; + } + + @Nonnull + private JobMaster createJobMaster( + JobMasterConfiguration jobMasterConfiguration, + JobGraph jobGraph, + HighAvailabilityServices highAvailabilityServices, + JobManagerSharedServices jobManagerSharedServices) throws Exception { + return new JobMaster( + rpcService, + jobMasterConfiguration, + jmResourceId, + jobGraph, + highAvailabilityServices, + jobManagerSharedServices, + fastHeartbeatServices, + blobServer, + null, + new NoOpOnCompletionActions(), + testingFatalErrorHandler, + JobMasterTest.class.getClassLoader(), + null, + null); + } + + /** * No op implementation of {@link OnCompletionActions}. */ private static final class NoOpOnCompletionActions implements OnCompletionActions { @@ -297,4 +434,24 @@ public class JobMasterTest extends TestLogger { } } + private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation { + + private static final long serialVersionUID = 164095949572620688L; + + @Override + public String getExternalPointer() { + return null; + } + + @Override + public StreamStateHandle getMetadataHandle() { + return null; + } + + @Override + public void disposeStorageLocation() throws IOException { + + } + } + }
