This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 566d10d [FLINK-12312][runtime] Remove CLI command for rescaling
566d10d is described below
commit 566d10d958d71fd113f8f7ecc08fa9b63a072919
Author: Gary Yao <[email protected]>
AuthorDate: Wed Apr 24 21:50:22 2019 +0200
[FLINK-12312][runtime] Remove CLI command for rescaling
This closes #8260.
---
docs/ops/cli.md | 23 --
docs/ops/cli.zh.md | 23 --
.../org/apache/flink/client/cli/CliFrontend.java | 59 -----
.../apache/flink/client/cli/CliFrontendParser.java | 27 --
.../apache/flink/client/program/ClusterClient.java | 11 -
.../client/program/rest/RestClusterClient.java | 41 ---
.../flink/client/cli/CliFrontendModifyTest.java | 136 ----------
.../flink/runtime/dispatcher/Dispatcher.java | 10 -
.../apache/flink/runtime/jobmaster/JobMaster.java | 289 +--------------------
.../flink/runtime/jobmaster/JobMasterGateway.java | 36 +--
.../runtime/jobmaster/RescalingBehaviour.java | 49 ----
.../handler/job/rescaling/RescalingHandlers.java | 56 ++--
.../flink/runtime/webmonitor/RestfulGateway.java | 18 --
.../jobmaster/utils/TestingJobMasterGateway.java | 21 --
.../utils/TestingJobMasterGatewayBuilder.java | 15 +-
15 files changed, 30 insertions(+), 784 deletions(-)
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 42698e8..b414b30 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -37,7 +37,6 @@ The command line can be used to
- provide information about a job,
- list running and waiting jobs,
- trigger and dispose savepoints, and
-- modify a running job
A prerequisite to using the command line interface is that the Flink
master (JobManager) has been started (via
@@ -126,10 +125,6 @@ available.
./bin/flink stop -s [targetDirectory] -d <jobID>
-- Modify a running job (streaming jobs only):
-
- ./bin/flink modify <jobID> -p <newParallelism>
-
**NOTE**: The difference between cancelling and stopping a (streaming) job is
the following:
@@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job
or disposes existing on
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths
for high availability mode
-
-
-
-Action "modify" modifies a running job (e.g. change of parallelism).
-
- Syntax: modify <Job ID> [OPTIONS]
- "modify" action options:
- -h,--help Show the help message for the CLI
- Frontend or the action.
- -p,--parallelism <newParallelism> New parallelism for the specified job.
- -v,--verbose This option is deprecated.
- Options for default mode:
- -m,--jobmanager <arg> Address of the JobManager (master) to
which
- to connect. Use this flag to connect to a
- different JobManager than the one
specified
- in the configuration.
- -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths
- for high availability mode
{% endhighlight %}
{% top %}
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index bf995b4..7c02047 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -37,7 +37,6 @@ The command line can be used to
- provide information about a job,
- list running and waiting jobs,
- trigger and dispose savepoints, and
-- modify a running job
A prerequisite to using the command line interface is that the Flink
master (JobManager) has been started (via
@@ -126,10 +125,6 @@ available.
./bin/flink stop -s [targetDirectory] -d <jobID>
-- Modify a running job (streaming jobs only):
-
- ./bin/flink modify <jobID> -p <newParallelism>
-
**NOTE**: The difference between cancelling and stopping a (streaming) job is
the following:
@@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job
or disposes existing on
in the configuration.
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths
for high availability mode
-
-
-
-Action "modify" modifies a running job (e.g. change of parallelism).
-
- Syntax: modify <Job ID> [OPTIONS]
- "modify" action options:
- -h,--help Show the help message for the CLI
- Frontend or the action.
- -p,--parallelism <newParallelism> New parallelism for the specified job.
- -v,--verbose This option is deprecated.
- Options for default mode:
- -m,--jobmanager <arg> Address of the JobManager (master) to
which
- to connect. Use this flag to connect to a
- different JobManager than the one
specified
- in the configuration.
- -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths
- for high availability mode
{% endhighlight %}
{% top %}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index cdd7616..c6b5c9a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -87,9 +87,6 @@ import java.util.stream.Collectors;
import scala.concurrent.duration.FiniteDuration;
-import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
-import static
org.apache.flink.client.cli.CliFrontendParser.MODIFY_PARALLELISM_OPTION;
-
/**
* Implementation of a simple command line frontend for executing programs.
*/
@@ -104,7 +101,6 @@ public class CliFrontend {
private static final String ACTION_CANCEL = "cancel";
private static final String ACTION_STOP = "stop";
private static final String ACTION_SAVEPOINT = "savepoint";
- private static final String ACTION_MODIFY = "modify";
// configuration dir parameters
private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
@@ -736,58 +732,6 @@ public class CliFrontend {
logAndSysout("Savepoint '" + savepointPath + "' disposed.");
}
- protected void modify(String[] args) throws CliArgsException,
FlinkException {
- LOG.info("Running 'modify' command.");
-
- final Options commandOptions =
CliFrontendParser.getModifyOptions();
-
- final Options commandLineOptions =
CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
-
- final CommandLine commandLine =
CliFrontendParser.parse(commandLineOptions, args, false);
-
- if (commandLine.hasOption(HELP_OPTION.getOpt())) {
-
CliFrontendParser.printHelpForModify(customCommandLines);
- }
-
- final JobID jobId;
- final String[] modifyArgs = commandLine.getArgs();
-
- if (modifyArgs.length > 0) {
- jobId = parseJobId(modifyArgs[0]);
- } else {
- throw new CliArgsException("Missing JobId");
- }
-
- final int newParallelism;
- if (commandLine.hasOption(MODIFY_PARALLELISM_OPTION.getOpt())) {
- try {
- newParallelism =
Integer.parseInt(commandLine.getOptionValue(MODIFY_PARALLELISM_OPTION.getOpt()));
- } catch (NumberFormatException e) {
- throw new CliArgsException("Could not parse the
parallelism which is supposed to be an integer.", e);
- }
- } else {
- throw new CliArgsException("Missing new parallelism.");
- }
-
- final CustomCommandLine<?> activeCommandLine =
getActiveCustomCommandLine(commandLine);
-
- logAndSysout("Modify job " + jobId + '.');
- runClusterAction(
- activeCommandLine,
- commandLine,
- clusterClient -> {
- CompletableFuture<Acknowledge> rescaleFuture =
clusterClient.rescaleJob(jobId, newParallelism);
-
- try {
- rescaleFuture.get();
- } catch (Exception e) {
- throw new FlinkException("Could not
rescale job " + jobId + '.', ExceptionUtils.stripExecutionException(e));
- }
- logAndSysout("Rescaled job " + jobId + ". Its
new parallelism is " + newParallelism + '.');
- }
- );
- }
-
//
--------------------------------------------------------------------------------------------
// Interaction with programs and JobManager
//
--------------------------------------------------------------------------------------------
@@ -1053,9 +997,6 @@ public class CliFrontend {
case ACTION_SAVEPOINT:
savepoint(params);
return 0;
- case ACTION_MODIFY:
- modify(params);
- return 0;
case "-h":
case "--help":
CliFrontendParser.printHelp(customCommandLines);
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 398efd2..cea3998 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -118,8 +118,6 @@ public class CliFrontendParser {
public static final Option STOP_AND_DRAIN = new Option("d", "drain",
false,
"Send MAX_WATERMARK before taking the savepoint and
stopping the pipelne.");
- static final Option MODIFY_PARALLELISM_OPTION = new Option("p",
"parallelism", true, "New parallelism for the specified job.");
-
static {
HELP_OPTION.setRequired(false);
@@ -162,9 +160,6 @@ public class CliFrontendParser {
CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
- MODIFY_PARALLELISM_OPTION.setRequired(false);
- MODIFY_PARALLELISM_OPTION.setArgName("newParallelism");
-
STOP_WITH_SAVEPOINT.setRequired(false);
STOP_WITH_SAVEPOINT.setArgName("withSavepoint");
STOP_WITH_SAVEPOINT.setOptionalArg(true);
@@ -240,12 +235,6 @@ public class CliFrontendParser {
return options.addOption(JAR_OPTION);
}
- static Options getModifyOptions() {
- final Options options = buildGeneralOptions(new Options());
- options.addOption(MODIFY_PARALLELISM_OPTION);
- return options;
- }
-
//
--------------------------------------------------------------------------------------------
// Help
//
--------------------------------------------------------------------------------------------
@@ -297,7 +286,6 @@ public class CliFrontendParser {
printHelpForStop(customCommandLines);
printHelpForCancel(customCommandLines);
printHelpForSavepoint(customCommandLines);
- printHelpForModify(customCommandLines);
System.out.println();
}
@@ -390,21 +378,6 @@ public class CliFrontendParser {
System.out.println();
}
- public static void printHelpForModify(Collection<CustomCommandLine<?>>
customCommandLines) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.setLeftPadding(5);
- formatter.setWidth(80);
-
- System.out.println("\nAction \"modify\" modifies a running job
(e.g. change of parallelism).");
- System.out.println("\n Syntax: modify <Job ID> [OPTIONS]");
- formatter.setSyntaxPrefix(" \"modify\" action options:");
- formatter.printHelp(" ", getModifyOptions());
-
- printCustomCliOptions(customCommandLines, formatter, false);
-
- System.out.println();
- }
-
/**
* Prints custom cli options.
* @param formatter The formatter to use for printing
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 0be7dff..0da7997 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -499,17 +499,6 @@ public abstract class ClusterClient<T> {
public abstract JobSubmissionResult submitJob(JobGraph jobGraph,
ClassLoader classLoader)
throws ProgramInvocationException;
- /**
- * Rescales the specified job such that it will have the new
parallelism.
- *
- * @param jobId specifying the job to modify
- * @param newParallelism specifying the new parallelism of the rescaled
job
- * @return Future which is completed once the rescaling has been
completed
- */
- public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int
newParallelism) {
- throw new UnsupportedOperationException("The " +
getClass().getSimpleName() + " does not support rescaling.");
- }
-
public void shutDownCluster() {
throw new UnsupportedOperationException("The " +
getClass().getSimpleName() + " does not support shutDownCluster.");
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 269462a..1d08992 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -46,10 +46,6 @@ import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
-import
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeaders;
-import
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
-import
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
-import
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -532,43 +528,6 @@ public class RestClusterClient<T> extends ClusterClient<T>
implements NewCluster
}
@Override
- public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int
newParallelism) {
-
- final RescalingTriggerHeaders rescalingTriggerHeaders =
RescalingTriggerHeaders.getInstance();
- final RescalingTriggerMessageParameters
rescalingTriggerMessageParameters =
rescalingTriggerHeaders.getUnresolvedMessageParameters();
-
rescalingTriggerMessageParameters.jobPathParameter.resolve(jobId);
-
rescalingTriggerMessageParameters.rescalingParallelismQueryParameter.resolve(Collections.singletonList(newParallelism));
-
- final CompletableFuture<TriggerResponse>
rescalingTriggerResponseFuture = sendRequest(
- rescalingTriggerHeaders,
- rescalingTriggerMessageParameters);
-
- final CompletableFuture<AsynchronousOperationInfo>
rescalingOperationFuture = rescalingTriggerResponseFuture.thenCompose(
- (TriggerResponse triggerResponse) -> {
- final TriggerId triggerId =
triggerResponse.getTriggerId();
- final RescalingStatusHeaders
rescalingStatusHeaders = RescalingStatusHeaders.getInstance();
- final RescalingStatusMessageParameters
rescalingStatusMessageParameters =
rescalingStatusHeaders.getUnresolvedMessageParameters();
-
-
rescalingStatusMessageParameters.jobPathParameter.resolve(jobId);
-
rescalingStatusMessageParameters.triggerIdPathParameter.resolve(triggerId);
-
- return pollResourceAsync(
- () -> sendRequest(
- rescalingStatusHeaders,
-
rescalingStatusMessageParameters));
- });
-
- return rescalingOperationFuture.thenApply(
- (AsynchronousOperationInfo asynchronousOperationInfo)
-> {
- if (asynchronousOperationInfo.getFailureCause()
== null) {
- return Acknowledge.get();
- } else {
- throw new
CompletionException(asynchronousOperationInfo.getFailureCause());
- }
- });
- }
-
- @Override
public CompletableFuture<Acknowledge> disposeSavepoint(String
savepointPath) {
final SavepointDisposalRequest savepointDisposalRequest = new
SavepointDisposalRequest(savepointPath);
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
deleted file mode 100644
index 05dacec..0000000
---
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendModifyTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.cli;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.util.MockedCliFrontend;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.messages.Acknowledge;
-
-import org.hamcrest.Matchers;
-import org.junit.Test;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the modify command.
- */
-public class CliFrontendModifyTest extends CliFrontendTestBase {
-
- @Test
- public void testModifyJob() throws Exception {
- final JobID jobId = new JobID();
- final int parallelism = 42;
- String[] args = {jobId.toString(), "-p",
String.valueOf(parallelism)};
-
- Tuple2<JobID, Integer> jobIdParallelism = callModify(args);
-
- assertThat(jobIdParallelism.f0, Matchers.is(jobId));
- assertThat(jobIdParallelism.f1, Matchers.is(parallelism));
- }
-
- @Test
- public void testMissingJobId() throws Exception {
- final int parallelism = 42;
- final String[] args = {"-p", String.valueOf(parallelism)};
-
- try {
- callModify(args);
- fail("Expected CliArgsException");
- } catch (CliArgsException expected) {
- // expected
- }
- }
-
- @Test
- public void testMissingParallelism() throws Exception {
- final JobID jobId = new JobID();
- final String[] args = {jobId.toString()};
-
- try {
- callModify(args);
- fail("Expected CliArgsException");
- } catch (CliArgsException expected) {
- // expected
- }
- }
-
- @Test
- public void testUnparsableParalllelism() throws Exception {
- final JobID jobId = new JobID();
- final String[] args = {jobId.toString(), "-p", "foobar"};
-
- try {
- callModify(args);
- fail("Expected CliArgsException");
- } catch (CliArgsException expected) {
- // expected
- }
- }
-
- @Test
- public void testUnparsableJobId() throws Exception {
- final int parallelism = 42;
- final String[] args = {"foobar", "-p",
String.valueOf(parallelism)};
-
- try {
- callModify(args);
- fail("Expected CliArgsException");
- } catch (CliArgsException expected) {
- // expected
- }
- }
-
- private Tuple2<JobID, Integer> callModify(String[] args) throws
Exception {
- final CompletableFuture<Tuple2<JobID, Integer>>
rescaleJobFuture = new CompletableFuture<>();
- final TestingClusterClient clusterClient = new
TestingClusterClient(rescaleJobFuture, getConfiguration());
- final MockedCliFrontend cliFrontend = new
MockedCliFrontend(clusterClient);
-
- cliFrontend.modify(args);
-
- assertThat(rescaleJobFuture.isDone(), Matchers.is(true));
-
- return rescaleJobFuture.get();
- }
-
- private static final class TestingClusterClient extends
RestClusterClient<StandaloneClusterId> {
-
- private final CompletableFuture<Tuple2<JobID, Integer>>
rescaleJobFuture;
-
- TestingClusterClient(CompletableFuture<Tuple2<JobID, Integer>>
rescaleJobFuture, Configuration configuration) throws Exception {
- super(configuration, StandaloneClusterId.getInstance());
-
- this.rescaleJobFuture = rescaleJobFuture;
- }
-
- @Override
- public CompletableFuture<Acknowledge> rescaleJob(JobID jobId,
int newParallelism) {
- rescaleJobFuture.complete(Tuple2.of(jobId,
newParallelism));
-
- return
CompletableFuture.completedFuture(Acknowledge.get());
- }
- }
-
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 3c4028c..1f1fc62 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -43,7 +43,6 @@ import
org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import
org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -423,15 +422,6 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId> impleme
}
@Override
- public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int
newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
- final CompletableFuture<JobMasterGateway>
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
-
- return jobMasterGatewayFuture.thenCompose(
- (JobMasterGateway jobMasterGateway) ->
- jobMasterGateway.rescaleJob(newParallelism,
rescalingBehaviour, timeout));
- }
-
- @Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time
timeout) {
CompletableFuture<ResourceOverview> taskManagerOverviewFuture =
runResourceManagerCommand(resourceManagerGateway ->
resourceManagerGateway.requestResourceOverview(timeout));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f912f45..6c83c7a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -31,10 +31,7 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -42,7 +39,6 @@ import
org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -62,12 +58,10 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmaster.exceptions.JobModificationException;
import
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
@@ -115,9 +109,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -208,9 +200,6 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
private JobManagerJobMetricGroup jobManagerJobMetricGroup;
@Nullable
- private String lastInternalSavepoint;
-
- @Nullable
private ResourceManagerAddress resourceManagerAddress;
@Nullable
@@ -291,7 +280,6 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
this.registeredTaskManagers = new HashMap<>(4);
this.backPressureStatsTracker =
checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
- this.lastInternalSavepoint = null;
this.jobManagerJobMetricGroup =
jobMetricGroupFactory.create(jobGraph);
this.executionGraph =
createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
@@ -365,15 +353,7 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
// shut down will internally release all registered slots
slotPool.close();
- final CompletableFuture<Void> disposeInternalSavepointFuture;
-
- if (lastInternalSavepoint != null) {
- disposeInternalSavepointFuture =
CompletableFuture.runAsync(() -> disposeSavepoint(lastInternalSavepoint));
- } else {
- disposeInternalSavepointFuture =
CompletableFuture.completedFuture(null);
- }
-
- return
FutureUtils.completeAll(Collections.singletonList(disposeInternalSavepointFuture));
+ return CompletableFuture.completedFuture(null);
}
//----------------------------------------------------------------------------------------------
@@ -387,130 +367,6 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
return CompletableFuture.completedFuture(Acknowledge.get());
}
- @Override
- public CompletableFuture<Acknowledge> rescaleJob(
- int newParallelism,
- RescalingBehaviour rescalingBehaviour,
- Time timeout) {
- final ArrayList<JobVertexID> allOperators = new
ArrayList<>(jobGraph.getNumberOfVertices());
-
- for (JobVertex jobVertex : jobGraph.getVertices()) {
- allOperators.add(jobVertex.getID());
- }
-
- return rescaleOperators(allOperators, newParallelism,
rescalingBehaviour, timeout);
- }
-
- @Override
- public CompletableFuture<Acknowledge> rescaleOperators(
- Collection<JobVertexID> operators,
- int newParallelism,
- RescalingBehaviour rescalingBehaviour,
- Time timeout) {
-
- if (newParallelism <= 0) {
- return FutureUtils.completedExceptionally(
- new JobModificationException("The target
parallelism of a rescaling operation must be larger than 0."));
- }
-
- // 1. Check whether we can rescale the job & rescale the
respective vertices
- try {
- rescaleJobGraph(operators, newParallelism,
rescalingBehaviour);
- } catch (FlinkException e) {
- final String msg = String.format("Cannot rescale job
%s.", jobGraph.getName());
-
- log.info(msg, e);
- return FutureUtils.completedExceptionally(new
JobModificationException(msg, e));
- }
-
- final ExecutionGraph currentExecutionGraph = executionGraph;
-
- final JobManagerJobMetricGroup newJobManagerJobMetricGroup =
jobMetricGroupFactory.create(jobGraph);
- final ExecutionGraph newExecutionGraph;
-
- try {
- newExecutionGraph =
createExecutionGraph(newJobManagerJobMetricGroup);
- } catch (JobExecutionException | JobException e) {
- return FutureUtils.completedExceptionally(
- new JobModificationException("Could not create
rescaled ExecutionGraph.", e));
- }
-
- // 3. disable checkpoint coordinator to suppress subsequent
checkpoints
- final CheckpointCoordinator checkpointCoordinator =
currentExecutionGraph.getCheckpointCoordinator();
- checkpointCoordinator.stopCheckpointScheduler();
-
- // 4. take a savepoint
- final CompletableFuture<String> savepointFuture =
getJobModificationSavepoint(timeout);
-
- final CompletableFuture<ExecutionGraph> executionGraphFuture =
restoreExecutionGraphFromRescalingSavepoint(
- newExecutionGraph,
- savepointFuture)
- .handleAsync(
- (ExecutionGraph executionGraph, Throwable
failure) -> {
- if (failure != null) {
- // in case that we couldn't
take a savepoint or restore from it, let's restart the checkpoint
- // coordinator and abort the
rescaling operation
- if
(checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
-
checkpointCoordinator.startCheckpointScheduler();
- }
-
- throw new
CompletionException(ExceptionUtils.stripCompletionException(failure));
- } else {
- return executionGraph;
- }
- },
- getMainThreadExecutor());
-
- // 5. suspend the current job
- final CompletableFuture<JobStatus> terminationFuture =
executionGraphFuture.thenComposeAsync(
- (ExecutionGraph ignored) -> {
- suspendExecutionGraph(new FlinkException("Job
is being rescaled."));
- return
currentExecutionGraph.getTerminationFuture();
- },
- getMainThreadExecutor());
-
- final CompletableFuture<Void> suspendedFuture =
terminationFuture.thenAccept(
- (JobStatus jobStatus) -> {
- if (jobStatus != JobStatus.SUSPENDED) {
- final String msg = String.format("Job
%s rescaling failed because we could not suspend the execution graph.",
jobGraph.getName());
- log.info(msg);
- throw new CompletionException(new
JobModificationException(msg));
- }
- });
-
- // 6. resume the new execution graph from the taken savepoint
- final CompletableFuture<Acknowledge> rescalingFuture =
suspendedFuture.thenCombineAsync(
- executionGraphFuture,
- (Void ignored, ExecutionGraph restoredExecutionGraph)
-> {
- // check if the ExecutionGraph is still the same
- if (executionGraph == currentExecutionGraph) {
- clearExecutionGraphFields();
-
assignExecutionGraph(restoredExecutionGraph, newJobManagerJobMetricGroup);
- scheduleExecutionGraph();
-
- return Acknowledge.get();
- } else {
- throw new CompletionException(new
JobModificationException("Detected concurrent modification of ExecutionGraph.
Aborting the rescaling."));
- }
-
- },
- getMainThreadExecutor());
-
- rescalingFuture.whenCompleteAsync(
- (Acknowledge ignored, Throwable throwable) -> {
- if (throwable != null) {
- // fail the newly created execution
graph
- newExecutionGraph.failGlobal(
- new SuppressRestartsException(
- new FlinkException(
-
String.format("Failed to rescale the job %s.", jobGraph.getJobID()),
- throwable)));
- }
- }, getMainThreadExecutor());
-
- return rescalingFuture;
- }
-
/**
* Updates the task execution state for a given task.
*
@@ -1261,24 +1117,6 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
}
/**
- * Dispose the savepoint stored under the given path.
- *
- * @param savepointPath path where the savepoint is stored
- */
- private void disposeSavepoint(String savepointPath) {
- try {
- // delete the temporary savepoint
- Checkpoints.disposeSavepoint(
- savepointPath,
- jobMasterConfiguration.getConfiguration(),
- userCodeLoader,
- log);
- } catch (FlinkException | IOException e) {
- log.info("Could not dispose temporary rescaling
savepoint under {}.", savepointPath, e);
- }
- }
-
- /**
* Tries to restore the given {@link ExecutionGraph} from the provided
{@link SavepointRestoreSettings}.
*
* @param executionGraphToRestore {@link ExecutionGraph} which is
supposed to be restored
@@ -1435,131 +1273,6 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId> implements JobMast
slotPool.disconnectResourceManager();
}
- /**
- * Restore the given {@link ExecutionGraph} from the rescaling
savepoint. If the {@link ExecutionGraph} could
- * be restored, then this savepoint will be recorded as the latest
successful modification savepoint. A previous
- * savepoint will be disposed. If the rescaling savepoint is empty, the
job will be restored from the initially
- * provided savepoint.
- *
- * @param newExecutionGraph to restore
- * @param savepointFuture containing the path to the internal
modification savepoint
- * @return Future which is completed with the restored {@link
ExecutionGraph}
- */
- private CompletableFuture<ExecutionGraph>
restoreExecutionGraphFromRescalingSavepoint(ExecutionGraph newExecutionGraph,
CompletableFuture<String> savepointFuture) {
- return savepointFuture
- .thenApplyAsync(
- (@Nullable String savepointPath) -> {
- if (savepointPath != null) {
- try {
-
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph,
SavepointRestoreSettings.forPath(savepointPath, false));
- } catch (Exception e) {
- final String message =
String.format("Could not restore from temporary rescaling savepoint. This might
indicate " +
- "that
the savepoint %s got corrupted. Deleting this savepoint as a precaution.",
- savepointPath);
-
- log.info(message);
-
- CompletableFuture
- .runAsync(
- () -> {
-
if (savepointPath.equals(lastInternalSavepoint)) {
-
lastInternalSavepoint = null;
-
}
- },
-
getMainThreadExecutor())
- .thenRunAsync(
- () ->
disposeSavepoint(savepointPath),
-
scheduledExecutorService);
-
- throw new
CompletionException(new JobModificationException(message, e));
- }
- } else {
- // No rescaling savepoint,
restart from the initial savepoint or none
- try {
-
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph,
jobGraph.getSavepointRestoreSettings());
- } catch (Exception e) {
- final String message =
String.format("Could not restore from initial savepoint. This might indicate " +
- "that the
savepoint %s got corrupted.",
jobGraph.getSavepointRestoreSettings().getRestorePath());
-
- log.info(message);
-
- throw new
CompletionException(new JobModificationException(message, e));
- }
- }
-
- return newExecutionGraph;
- }, scheduledExecutorService);
- }
-
- /**
- * Takes an internal savepoint for job modification purposes. If the
savepoint was not successful because
- * not all tasks were running, it returns the last successful
modification savepoint.
- *
- * @param timeout for the operation
- * @return Future which is completed with the savepoint path or the
last successful modification savepoint if the
- * former was not successful
- */
- private CompletableFuture<String> getJobModificationSavepoint(Time
timeout) {
- return triggerSavepoint(
- null,
- false,
- timeout)
- .handleAsync(
- (String savepointPath, Throwable throwable) -> {
- if (throwable != null) {
- final Throwable
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
- if (strippedThrowable
instanceof CheckpointException) {
- final
CheckpointException checkpointException = (CheckpointException)
strippedThrowable;
-
- if
(checkpointException.getCheckpointFailureReason() ==
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
- return
lastInternalSavepoint;
- } else {
- throw new
CompletionException(checkpointException);
- }
- } else {
- throw new
CompletionException(strippedThrowable);
- }
- } else {
- final String savepointToDispose
= lastInternalSavepoint;
- lastInternalSavepoint =
savepointPath;
-
- if (savepointToDispose != null)
{
- // dispose the old
savepoint asynchronously
-
CompletableFuture.runAsync(
- () ->
disposeSavepoint(savepointToDispose),
-
scheduledExecutorService);
- }
-
- return lastInternalSavepoint;
- }
- },
- getMainThreadExecutor());
- }
-
- /**
- * Rescales the given operators of the {@link JobGraph} of this {@link
JobMaster} with respect to given
- * parallelism and {@link RescalingBehaviour}.
- *
- * @param operators to rescale
- * @param newParallelism new parallelism for these operators
- * @param rescalingBehaviour of the rescaling operation
- * @throws FlinkException if the {@link JobGraph} could not be rescaled
- */
- private void rescaleJobGraph(Collection<JobVertexID> operators, int
newParallelism, RescalingBehaviour rescalingBehaviour) throws FlinkException {
- for (JobVertexID jobVertexId : operators) {
- final JobVertex jobVertex =
jobGraph.findVertexByID(jobVertexId);
-
- // update max parallelism in case that it has not been
configured
- final ExecutionJobVertex executionJobVertex =
executionGraph.getJobVertex(jobVertexId);
-
- if (executionJobVertex != null) {
-
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
- }
-
- rescalingBehaviour.accept(jobVertex, newParallelism);
- }
- }
-
//----------------------------------------------------------------------------------------------
// Service methods
//----------------------------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b15d704..b2a282d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.jobmaster;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
@@ -48,6 +45,11 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
/**
* {@link JobMaster} rpc gateway interface.
*/
@@ -66,34 +68,6 @@ public interface JobMasterGateway extends
CompletableFuture<Acknowledge> cancel(@RpcTimeout Time timeout);
/**
- * Triggers rescaling of the executed job.
- *
- * @param newParallelism new parallelism of the job
- * @param rescalingBehaviour defining how strict the rescaling has to
be executed
- * @param timeout of this operation
- * @return Future which is completed with {@link Acknowledge} once the
rescaling was successful
- */
- CompletableFuture<Acknowledge> rescaleJob(
- int newParallelism,
- RescalingBehaviour rescalingBehaviour,
- @RpcTimeout Time timeout);
-
- /**
- * Triggers rescaling of the given set of operators.
- *
- * @param operators set of operators which shall be rescaled
- * @param newParallelism new parallelism of the given set of operators
- * @param rescalingBehaviour defining how strict the rescaling has to
be executed
- * @param timeout of this operation
- * @return Future which is completed with {@link Acknowledge} once the
rescaling was successful
- */
- CompletableFuture<Acknowledge> rescaleOperators(
- Collection<JobVertexID> operators,
- int newParallelism,
- RescalingBehaviour rescalingBehaviour,
- @RpcTimeout Time timeout);
-
- /**
* Updates the task execution state for a given task.
*
* @param taskExecutionState New task execution state for a given task
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
deleted file mode 100644
index 64e2ffa..0000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RescalingBehaviour.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.BiConsumerWithException;
-
-/**
- * Definition of the rescaling behaviour.
- */
-public enum RescalingBehaviour implements BiConsumerWithException<JobVertex,
Integer, FlinkException> {
- // rescaling is only executed if the operator can be set to the given
parallelism
- STRICT {
- @Override
- public void accept(JobVertex jobVertex, Integer newParallelism)
throws FlinkException {
- if (jobVertex.getMaxParallelism() < newParallelism) {
- throw new FlinkException("Cannot rescale vertex
" + jobVertex.getName() +
- " because its maximum parallelism " +
jobVertex.getMaxParallelism() +
- " is smaller than the new parallelism "
+ newParallelism + '.');
- } else {
- jobVertex.setParallelism(newParallelism);
- }
- }
- },
- // the new parallelism will be the minimum of the given parallelism and
the maximum parallelism
- RELAXED {
- @Override
- public void accept(JobVertex jobVertex, Integer newParallelism)
{
-
jobVertex.setParallelism(Math.min(jobVertex.getMaxParallelism(),
newParallelism));
- }
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
index 8860e5b..2b0f156 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescaling/RescalingHandlers.java
@@ -18,28 +18,23 @@
package org.apache.flink.runtime.rest.handler.job.rescaling;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import
org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
+import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
+import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
-import
org.apache.flink.runtime.rest.messages.RescalingParallelismQueryParameter;
-import org.apache.flink.runtime.rest.messages.TriggerId;
-import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
-import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.SerializedThrowable;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import java.util.List;
+import javax.annotation.Nonnull;
+
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -48,6 +43,10 @@ import java.util.concurrent.CompletableFuture;
*/
public class RescalingHandlers extends
AbstractAsynchronousOperationHandlers<AsynchronousJobOperationKey, Acknowledge>
{
+ private static RestHandlerException featureDisabledException() {
+ return new RestHandlerException("Rescaling is temporarily
disabled. See FLINK-12312.", HttpResponseStatus.SERVICE_UNAVAILABLE);
+ }
+
/**
* Handler which triggers the rescaling of the specified job.
*/
@@ -65,29 +64,18 @@ public class RescalingHandlers extends
AbstractAsynchronousOperationHandlers<Asy
}
@Override
- protected CompletableFuture<Acknowledge>
triggerOperation(HandlerRequest<EmptyRequestBody,
RescalingTriggerMessageParameters> request, RestfulGateway gateway) throws
RestHandlerException {
- final JobID jobId =
request.getPathParameter(JobIDPathParameter.class);
- final List<Integer> queryParameter =
request.getQueryParameter(RescalingParallelismQueryParameter.class);
-
- if (queryParameter.isEmpty()) {
- throw new RestHandlerException("No new
parallelism was specified.", HttpResponseStatus.BAD_REQUEST);
- }
-
- final int newParallelism = queryParameter.get(0);
-
- final CompletableFuture<Acknowledge> rescalingFuture =
gateway.rescaleJob(
- jobId,
- newParallelism,
- RescalingBehaviour.STRICT,
- RpcUtils.INF_TIMEOUT);
+ public CompletableFuture<TriggerResponse>
handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody,
RescalingTriggerMessageParameters> request, @Nonnull final RestfulGateway
gateway) throws RestHandlerException {
+ throw featureDisabledException();
+ }
- return rescalingFuture;
+ @Override
+ protected CompletableFuture<Acknowledge>
triggerOperation(HandlerRequest<EmptyRequestBody,
RescalingTriggerMessageParameters> request, RestfulGateway gateway) {
+ throw new UnsupportedOperationException();
}
@Override
protected AsynchronousJobOperationKey
createOperationKey(HandlerRequest<EmptyRequestBody,
RescalingTriggerMessageParameters> request) {
- final JobID jobId =
request.getPathParameter(JobIDPathParameter.class);
- return AsynchronousJobOperationKey.of(new TriggerId(),
jobId);
+ throw new UnsupportedOperationException();
}
}
@@ -108,21 +96,23 @@ public class RescalingHandlers extends
AbstractAsynchronousOperationHandlers<Asy
}
@Override
- protected AsynchronousJobOperationKey
getOperationKey(HandlerRequest<EmptyRequestBody,
RescalingStatusMessageParameters> request) {
- final JobID jobId =
request.getPathParameter(JobIDPathParameter.class);
- final TriggerId triggerId =
request.getPathParameter(TriggerIdPathParameter.class);
+ public
CompletableFuture<AsynchronousOperationResult<AsynchronousOperationInfo>>
handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody,
RescalingStatusMessageParameters> request, @Nonnull final RestfulGateway
gateway) throws RestHandlerException {
+ throw featureDisabledException();
+ }
- return AsynchronousJobOperationKey.of(triggerId, jobId);
+ @Override
+ protected AsynchronousJobOperationKey
getOperationKey(HandlerRequest<EmptyRequestBody,
RescalingStatusMessageParameters> request) {
+ throw new UnsupportedOperationException();
}
@Override
protected AsynchronousOperationInfo
exceptionalOperationResultResponse(Throwable throwable) {
- return
AsynchronousOperationInfo.completeExceptional(new
SerializedThrowable(throwable));
+ throw new UnsupportedOperationException();
}
@Override
protected AsynchronousOperationInfo
operationResultResponse(Acknowledge operationResult) {
- return AsynchronousOperationInfo.complete();
+ throw new UnsupportedOperationException();
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index dcfbf6b..33bb50a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -27,7 +27,6 @@ import
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
@@ -184,23 +183,6 @@ public interface RestfulGateway extends RpcGateway {
throw new UnsupportedOperationException();
}
- /**
- * Trigger rescaling of the given job.
- *
- * @param jobId specifying the job to rescale
- * @param newParallelism new parallelism of the job
- * @param rescalingBehaviour defining how strict the rescaling has to
be executed
- * @param timeout of this operation
- * @return Future which is completed with {@link Acknowledge} once the
rescaling was successful
- */
- default CompletableFuture<Acknowledge> rescaleJob(
- JobID jobId,
- int newParallelism,
- RescalingBehaviour rescalingBehaviour,
- @RpcTimeout Time timeout) {
- throw new UnsupportedOperationException();
- }
-
default CompletableFuture<Acknowledge> shutDownCluster() {
throw new UnsupportedOperationException();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 13ffe52..a2d3c3b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -82,12 +81,6 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
private final Supplier<CompletableFuture<Acknowledge>> cancelFunction;
@Nonnull
- private final BiFunction<Integer, RescalingBehaviour,
CompletableFuture<Acknowledge>> rescalingJobFunction;
-
- @Nonnull
- private final TriFunction<Collection<JobVertexID>, Integer,
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction;
-
- @Nonnull
private final Function<TaskExecutionState,
CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction;
@Nonnull
@@ -166,8 +159,6 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
@Nonnull String address,
@Nonnull String hostname,
@Nonnull Supplier<CompletableFuture<Acknowledge>>
cancelFunction,
- @Nonnull BiFunction<Integer, RescalingBehaviour,
CompletableFuture<Acknowledge>> rescalingJobFunction,
- @Nonnull TriFunction<Collection<JobVertexID>, Integer,
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction,
@Nonnull Function<TaskExecutionState,
CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction,
@Nonnull BiFunction<JobVertexID, ExecutionAttemptID,
CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction,
@Nonnull BiFunction<IntermediateDataSetID,
ResultPartitionID, CompletableFuture<ExecutionState>>
requestPartitionStateFunction,
@@ -196,8 +187,6 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
this.address = address;
this.hostname = hostname;
this.cancelFunction = cancelFunction;
- this.rescalingJobFunction = rescalingJobFunction;
- this.rescalingOperatorsFunction = rescalingOperatorsFunction;
this.updateTaskExecutionStateFunction =
updateTaskExecutionStateFunction;
this.requestNextInputSplitFunction =
requestNextInputSplitFunction;
this.requestPartitionStateFunction =
requestPartitionStateFunction;
@@ -231,16 +220,6 @@ public class TestingJobMasterGateway implements
JobMasterGateway {
}
@Override
- public CompletableFuture<Acknowledge> rescaleJob(int newParallelism,
RescalingBehaviour rescalingBehaviour, Time timeout) {
- return rescalingJobFunction.apply(newParallelism,
rescalingBehaviour);
- }
-
- @Override
- public CompletableFuture<Acknowledge>
rescaleOperators(Collection<JobVertexID> operators, int newParallelism,
RescalingBehaviour rescalingBehaviour, Time timeout) {
- return rescalingOperatorsFunction.apply(operators,
newParallelism, rescalingBehaviour);
- }
-
- @Override
public CompletableFuture<Acknowledge>
updateTaskExecutionState(TaskExecutionState taskExecutionState) {
return
updateTaskExecutionStateFunction.apply(taskExecutionState);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index c13cbf6..a023917 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -36,7 +36,6 @@ import
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterId;
-import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -76,8 +75,6 @@ public class TestingJobMasterGatewayBuilder {
private String address =
"akka.tcp://flink@localhost:6130/user/jobmanager";
private String hostname = "localhost";
private Supplier<CompletableFuture<Acknowledge>> cancelFunction = () ->
CompletableFuture.completedFuture(Acknowledge.get());
- private BiFunction<Integer, RescalingBehaviour,
CompletableFuture<Acknowledge>> rescalingJobFunction = (ignoredA, ignoredB) ->
CompletableFuture.completedFuture(Acknowledge.get());
- private TriFunction<Collection<JobVertexID>, Integer,
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction
= (ignoredA, ignoredB, ignoredC) ->
CompletableFuture.completedFuture(Acknowledge.get());
private Function<TaskExecutionState, CompletableFuture<Acknowledge>>
updateTaskExecutionStateFunction = ignored ->
CompletableFuture.completedFuture(Acknowledge.get());
private BiFunction<JobVertexID, ExecutionAttemptID,
CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction =
(ignoredA, ignoredB) -> CompletableFuture.completedFuture(new
SerializedInputSplit(null));
private BiFunction<IntermediateDataSetID, ResultPartitionID,
CompletableFuture<ExecutionState>> requestPartitionStateFunction = (ignoredA,
ignoredB) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
@@ -119,16 +116,6 @@ public class TestingJobMasterGatewayBuilder {
return this;
}
- public TestingJobMasterGatewayBuilder
setRescalingJobFunction(BiFunction<Integer, RescalingBehaviour,
CompletableFuture<Acknowledge>> rescalingJobFunction) {
- this.rescalingJobFunction = rescalingJobFunction;
- return this;
- }
-
- public TestingJobMasterGatewayBuilder
setRescalingOperatorsFunction(TriFunction<Collection<JobVertexID>, Integer,
RescalingBehaviour, CompletableFuture<Acknowledge>> rescalingOperatorsFunction)
{
- this.rescalingOperatorsFunction = rescalingOperatorsFunction;
- return this;
- }
-
public TestingJobMasterGatewayBuilder
setUpdateTaskExecutionStateFunction(Function<TaskExecutionState,
CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction) {
this.updateTaskExecutionStateFunction =
updateTaskExecutionStateFunction;
return this;
@@ -255,6 +242,6 @@ public class TestingJobMasterGatewayBuilder {
}
public TestingJobMasterGateway build() {
- return new TestingJobMasterGateway(address, hostname,
cancelFunction, rescalingJobFunction, rescalingOperatorsFunction,
updateTaskExecutionStateFunction, requestNextInputSplitFunction,
requestPartitionStateFunction, scheduleOrUpdateConsumersFunction,
disconnectTaskManagerFunction, disconnectResourceManagerConsumer,
classloadingPropsSupplier, offerSlotsFunction, failSlotConsumer,
registerTaskManagerFunction, taskManagerHeartbeatConsumer,
resourceManagerHeartbeatConsumer, requestJobDetai [...]
+ return new TestingJobMasterGateway(address, hostname,
cancelFunction, updateTaskExecutionStateFunction,
requestNextInputSplitFunction, requestPartitionStateFunction,
scheduleOrUpdateConsumersFunction, disconnectTaskManagerFunction,
disconnectResourceManagerConsumer, classloadingPropsSupplier,
offerSlotsFunction, failSlotConsumer, registerTaskManagerFunction,
taskManagerHeartbeatConsumer, resourceManagerHeartbeatConsumer,
requestJobDetailsSupplier, requestJobSupplier, triggerSavepointFu [...]
}
}