[ https://issues.apache.org/jira/browse/FLINK-8656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372673#comment-16372673 ]
ASF GitHub Bot commented on FLINK-8656: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5487#discussion_r169917224 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -447,6 +453,165 @@ public void postStop() throws Exception { return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture<Acknowledge> rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices()); + + for (JobVertex jobVertex : jobGraph.getVertices()) { + allOperators.add(jobVertex.getID()); + } + + return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout); + } + + @Override + public CompletableFuture<Acknowledge> rescaleOperators( + Collection<JobVertexID> operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + // 1. Check whether we can rescale the job & rescale the respective vertices + for (JobVertexID jobVertexId : operators) { + final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + + // update max parallelism in case that it has not been configure + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + if (executionJobVertex != null) { + jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); + } + + try { + rescalingBehaviour.acceptWithException(jobVertex, newParallelism); + } catch (FlinkException e) { + final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); + + log.info(msg, e); + + return FutureUtils.completedExceptionally( + new JobModificationException(msg, e)); + } + } + + final ExecutionGraph currentExecutionGraph = executionGraph; + + final ExecutionGraph newExecutionGraph; + + try { + newExecutionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobMasterConfiguration.getConfiguration(), + scheduledExecutorService, + scheduledExecutorService, + slotPool.getSlotProvider(), + userCodeLoader, + highAvailabilityServices.getCheckpointRecoveryFactory(), + rpcTimeout, + currentExecutionGraph.getRestartStrategy(), + jobMetricGroup, + 1, + blobServer, + jobMasterConfiguration.getSlotRequestTimeout(), + log); + } catch (JobExecutionException | JobException e) { + return FutureUtils.completedExceptionally( + new JobModificationException("Could not create rescaled ExecutionGraph.", e)); + } + + // 3. disable checkpoint coordinator to suppress subsequent checkpoints + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + checkpointCoordinator.stopCheckpointScheduler(); + + // 4. take a savepoint + final CompletableFuture<String> savepointFuture = triggerSavepoint( + jobMasterConfiguration.getTmpDirectory(), + timeout); + + final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture + .thenApplyAsync( + (String savepointPath) -> { + try { + newExecutionGraph.getCheckpointCoordinator().restoreSavepoint( + savepointPath, + false, + newExecutionGraph.getAllVertices(), + userCodeLoader); + } catch (Exception e) { + disposeSavepoint(savepointPath); + + throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e)); + } + + // delete the savepoint file once we reach a terminal state + newExecutionGraph.getTerminationFuture() + .whenCompleteAsync( + (JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + return newExecutionGraph; + }, scheduledExecutorService) + .exceptionally( + (Throwable failure) -> { + // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint + // coordinator and abort the rescaling operation + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { --- End diff -- The check could move inside `startCheckpointScheduler` > Add CLI command for rescaling > ----------------------------- > > Key: FLINK-8656 > URL: https://issues.apache.org/jira/browse/FLINK-8656 > Project: Flink > Issue Type: New Feature > Components: Client > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Major > Labels: flip-6 > Fix For: 1.5.0 > > > The REST rescaling calls should be made accessible via the {{CliFrontend}}. > In order to do that I propose to add a {{modify}} command to the > {{CliFrontend}} to which we can pass a new parallelism. -- This message was sent by Atlassian JIRA (v7.6.3#76005)