[FLINK-8746] [flip6] Allow rescaling of partially running jobs

This commit enables the rescaling of Flink jobs which are currently not fully deployed. Since no new rescaling savepoint can be taken while not all required tasks are running, Flink will in such a case use the last internal rescaling savepoint. If there is no such savepoint, it will use the savepoint that was provided when the job was submitted. If there is no savepoint at all, it will restart the job without restoring any state.
This closes #5560. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662ed3df Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662ed3df Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662ed3df Branch: refs/heads/master Commit: 662ed3df5270befae92af3f59ed28e0cfea4e55d Parents: 16ec3d7 Author: Till Rohrmann <[email protected]> Authored: Thu Feb 22 12:36:53 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sat Feb 24 15:04:38 2018 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 5 +- .../checkpoint/CheckpointTriggerException.java | 42 +++++ .../flink/runtime/jobmaster/JobMaster.java | 158 ++++++++++++++----- .../runtime/jobmanager/JobManagerITCase.scala | 10 +- 4 files changed, 170 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 016defb..59916fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -354,11 +354,10 @@ public class CheckpointCoordinator { * @throws IllegalStateException If no savepoint directory has been * specified and no default savepoint directory has been * configured - * @throws Exception Failures during triggering are forwarded */ public CompletableFuture<CompletedCheckpoint> triggerSavepoint( long timestamp, - @Nullable String targetLocation) throws Exception { 
+ @Nullable String targetLocation) { CheckpointProperties props = CheckpointProperties.forSavepoint(); @@ -371,7 +370,7 @@ public class CheckpointCoordinator { if (triggerResult.isSuccess()) { return triggerResult.getPendingCheckpoint().getCompletionFuture(); } else { - Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message()); + Throwable cause = new CheckpointTriggerException("Failed to trigger savepoint.", triggerResult.getFailureReason()); return FutureUtils.completedExceptionally(cause); } } http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java new file mode 100644 index 0000000..cb0402a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +/** + * Exceptions which indicate that a checkpoint triggering has failed. + * + */ +public class CheckpointTriggerException extends FlinkException { + + private static final long serialVersionUID = -3330160816161901752L; + + private final CheckpointDeclineReason checkpointDeclineReason; + + public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { + super(message + " Decline reason: " + checkpointDeclineReason.message()); + this.checkpointDeclineReason = Preconditions.checkNotNull(checkpointDeclineReason); + } + + public CheckpointDeclineReason getCheckpointDeclineReason() { + return checkpointDeclineReason; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 425f241..4d982fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -30,7 +30,9 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import 
org.apache.flink.runtime.checkpoint.TaskStateSnapshot; @@ -99,6 +101,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.clock.SystemClock; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -110,6 +113,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -207,6 +211,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast /** The execution graph of this job. */ private ExecutionGraph executionGraph; + @Nullable + private String lastInternalSavepoint; + // ------------------------------------------------------------------------ public JobMaster( @@ -312,15 +319,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast false)) { // check whether we can restore from a savepoint - final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings(); - - if (savepointRestoreSettings.restoreSavepoint()) { - checkpointCoordinator.restoreSavepoint( - savepointRestoreSettings.getRestorePath(), - savepointRestoreSettings.allowNonRestoredState(), - executionGraph.getAllVertices(), - userCodeLoader); - } + tryRestoreExecutionGraphFromSavepoint(executionGraph, jobGraph.getSavepointRestoreSettings()); } } @@ -335,6 +334,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast this.metricQueryServicePath = metricQueryServicePath; this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker()); + this.lastInternalSavepoint = null; } 
//---------------------------------------------------------------------------------------------- @@ -406,7 +406,17 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast // shut down will internally release all registered slots slotPool.shutDown(); - return slotPool.getTerminationFuture(); + final CompletableFuture<Void> disposeInternalSavepointFuture; + + if (lastInternalSavepoint != null) { + disposeInternalSavepointFuture = CompletableFuture.runAsync(() -> disposeSavepoint(lastInternalSavepoint)); + } else { + disposeInternalSavepointFuture = CompletableFuture.completedFuture(null); + } + + final CompletableFuture<Void> slotPoolTerminationFuture = slotPool.getTerminationFuture(); + + return FutureUtils.completeAll(Arrays.asList(disposeInternalSavepointFuture, slotPoolTerminationFuture)); } //---------------------------------------------------------------------------------------------- @@ -513,41 +523,95 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast // 4. 
take a savepoint final CompletableFuture<String> savepointFuture = triggerSavepoint( jobMasterConfiguration.getTmpDirectory(), - timeout); + timeout) + .handleAsync( + (String savepointPath, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + if (strippedThrowable instanceof CheckpointTriggerException) { + final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + + if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + return lastInternalSavepoint; + } else { + throw new CompletionException(checkpointTriggerException); + } + } else { + throw new CompletionException(strippedThrowable); + } + } else { + final String savepointToDispose = lastInternalSavepoint; + lastInternalSavepoint = savepointPath; + + if (savepointToDispose != null) { + // dispose the old savepoint asynchronously + CompletableFuture.runAsync( + () -> disposeSavepoint(savepointToDispose), + scheduledExecutorService); + } + + return lastInternalSavepoint; + } + }, + getMainThreadExecutor()); final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture .thenApplyAsync( - (String savepointPath) -> { - try { - newExecutionGraph.getCheckpointCoordinator().restoreSavepoint( - savepointPath, - false, - newExecutionGraph.getAllVertices(), - userCodeLoader); - } catch (Exception e) { - disposeSavepoint(savepointPath); - - throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e)); - } + (@Nullable String savepointPath) -> { + if (savepointPath != null) { + try { + tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, SavepointRestoreSettings.forPath(savepointPath, false)); + } catch (Exception e) { + final String message = String.format("Could not restore from temporary rescaling savepoint. 
This might indicate " + + "that the savepoint %s got corrupted. Deleting this savepoint as a precaution.", + savepointPath); + + log.info(message); + + CompletableFuture + .runAsync( + () -> { + if (savepointPath.equals(lastInternalSavepoint)) { + lastInternalSavepoint = null; + } + }, + getMainThreadExecutor()) + .thenRunAsync( + () -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + throw new CompletionException(new JobModificationException(message, e)); + } + } else { + try { + tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings()); + } catch (Exception e) { + final String message = String.format("Could not restore from initial savepoint. This might indicate " + + "that the savepoint %s got corrupted.", jobGraph.getSavepointRestoreSettings().getRestorePath()); - // delete the savepoint file once we reach a terminal state - newExecutionGraph.getTerminationFuture() - .whenCompleteAsync( - (JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath), - scheduledExecutorService); + log.info(message); + + throw new CompletionException(new JobModificationException(message, e)); + } + } return newExecutionGraph; }, scheduledExecutorService) - .exceptionally( - (Throwable failure) -> { - // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint - // coordinator and abort the rescaling operation - if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { - checkpointCoordinator.startCheckpointScheduler(); - } + .handleAsync( + (ExecutionGraph executionGraph, Throwable failure) -> { + if (failure != null) { + // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint + // coordinator and abort the rescaling operation + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + checkpointCoordinator.startCheckpointScheduler(); + } - throw new CompletionException(failure); - }); + throw new 
CompletionException(ExceptionUtils.stripCompletionException(failure)); + } else { + return executionGraph; + } + }, + getMainThreadExecutor()); // 5. suspend the current job final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync( @@ -1134,6 +1198,26 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } } + /** + * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}. + * + * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored + * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from + * @throws Exception if the {@link ExecutionGraph} could not be restored + */ + private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception { + if (savepointRestoreSettings.restoreSavepoint()) { + final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator(); + if (checkpointCoordinator != null) { + checkpointCoordinator.restoreSavepoint( + savepointRestoreSettings.getRestorePath(), + savepointRestoreSettings.allowNonRestoredState(), + executionGraphToRestore.getAllVertices(), + userCodeLoader); + } + } + } + //---------------------------------------------------------------------------------------------- private void handleFatalError(final Throwable cause) { http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index b33a78e..3f3950b 100644 --- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -25,7 +25,7 @@ import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout import org.apache.flink.api.common.JobID import org.apache.flink.runtime.akka.ListeningBehaviour -import org.apache.flink.runtime.checkpoint.{CheckpointRetentionPolicy, CheckpointCoordinator, CompletedCheckpoint} +import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.io.network.partition.ResultPartitionType import org.apache.flink.runtime.jobgraph.tasks.{CheckpointCoordinatorConfiguration, JobCheckpointingSettings} @@ -857,7 +857,7 @@ class JobManagerITCase(_system: ActorSystem) // Mock the checkpoint coordinator val checkpointCoordinator = mock(classOf[CheckpointCoordinator]) - doThrow(new Exception("Expected Test Exception")) + doThrow(new IllegalStateException("Expected Test Exception")) .when(checkpointCoordinator) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) @@ -872,7 +872,7 @@ class JobManagerITCase(_system: ActorSystem) // Verify the response response.jobId should equal(jobGraph.getJobID()) - response.cause.getCause.getClass should equal(classOf[Exception]) + response.cause.getCause.getClass should equal(classOf[IllegalStateException]) response.cause.getCause.getMessage should equal("Expected Test Exception") } } @@ -913,7 +913,7 @@ class JobManagerITCase(_system: ActorSystem) // Mock the checkpoint coordinator val checkpointCoordinator = mock(classOf[CheckpointCoordinator]) - doThrow(new Exception("Expected Test Exception")) + doThrow(new IllegalStateException("Expected Test Exception")) .when(checkpointCoordinator) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString()) val savepointPathPromise = new CompletableFuture[CompletedCheckpoint]() 
@@ -982,7 +982,7 @@ class JobManagerITCase(_system: ActorSystem) // Mock the checkpoint coordinator val checkpointCoordinator = mock(classOf[CheckpointCoordinator]) - doThrow(new Exception("Expected Test Exception")) + doThrow(new IllegalStateException("Expected Test Exception")) .when(checkpointCoordinator) .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
