[ https://issues.apache.org/jira/browse/FLINK-8656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372674#comment-16372674 ]
ASF GitHub Bot commented on FLINK-8656:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5487#discussion_r169916387
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
@@ -447,6 +453,165 @@ public void postStop() throws Exception {
return CompletableFuture.completedFuture(Acknowledge.get());
}
+ @Override
+ public CompletableFuture<Acknowledge> rescaleJob(
+ int newParallelism,
+ RescalingBehaviour rescalingBehaviour,
+ Time timeout) {
+ final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
+
+ for (JobVertex jobVertex : jobGraph.getVertices()) {
+ allOperators.add(jobVertex.getID());
+ }
+
+ return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> rescaleOperators(
+ Collection<JobVertexID> operators,
+ int newParallelism,
+ RescalingBehaviour rescalingBehaviour,
+ Time timeout) {
+ // 1. Check whether we can rescale the job & rescale the respective vertices
+ for (JobVertexID jobVertexId : operators) {
+ final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+
+ // update max parallelism in case that it has not been configured
+ final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+ if (executionJobVertex != null) {
+ jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
+ }
+
+ try {
+ rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
+ } catch (FlinkException e) {
+ final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
+
+ log.info(msg, e);
+
+ return FutureUtils.completedExceptionally(
+ new JobModificationException(msg, e));
+ }
+ }
+
+ final ExecutionGraph currentExecutionGraph = executionGraph;
+
+ final ExecutionGraph newExecutionGraph;
+
+ try {
+ newExecutionGraph = ExecutionGraphBuilder.buildGraph(
+ null,
+ jobGraph,
+ jobMasterConfiguration.getConfiguration(),
+ scheduledExecutorService,
+ scheduledExecutorService,
+ slotPool.getSlotProvider(),
+ userCodeLoader,
+ highAvailabilityServices.getCheckpointRecoveryFactory(),
+ rpcTimeout,
+ currentExecutionGraph.getRestartStrategy(),
+ jobMetricGroup,
+ 1,
+ blobServer,
+ jobMasterConfiguration.getSlotRequestTimeout(),
+ log);
+ } catch (JobExecutionException | JobException e) {
+ return FutureUtils.completedExceptionally(
+ new JobModificationException("Could not create rescaled ExecutionGraph.", e));
+ }
+
+ // 3. disable checkpoint coordinator to suppress subsequent checkpoints
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+ checkpointCoordinator.stopCheckpointScheduler();
+
+ // 4. take a savepoint
+ final CompletableFuture<String> savepointFuture = triggerSavepoint(
+ jobMasterConfiguration.getTmpDirectory(),
--- End diff --
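Step 1 of rescaleOperators hands the per-vertex check to RescalingBehaviour, whose implementations are not part of this diff. The following is a minimal, self-contained Java sketch that assumes two behaviours: STRICT, which rejects a parallelism above a vertex's max parallelism, and RELAXED, which clamps to it. Vertex, RescalingException, Behaviour and rescaleAll are hypothetical stand-ins, not Flink classes.

```java
import java.util.List;

public class RescalingSketch {

    /** Hypothetical stand-in for a job vertex: just the fields this sketch needs. */
    static final class Vertex {
        final String name;
        final int maxParallelism;
        int parallelism;

        Vertex(String name, int parallelism, int maxParallelism) {
            this.name = name;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }
    }

    /** Hypothetical checked exception, playing the role of the FlinkException caught in the diff. */
    static final class RescalingException extends Exception {
        RescalingException(String message) {
            super(message);
        }
    }

    /** Assumed semantics of the two behaviours; resolve returns the parallelism to apply. */
    enum Behaviour {
        /** Rejects a requested parallelism above the vertex's max parallelism. */
        STRICT {
            @Override
            int resolve(Vertex v, int requested) throws RescalingException {
                if (requested > v.maxParallelism) {
                    throw new RescalingException(String.format(
                            "Cannot rescale %s to %d: max parallelism is %d.",
                            v.name, requested, v.maxParallelism));
                }
                return requested;
            }
        },
        /** Silently clamps the requested parallelism to the vertex's max parallelism. */
        RELAXED {
            @Override
            int resolve(Vertex v, int requested) {
                return Math.min(requested, v.maxParallelism);
            }
        };

        abstract int resolve(Vertex v, int requested) throws RescalingException;
    }

    /** Resolves every vertex first, then mutates, so a rejection leaves all vertices unchanged. */
    static void rescaleAll(List<Vertex> vertices, int requested, Behaviour behaviour)
            throws RescalingException {
        int[] resolved = new int[vertices.size()];
        for (int i = 0; i < vertices.size(); i++) {
            resolved[i] = behaviour.resolve(vertices.get(i), requested);
        }
        for (int i = 0; i < vertices.size(); i++) {
            vertices.get(i).parallelism = resolved[i];
        }
    }

    public static void main(String[] args) throws RescalingException {
        List<Vertex> job = List.of(new Vertex("source", 2, 128), new Vertex("sink", 2, 4));

        rescaleAll(job, 8, Behaviour.RELAXED);
        System.out.println(job.get(0).parallelism + " " + job.get(1).parallelism); // 8 4

        try {
            rescaleAll(job, 16, Behaviour.STRICT);
        } catch (RescalingException e) {
            System.out.println(e.getMessage()); // Cannot rescale sink to 16: max parallelism is 4.
        }
        System.out.println(job.get(0).parallelism); // 8 (source was not modified)
    }
}
```

One design difference is deliberate: the loop in the diff appears to modify each JobVertex as it goes, so a failure on a later vertex would leave earlier vertices already changed. This sketch resolves every new parallelism first and mutates only after all checks pass.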
Why is it OK to use the JobMaster's tmp directory (jobMasterConfiguration.getTmpDirectory()) as the savepoint target directory? Shouldn't the directory be visible from all hosts?
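The concern is that a plain path such as a tmp directory resolves against each host's local disk, so savepoint data written on different hosts would not end up in one location that every host can read back on restore. A minimal, hypothetical Java sketch of a fail-fast check on the target directory follows. It is only a heuristic based on the URI scheme: treating "file" and scheme-less paths as host-local is an assumption, and a local path may in fact be a shared mount such as NFS. SavepointTargetCheck, looksHostLocal and requireSharedTarget are illustrative names, not Flink APIs.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

public class SavepointTargetCheck {

    // Assumption: only these schemes are treated as host-local. A file path may still point at a
    // shared mount (e.g. NFS), which a scheme check alone cannot detect.
    private static final Set<String> HOST_LOCAL_SCHEMES = Set.of("file");

    /** Returns true if the directory most likely lives on the disk of whichever host resolves it. */
    static boolean looksHostLocal(String targetDirectory) {
        String scheme = URI.create(targetDirectory).getScheme();
        // A bare path such as /tmp/savepoints has no scheme and is resolved against the local disk.
        return scheme == null || HOST_LOCAL_SCHEMES.contains(scheme.toLowerCase(Locale.ROOT));
    }

    /** Fails fast instead of writing a savepoint that other hosts may not be able to read. */
    static void requireSharedTarget(String targetDirectory) {
        if (looksHostLocal(targetDirectory)) {
            throw new IllegalArgumentException(
                    "Savepoint target " + targetDirectory + " looks host-local; "
                            + "use a directory that every host can read, e.g. on a distributed file system.");
        }
    }

    public static void main(String[] args) {
        System.out.println(looksHostLocal("/tmp/flink-savepoints"));           // true
        System.out.println(looksHostLocal("file:///tmp/flink-savepoints"));    // true
        System.out.println(looksHostLocal("hdfs://namenode:8020/savepoints")); // false
        System.out.println(looksHostLocal("s3://bucket/savepoints"));          // false
    }
}
```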
> Add CLI command for rescaling
> -----------------------------
>
> Key: FLINK-8656
> URL: https://issues.apache.org/jira/browse/FLINK-8656
> Project: Flink
> Issue Type: New Feature
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The REST rescaling calls should be made accessible via the {{CliFrontend}}.
> In order to do that I propose to add a {{modify}} command to the
> {{CliFrontend}} to which we can pass a new parallelism.
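The issue proposes a modify command that receives a new parallelism. Below is a minimal, hypothetical Java sketch of the argument handling such a command might need, assuming the syntax "modify <jobId> -p <parallelism>". The flag names, ModifyCommandSketch, ModifyOptions and parse are illustrative assumptions, not the actual CliFrontend API, and the job ID in main is a placeholder.

```java
public class ModifyCommandSketch {

    /** Parsed form of the hypothetical "modify <jobId> -p <parallelism>" invocation. */
    static final class ModifyOptions {
        final String jobId;
        final int newParallelism;

        ModifyOptions(String jobId, int newParallelism) {
            this.jobId = jobId;
            this.newParallelism = newParallelism;
        }
    }

    static ModifyOptions parse(String[] args) {
        if (args.length == 0 || !"modify".equals(args[0])) {
            throw new IllegalArgumentException("Expected the 'modify' action.");
        }
        String jobId = null;
        Integer parallelism = null;
        for (int i = 1; i < args.length; i++) {
            String arg = args[i];
            if ("-p".equals(arg) || "--parallelism".equals(arg)) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Missing value for " + arg + ".");
                }
                parallelism = parsePositiveInt(args[++i]);
            } else if (jobId == null) {
                jobId = arg; // first positional argument is taken as the job ID
            } else {
                throw new IllegalArgumentException("Unexpected argument: " + arg);
            }
        }
        if (jobId == null) {
            throw new IllegalArgumentException("Missing job ID.");
        }
        if (parallelism == null) {
            throw new IllegalArgumentException("Missing new parallelism (-p).");
        }
        return new ModifyOptions(jobId, parallelism);
    }

    /** Rejects non-numeric values and parallelisms below 1. */
    private static int parsePositiveInt(String value) {
        final int parsed;
        try {
            parsed = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Parallelism must be an integer: " + value, e);
        }
        if (parsed < 1) {
            throw new IllegalArgumentException("Parallelism must be at least 1: " + value);
        }
        return parsed;
    }

    public static void main(String[] args) {
        ModifyOptions options = parse(new String[] {"modify", "a1b2c3d4", "-p", "8"});
        System.out.println(options.jobId + " -> " + options.newParallelism); // a1b2c3d4 -> 8
    }
}
```

The parsed job ID and parallelism would then be passed to the REST rescaling call the issue refers to; that client wiring is left out because its API is not shown here.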
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)