[FLINK-8629] [flip6] Allow JobMaster to rescale jobs

This commit adds the functionality to rescale a job or parts of it to
the JobMaster. In order to rescale a job, the JobMaster does the following:
1. Take a savepoint
2. Create a rescaled ExecutionGraph from the JobGraph
3. Initialize it with the taken savepoint
4. Suspend the old ExecutionGraph
5. Restart the new ExecutionGraph once the old ExecutionGraph has been suspended

This closes #5446.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f83e2f77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f83e2f77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f83e2f77

Branch: refs/heads/master
Commit: f83e2f770a2ba7da9c9333ef536bbd612d744de2
Parents: 7e96a24
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Feb 13 16:14:41 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 22 17:32:37 2018 +0100

----------------------------------------------------------------------
 .../util/function/BiConsumerWithException.java  |  50 +++++
 .../flink/runtime/checkpoint/Checkpoints.java   |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 219 +++++++++++++++++--
 .../jobmaster/JobMasterConfiguration.java       |  12 +
 .../runtime/jobmaster/JobMasterGateway.java     |  28 +++
 .../runtime/jobmaster/RescalingBehaviour.java   |  49 +++++
 .../exceptions/JobMasterException.java          |  41 ++++
 .../exceptions/JobModificationException.java    |  39 ++++
 .../utils/TestingJobMasterGateway.java          |  11 +
 9 files changed, 434 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
new file mode 100644
index 0000000..6ab1161
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/util/function/BiConsumerWithException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A checked extension of the {@link BiConsumer} interface: implementations provide
+ * {@link #acceptWithException(Object, Object)}, which is allowed to throw {@code E}.
+ *
+ * <p>The inherited {@link #accept(Object, Object)} delegates to
+ * {@code acceptWithException} and rethrows any {@link Throwable} raised by it wrapped
+ * in a new {@link RuntimeException}. This also applies to throwables that are already
+ * unchecked, so callers of {@code accept} find the original failure as the cause.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <E> type of the thrown exception
+ */
+@FunctionalInterface
+public interface BiConsumerWithException<T, U, E extends Throwable> extends 
BiConsumer<T, U> {
+
+       /**
+        * Performs this operation on the given arguments.
+        *
+        * <p>This is the single abstract method of the interface; {@code accept} is
+        * implemented in terms of it.
+        *
+        * @param t the first input argument
+        * @param u the second input argument
+        * @throws E in case of an error
+        */
+       void acceptWithException(T t, U u) throws E;
+
+       @Override
+       default void accept(T t, U u) {
+               try {
+                       acceptWithException(t, u);
+               } catch (Throwable e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 47efa6f..72b7c53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -246,7 +246,7 @@ public class Checkpoints {
                try (InputStream in = metadataHandle.openInputStream();
                        DataInputStream dis = new DataInputStream(in)) {
 
-                               savepoint = loadCheckpointMetadata(dis, 
classLoader);
+                       savepoint = loadCheckpointMetadata(dis, classLoader);
                }
 
                Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 2a4b881..22c69f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -26,13 +26,15 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -56,10 +58,12 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
@@ -106,6 +110,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -115,6 +120,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -174,9 +180,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        private final ClassLoader userCodeLoader;
 
-       /** The execution graph of this job. */
-       private final ExecutionGraph executionGraph;
-
        private final SlotPool slotPool;
 
        private final SlotPoolGateway slotPoolGateway;
@@ -201,6 +204,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        private final Map<ResourceID, Tuple2<TaskManagerLocation, 
TaskExecutorGateway>> registeredTaskManagers;
 
+       // -------- Mutable fields ---------
+
+       /** The execution graph of this job. */
+       private ExecutionGraph executionGraph;
+
        // 
------------------------------------------------------------------------
 
        public JobMaster(
@@ -268,8 +276,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobName, jid);
 
-               CheckpointRecoveryFactory checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
-
                resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
                this.slotPool = new SlotPool(
@@ -289,7 +295,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        scheduledExecutorService,
                        slotPool.getSlotProvider(),
                        userCodeLoader,
-                       checkpointRecoveryFactory,
+                       highAvailabilityServices.getCheckpointRecoveryFactory(),
                        rpcTimeout,
                        restartStrategy,
                        jobMetricGroup,
@@ -447,6 +453,165 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
 
+       @Override
+       public CompletableFuture<Acknowledge> rescaleJob(
+                       int newParallelism,
+                       RescalingBehaviour rescalingBehaviour,
+                       Time timeout) {
+               final ArrayList<JobVertexID> allOperators = new 
ArrayList<>(jobGraph.getNumberOfVertices());
+
+               for (JobVertex jobVertex : jobGraph.getVertices()) {
+                       allOperators.add(jobVertex.getID());
+               }
+
+               return rescaleOperators(allOperators, newParallelism, 
rescalingBehaviour, timeout);
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> rescaleOperators(
+                       Collection<JobVertexID> operators,
+                       int newParallelism,
+                       RescalingBehaviour rescalingBehaviour,
+                       Time timeout) {
+               // 1. Check whether we can rescale the job & rescale the 
respective vertices
+               for (JobVertexID jobVertexId : operators) {
+                       final JobVertex jobVertex = 
jobGraph.findVertexByID(jobVertexId);
+
+                       // update max parallelism in case that it has not been 
configured
+                       final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
+
+                       if (executionJobVertex != null) {
+                               
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+                       }
+
+                       try {
+                               
rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+                       } catch (FlinkException e) {
+                               final String msg = String.format("Cannot 
rescale job %s.", jobGraph.getName());
+
+                               log.info(msg, e);
+
+                               return FutureUtils.completedExceptionally(
+                                       new JobModificationException(msg, e));
+                       }
+               }
+
+               final ExecutionGraph currentExecutionGraph = executionGraph;
+
+               final ExecutionGraph newExecutionGraph;
+
+               try {
+                       newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+                               null,
+                               jobGraph,
+                               jobMasterConfiguration.getConfiguration(),
+                               scheduledExecutorService,
+                               scheduledExecutorService,
+                               slotPool.getSlotProvider(),
+                               userCodeLoader,
+                               
highAvailabilityServices.getCheckpointRecoveryFactory(),
+                               rpcTimeout,
+                               currentExecutionGraph.getRestartStrategy(),
+                               jobMetricGroup,
+                               1,
+                               blobServer,
+                               jobMasterConfiguration.getSlotRequestTimeout(),
+                               log);
+               } catch (JobExecutionException | JobException e) {
+                       return FutureUtils.completedExceptionally(
+                               new JobModificationException("Could not create 
rescaled ExecutionGraph.", e));
+               }
+
+               // 3. disable checkpoint coordinator to suppress subsequent 
checkpoints
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+               checkpointCoordinator.stopCheckpointScheduler();
+
+               // 4. take a savepoint
+               final CompletableFuture<String> savepointFuture = 
triggerSavepoint(
+                       jobMasterConfiguration.getTmpDirectory(),
+                       timeout);
+
+               final CompletableFuture<ExecutionGraph> executionGraphFuture = 
savepointFuture
+                       .thenApplyAsync(
+                               (String savepointPath) -> {
+                                       try {
+                                               
newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
+                                                       savepointPath,
+                                                       false,
+                                                       
newExecutionGraph.getAllVertices(),
+                                                       userCodeLoader);
+                                       } catch (Exception e) {
+                                               disposeSavepoint(savepointPath);
+
+                                               throw new 
CompletionException(new JobModificationException("Could not restore from 
temporary rescaling savepoint.", e));
+                                       }
+
+                                       // delete the savepoint file once we 
reach a terminal state
+                                       newExecutionGraph.getTerminationFuture()
+                                               .whenCompleteAsync(
+                                                       (JobStatus jobStatus, 
Throwable throwable) -> disposeSavepoint(savepointPath),
+                                                       
scheduledExecutorService);
+
+                                       return newExecutionGraph;
+                               }, scheduledExecutorService)
+                       .exceptionally(
+                               (Throwable failure) -> {
+                                       // in case that we couldn't take a 
savepoint or restore from it, let's restart the checkpoint
+                                       // coordinator and abort the rescaling 
operation
+                                       if 
(checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
+                                               
checkpointCoordinator.startCheckpointScheduler();
+                                       }
+
+                                       throw new CompletionException(failure);
+                               });
+
+               // 5. suspend the current job
+               final CompletableFuture<JobStatus> terminationFuture = 
executionGraphFuture.thenComposeAsync(
+                       (ExecutionGraph ignored) -> {
+                               currentExecutionGraph.suspend(new 
FlinkException("Job is being rescaled."));
+                               return 
currentExecutionGraph.getTerminationFuture();
+                       },
+                       getMainThreadExecutor());
+
+               final CompletableFuture<Void> suspendedFuture = 
terminationFuture.thenAccept(
+                       (JobStatus jobStatus) -> {
+                               if (jobStatus != JobStatus.SUSPENDED) {
+                                       final String msg = String.format("Job 
%s rescaling failed because we could not suspend the execution graph.", 
jobGraph.getName());
+                                       log.info(msg);
+                                       throw new CompletionException(new 
JobModificationException(msg));
+                               }
+                       });
+
+               // 6. resume the new execution graph from the taken savepoint
+               final CompletableFuture<Acknowledge> rescalingFuture = 
suspendedFuture.thenCombineAsync(
+                       executionGraphFuture,
+                       (Void ignored, ExecutionGraph restoredExecutionGraph) 
-> {
+                               // check if the ExecutionGraph is still the same
+                               //noinspection ObjectEquality
+                               if (executionGraph == currentExecutionGraph) {
+                                       executionGraph = restoredExecutionGraph;
+
+                                       scheduleExecutionGraph();
+
+                                       return Acknowledge.get();
+                               } else {
+                                       throw new CompletionException(new 
JobModificationException("Detected concurrent modification of ExecutionGraph. 
Aborting the resacling."));
+                               }
+
+                       },
+                       getMainThreadExecutor());
+
+               rescalingFuture.whenComplete(
+                       (Acknowledge ignored, Throwable throwable) -> {
+                               if (throwable != null) {
+                                       // fail the newly created execution 
graph
+                                       newExecutionGraph.failGlobal(new 
FlinkException("Failed to rescale the job " + jobGraph.getJobID() + '.', 
throwable));
+                               }
+                       });
+
+               return rescalingFuture;
+       }
+
        /**
         * Updates the task execution state for a given task.
         *
@@ -912,15 +1077,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
 
                // start scheduling job in another thread
-               scheduledExecutorService.execute(
-                       () -> {
-                               try {
-                                       executionGraph.scheduleForExecution();
-                               }
-                               catch (Throwable t) {
-                                       executionGraph.failGlobal(t);
-                               }
-                       });
+               scheduledExecutorService.execute(this::scheduleExecutionGraph);
 
                return Acknowledge.get();
        }
@@ -963,6 +1120,36 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                return Acknowledge.get();
        }
 
+       /**
+        * Schedules the execution of the current {@link ExecutionGraph}.
+        *
+        * <p>A {@link Throwable} raised while scheduling is not propagated to the caller;
+        * it is handed to {@code failGlobal} on the execution graph instead.
+        */
+       private void scheduleExecutionGraph() {
+               try {
+                       executionGraph.scheduleForExecution();
+               }
+               catch (Throwable t) {
+                       executionGraph.failGlobal(t);
+               }
+       }
+
+       /**
+        * Dispose the savepoint stored under the given path.
+        *
+        * <p>Disposal is best effort: a {@code FlinkException} or {@code IOException}
+        * thrown by {@code Checkpoints.disposeSavepoint} is logged at info level and
+        * not rethrown.
+        *
+        * @param savepointPath path where the savepoint is stored
+        */
+       private void disposeSavepoint(String savepointPath) {
+               try {
+                       // delete the temporary savepoint
+                       Checkpoints.disposeSavepoint(
+                               savepointPath,
+                               jobMasterConfiguration.getConfiguration(),
+                               userCodeLoader,
+                               log);
+               } catch (FlinkException | IOException de) {
+                       log.info("Could not dispose temporary rescaling 
savepoint under {}.", savepointPath, de);
+               }
+       }
+
        
//----------------------------------------------------------------------------------------------
 
        private void handleFatalError(final Throwable cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
index 15a30e2..5a4e3b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -36,16 +37,20 @@ public class JobMasterConfiguration {
 
        private final Time slotIdleTimeout;
 
+       private final String tmpDirectory;
+
        private final Configuration configuration;
 
        public JobMasterConfiguration(
                        Time rpcTimeout,
                        Time slotRequestTimeout,
                        Time slotIdleTimeout,
+                       String tmpDirectory,
                        Configuration configuration) {
                this.rpcTimeout = Preconditions.checkNotNull(rpcTimeout);
                this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout);
                this.slotIdleTimeout = 
Preconditions.checkNotNull(slotIdleTimeout);
+               this.tmpDirectory = Preconditions.checkNotNull(tmpDirectory);
                this.configuration = Preconditions.checkNotNull(configuration);
        }
 
@@ -61,6 +66,10 @@ public class JobMasterConfiguration {
                return slotIdleTimeout;
        }
 
+       public String getTmpDirectory() {
+               return tmpDirectory;
+       }
+
        public Configuration getConfiguration() {
                return configuration;
        }
@@ -78,10 +87,13 @@ public class JobMasterConfiguration {
                final Time slotRequestTimeout = 
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
                final Time slotIdleTimeout = 
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
 
+               final String tmpDirectory = 
ConfigurationUtils.parseTempDirectories(configuration)[0];
+
                return new JobMasterConfiguration(
                        rpcTimeout,
                        slotRequestTimeout,
                        slotIdleTimeout,
+                       tmpDirectory,
                        configuration);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 0dcf3fb..fb53237 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -73,6 +73,34 @@ public interface JobMasterGateway extends
        CompletableFuture<Acknowledge> stop(@RpcTimeout Time timeout);
 
        /**
+        * Triggers rescaling of the executed job.
+        *
+        * @param newParallelism new parallelism of the job
+        * @param rescalingBehaviour defines how strictly the rescaling has to 
be executed
+        * @param timeout timeout of this operation
+        * @return Future which is completed with {@link Acknowledge} once the 
rescaling was successful
+        */
+       CompletableFuture<Acknowledge> rescaleJob(
+               int newParallelism,
+               RescalingBehaviour rescalingBehaviour,
+               @RpcTimeout Time timeout);
+
+       /**
+        * Triggers rescaling of the given set of operators.
+        *
+        * @param operators set of operators which shall be rescaled
+        * @param newParallelism new parallelism of the given set of operators
+        * @param rescalingBehaviour defines how strictly the rescaling has to 
be executed
+        * @param timeout timeout of this operation
+        * @return Future which is completed with {@link Acknowledge} once the 
rescaling was successful
+        */
+       CompletableFuture<Acknowledge> rescaleOperators(
+               Collection<JobVertexID> operators,
+               int newParallelism,
+               RescalingBehaviour rescalingBehaviour,
+               @RpcTimeout Time timeout);
+
+       /**
         * Updates the task execution state for a given task.
         *
         * @param taskExecutionState New task execution state for a given task

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
new file mode 100644
index 0000000..7de9560
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+/**
+ * Definition of the rescaling behaviour.
+ *
+ * <p>Each constant sets the parallelism of the given {@link JobVertex}.
+ * {@code STRICT} throws a {@link FlinkException} when the vertex's maximum parallelism
+ * is smaller than the requested parallelism and otherwise applies the requested value.
+ * {@code RELAXED} never throws and applies the minimum of the vertex's maximum
+ * parallelism and the requested parallelism.
+ */
+public enum RescalingBehaviour implements BiConsumerWithException<JobVertex, 
Integer, FlinkException> {
+       // rescaling is only executed if the operator can be set to the given 
parallelism
+       STRICT {
+               @Override
+               public void acceptWithException(JobVertex jobVertex, Integer 
newParallelism) throws FlinkException {
+                       if (jobVertex.getMaxParallelism() < newParallelism) {
+                               throw new FlinkException("Cannot rescale vertex 
" + jobVertex.getName() +
+                                       " because its maximum parallelism " + 
jobVertex.getMaxParallelism() +
+                                       " is smaller than the new parallelism " 
+ newParallelism + '.');
+                       } else {
+                               jobVertex.setParallelism(newParallelism);
+                       }
+               }
+       },
+       // the new parallelism will be the minimum of the given parallelism and 
the maximum parallelism
+       RELAXED {
+               @Override
+               public void acceptWithException(JobVertex jobVertex, Integer 
newParallelism) {
+                       
jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(), 
newParallelism));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
new file mode 100644
index 0000000..a7b62e1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobMasterException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.exceptions;
+
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for all {@link JobMaster} related exceptions.
+ *
+ * <p>The constructors simply forward the message and/or cause to {@link FlinkException}.
+ */
+public class JobMasterException extends FlinkException {
+       private static final long serialVersionUID = 2941885469739200908L;
+
+       public JobMasterException(String message) {
+               super(message);
+       }
+
+       public JobMasterException(Throwable cause) {
+               super(cause);
+       }
+
+       public JobMasterException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
new file mode 100644
index 0000000..e08ec62
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/exceptions/JobModificationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.exceptions;
+
+/**
+ * Base class for all exceptions which originate from a failed job modification.
+ */
+public class JobModificationException extends JobMasterException {
+
+       private static final long serialVersionUID = 2374146694058970746L;
+
+       public JobModificationException(String message) {
+               super(message);
+       }
+
+       public JobModificationException(Throwable cause) {
+               super(cause);
+       }
+
+       public JobModificationException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f83e2f77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 168b32b..cac7e90 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -67,6 +68,16 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        }
 
        @Override
+       public CompletableFuture<Acknowledge> rescaleJob(int newParallelism, 
RescalingBehaviour rescalingBehaviour, Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> 
rescaleOperators(Collection<JobVertexID> operators, int newParallelism, 
RescalingBehaviour rescalingBehaviour, Time timeout) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public CompletableFuture<Acknowledge> 
updateTaskExecutionState(TaskExecutionState taskExecutionState) {
                throw new UnsupportedOperationException();
        }

Reply via email to