[FLINK-8746] [flip6] Allow rescaling of partially running jobs

This commit enables the rescaling of Flink jobs which are currently not fully
deployed. In such a case, Flink will use the last internal rescaling savepoint.
If there is no such savepoint, it will use the savepoint that was provided when
the job was submitted. If there is no savepoint at all, it will restart the job
with vanilla state, i.e. without restoring any state.

This closes #5560.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662ed3df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662ed3df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662ed3df

Branch: refs/heads/master
Commit: 662ed3df5270befae92af3f59ed28e0cfea4e55d
Parents: 16ec3d7
Author: Till Rohrmann <[email protected]>
Authored: Thu Feb 22 12:36:53 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sat Feb 24 15:04:38 2018 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   5 +-
 .../checkpoint/CheckpointTriggerException.java  |  42 +++++
 .../flink/runtime/jobmaster/JobMaster.java      | 158 ++++++++++++++-----
 .../runtime/jobmanager/JobManagerITCase.scala   |  10 +-
 4 files changed, 170 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 016defb..59916fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -354,11 +354,10 @@ public class CheckpointCoordinator {
         * @throws IllegalStateException If no savepoint directory has been
         *                               specified and no default savepoint 
directory has been
         *                               configured
-        * @throws Exception             Failures during triggering are 
forwarded
         */
        public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
                        long timestamp,
-                       @Nullable String targetLocation) throws Exception {
+                       @Nullable String targetLocation) {
 
                CheckpointProperties props = 
CheckpointProperties.forSavepoint();
 
@@ -371,7 +370,7 @@ public class CheckpointCoordinator {
                if (triggerResult.isSuccess()) {
                        return 
triggerResult.getPendingCheckpoint().getCompletionFuture();
                } else {
-                       Throwable cause = new Exception("Failed to trigger 
savepoint: " + triggerResult.getFailureReason().message());
+                       Throwable cause = new 
CheckpointTriggerException("Failed to trigger savepoint.", 
triggerResult.getFailureReason());
                        return FutureUtils.completedExceptionally(cause);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
new file mode 100644
index 0000000..cb0402a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exception which indicates that the triggering of a checkpoint has failed. It carries the
+ * non-null {@link CheckpointDeclineReason}, which is exposed via {@link #getCheckpointDeclineReason()}.
+ */
+public class CheckpointTriggerException extends FlinkException {
+
+       private static final long serialVersionUID = -3330160816161901752L;
+
+       private final CheckpointDeclineReason checkpointDeclineReason;
+
+       public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) {
+               super(message + " Decline reason: " + Preconditions.checkNotNull(checkpointDeclineReason).message());
+               this.checkpointDeclineReason = checkpointDeclineReason;
+       }
+
+       public CheckpointDeclineReason getCheckpointDeclineReason() {
+               return checkpointDeclineReason;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 425f241..4d982fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -30,7 +30,9 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -99,6 +101,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.SystemClock;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -110,6 +113,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -207,6 +211,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        /** The execution graph of this job. */
        private ExecutionGraph executionGraph;
 
+       @Nullable
+       private String lastInternalSavepoint;
+
        // 
------------------------------------------------------------------------
 
        public JobMaster(
@@ -312,15 +319,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                false)) {
 
                                // check whether we can restore from a savepoint
-                               final SavepointRestoreSettings 
savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-
-                               if 
(savepointRestoreSettings.restoreSavepoint()) {
-                                       checkpointCoordinator.restoreSavepoint(
-                                               
savepointRestoreSettings.getRestorePath(),
-                                               
savepointRestoreSettings.allowNonRestoredState(),
-                                               executionGraph.getAllVertices(),
-                                               userCodeLoader);
-                               }
+                               
tryRestoreExecutionGraphFromSavepoint(executionGraph, 
jobGraph.getSavepointRestoreSettings());
                        }
                }
 
@@ -335,6 +334,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                this.metricQueryServicePath = metricQueryServicePath;
                this.backPressureStatsTracker = 
checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
+               this.lastInternalSavepoint = null;
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -406,7 +406,17 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                // shut down will internally release all registered slots
                slotPool.shutDown();
 
-               return slotPool.getTerminationFuture();
+               final CompletableFuture<Void> disposeInternalSavepointFuture;
+
+               if (lastInternalSavepoint != null) {
+                       disposeInternalSavepointFuture = 
CompletableFuture.runAsync(() -> disposeSavepoint(lastInternalSavepoint));
+               } else {
+                       disposeInternalSavepointFuture = 
CompletableFuture.completedFuture(null);
+               }
+
+               final CompletableFuture<Void> slotPoolTerminationFuture = 
slotPool.getTerminationFuture();
+
+               return 
FutureUtils.completeAll(Arrays.asList(disposeInternalSavepointFuture, 
slotPoolTerminationFuture));
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -513,41 +523,95 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                // 4. take a savepoint
                final CompletableFuture<String> savepointFuture = 
triggerSavepoint(
                        jobMasterConfiguration.getTmpDirectory(),
-                       timeout);
+                       timeout)
+                       .handleAsync(
+                               (String savepointPath, Throwable throwable) -> {
+                                       if (throwable != null) {
+                                               final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+                                               if (strippedThrowable 
instanceof CheckpointTriggerException) {
+                                                       final 
CheckpointTriggerException checkpointTriggerException = 
(CheckpointTriggerException) strippedThrowable;
+
+                                                       if 
(checkpointTriggerException.getCheckpointDeclineReason() == 
CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
+                                                               return 
lastInternalSavepoint;
+                                                       } else {
+                                                               throw new 
CompletionException(checkpointTriggerException);
+                                                       }
+                                               } else {
+                                                       throw new 
CompletionException(strippedThrowable);
+                                               }
+                                       } else {
+                                               final String savepointToDispose 
= lastInternalSavepoint;
+                                               lastInternalSavepoint = 
savepointPath;
+
+                                               if (savepointToDispose != null) 
{
+                                                       // dispose the old 
savepoint asynchronously
+                                                       
CompletableFuture.runAsync(
+                                                               () -> 
disposeSavepoint(savepointToDispose),
+                                                               
scheduledExecutorService);
+                                               }
+
+                                               return lastInternalSavepoint;
+                                       }
+                               },
+                               getMainThreadExecutor());
 
                final CompletableFuture<ExecutionGraph> executionGraphFuture = 
savepointFuture
                        .thenApplyAsync(
-                               (String savepointPath) -> {
-                                       try {
-                                               
newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
-                                                       savepointPath,
-                                                       false,
-                                                       
newExecutionGraph.getAllVertices(),
-                                                       userCodeLoader);
-                                       } catch (Exception e) {
-                                               disposeSavepoint(savepointPath);
-
-                                               throw new 
CompletionException(new JobModificationException("Could not restore from 
temporary rescaling savepoint.", e));
-                                       }
+                               (@Nullable String savepointPath) -> {
+                                       if (savepointPath != null) {
+                                               try {
+                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
SavepointRestoreSettings.forPath(savepointPath, false));
+                                               } catch (Exception e) {
+                                                       final String message = 
String.format("Could not restore from temporary rescaling savepoint. This might 
indicate " +
+                                                                       "that 
the savepoint %s got corrupted. Deleting this savepoint as a precaution.",
+                                                               savepointPath);
+
+                                                       log.info(message);
+
+                                                       CompletableFuture
+                                                               .runAsync(
+                                                                       () -> {
+                                                                               
if (savepointPath.equals(lastInternalSavepoint)) {
+                                                                               
        lastInternalSavepoint = null;
+                                                                               
}
+                                                                       },
+                                                                       
getMainThreadExecutor())
+                                                               .thenRunAsync(
+                                                                       () -> 
disposeSavepoint(savepointPath),
+                                                                       
scheduledExecutorService);
+
+                                                       throw new 
CompletionException(new JobModificationException(message, e));
+                                               }
+                                       } else {
+                                               try {
+                                                       
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
+                                               } catch (Exception e) {
+                                                       final String message = 
String.format("Could not restore from initial savepoint. This might indicate " +
+                                                               "that the 
savepoint %s got corrupted.", 
jobGraph.getSavepointRestoreSettings().getRestorePath());
 
-                                       // delete the savepoint file once we 
reach a terminal state
-                                       newExecutionGraph.getTerminationFuture()
-                                               .whenCompleteAsync(
-                                                       (JobStatus jobStatus, 
Throwable throwable) -> disposeSavepoint(savepointPath),
-                                                       
scheduledExecutorService);
+                                                       log.info(message);
+
+                                                       throw new 
CompletionException(new JobModificationException(message, e));
+                                               }
+                                       }
 
                                        return newExecutionGraph;
                                }, scheduledExecutorService)
-                       .exceptionally(
-                               (Throwable failure) -> {
-                                       // in case that we couldn't take a 
savepoint or restore from it, let's restart the checkpoint
-                                       // coordinator and abort the rescaling 
operation
-                                       if 
(checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
-                                               
checkpointCoordinator.startCheckpointScheduler();
-                                       }
+                       .handleAsync(
+                               (ExecutionGraph executionGraph, Throwable 
failure) -> {
+                                       if (failure != null) {
+                                               // in case that we couldn't 
take a savepoint or restore from it, let's restart the checkpoint
+                                               // coordinator and abort the 
rescaling operation
+                                               if 
(checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
+                                                       
checkpointCoordinator.startCheckpointScheduler();
+                                               }
 
-                                       throw new CompletionException(failure);
-                               });
+                                               throw new 
CompletionException(ExceptionUtils.stripCompletionException(failure));
+                                       } else {
+                                               return executionGraph;
+                                       }
+                               },
+                               getMainThreadExecutor());
 
                // 5. suspend the current job
                final CompletableFuture<JobStatus> terminationFuture = 
executionGraphFuture.thenComposeAsync(
@@ -1134,6 +1198,26 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
        }
 
+       /**
+        * Restores the given {@link ExecutionGraph} from a savepoint if the given {@link SavepointRestoreSettings} request it.
+        *
+        * @param executionGraphToRestore {@link ExecutionGraph} whose state shall be restored
+        * @param savepointRestoreSettings {@link SavepointRestoreSettings} describing the savepoint to restore from
+        * @throws Exception if the {@link ExecutionGraph} could not be restored
+        */
+       private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
+               if (!savepointRestoreSettings.restoreSavepoint()) {
+                       return;
+               }
+               // NOTE(review): if the graph has no CheckpointCoordinator, the requested restore is skipped silently
+               final CheckpointCoordinator coordinator = executionGraphToRestore.getCheckpointCoordinator();
+               if (coordinator != null) {
+                       coordinator.restoreSavepoint(
+                               savepointRestoreSettings.getRestorePath(), savepointRestoreSettings.allowNonRestoredState(),
+                               executionGraphToRestore.getAllVertices(), userCodeLoader);
+               }
+       }
+
        
//----------------------------------------------------------------------------------------------
 
        private void handleFatalError(final Throwable cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/662ed3df/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index b33a78e..3f3950b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -25,7 +25,7 @@ import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.checkpoint.{CheckpointRetentionPolicy, 
CheckpointCoordinator, CompletedCheckpoint}
+import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import 
org.apache.flink.runtime.jobgraph.tasks.{CheckpointCoordinatorConfiguration, 
JobCheckpointingSettings}
@@ -857,7 +857,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
-          doThrow(new Exception("Expected Test Exception"))
+          doThrow(new IllegalStateException("Expected Test Exception"))
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
 
@@ -872,7 +872,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           // Verify the response
           response.jobId should equal(jobGraph.getJobID())
-          response.cause.getCause.getClass should equal(classOf[Exception])
+          response.cause.getCause.getClass should 
equal(classOf[IllegalStateException])
           response.cause.getCause.getMessage should equal("Expected Test 
Exception")
         }
       }
@@ -913,7 +913,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
-          doThrow(new Exception("Expected Test Exception"))
+          doThrow(new IllegalStateException("Expected Test Exception"))
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
           val savepointPathPromise = new 
CompletableFuture[CompletedCheckpoint]()
@@ -982,7 +982,7 @@ class JobManagerITCase(_system: ActorSystem)
 
           // Mock the checkpoint coordinator
           val checkpointCoordinator = mock(classOf[CheckpointCoordinator])
-          doThrow(new Exception("Expected Test Exception"))
+          doThrow(new IllegalStateException("Expected Test Exception"))
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), 
org.mockito.Matchers.anyString())
 

Reply via email to