Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5487#discussion_r170014907 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -447,6 +453,165 @@ public void postStop() throws Exception { return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture<Acknowledge> rescaleJob( + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices()); + + for (JobVertex jobVertex : jobGraph.getVertices()) { + allOperators.add(jobVertex.getID()); + } + + return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout); + } + + @Override + public CompletableFuture<Acknowledge> rescaleOperators( + Collection<JobVertexID> operators, + int newParallelism, + RescalingBehaviour rescalingBehaviour, + Time timeout) { + // 1. Check whether we can rescale the job & rescale the respective vertices + for (JobVertexID jobVertexId : operators) { + final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + + // update max parallelism in case that it has not been configure + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + if (executionJobVertex != null) { + jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism()); + } + + try { + rescalingBehaviour.acceptWithException(jobVertex, newParallelism); + } catch (FlinkException e) { + final String msg = String.format("Cannot rescale job %s.", jobGraph.getName()); + + log.info(msg, e); + + return FutureUtils.completedExceptionally( + new JobModificationException(msg, e)); + } + } + + final ExecutionGraph currentExecutionGraph = executionGraph; + + final ExecutionGraph newExecutionGraph; + + try { + newExecutionGraph = ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobMasterConfiguration.getConfiguration(), + 
scheduledExecutorService, + scheduledExecutorService, + slotPool.getSlotProvider(), + userCodeLoader, + highAvailabilityServices.getCheckpointRecoveryFactory(), + rpcTimeout, + currentExecutionGraph.getRestartStrategy(), + jobMetricGroup, + 1, + blobServer, + jobMasterConfiguration.getSlotRequestTimeout(), + log); + } catch (JobExecutionException | JobException e) { + return FutureUtils.completedExceptionally( + new JobModificationException("Could not create rescaled ExecutionGraph.", e)); + } + + // 3. disable checkpoint coordinator to suppress subsequent checkpoints + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + checkpointCoordinator.stopCheckpointScheduler(); + + // 4. take a savepoint + final CompletableFuture<String> savepointFuture = triggerSavepoint( + jobMasterConfiguration.getTmpDirectory(), + timeout); + + final CompletableFuture<ExecutionGraph> executionGraphFuture = savepointFuture + .thenApplyAsync( + (String savepointPath) -> { + try { + newExecutionGraph.getCheckpointCoordinator().restoreSavepoint( + savepointPath, + false, + newExecutionGraph.getAllVertices(), + userCodeLoader); + } catch (Exception e) { + disposeSavepoint(savepointPath); + + throw new CompletionException(new JobModificationException("Could not restore from temporary rescaling savepoint.", e)); + } + + // delete the savepoint file once we reach a terminal state + newExecutionGraph.getTerminationFuture() + .whenCompleteAsync( + (JobStatus jobStatus, Throwable throwable) -> disposeSavepoint(savepointPath), + scheduledExecutorService); + + return newExecutionGraph; + }, scheduledExecutorService) + .exceptionally( + (Throwable failure) -> { + // in case that we couldn't take a savepoint or restore from it, let's restart the checkpoint + // coordinator and abort the rescaling operation + if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + checkpointCoordinator.startCheckpointScheduler(); + } + + throw new 
CompletionException(failure); + }); + + // 5. suspend the current job + final CompletableFuture<JobStatus> terminationFuture = executionGraphFuture.thenComposeAsync( + (ExecutionGraph ignored) -> { + currentExecutionGraph.suspend(new FlinkException("Job is being rescaled.")); + return currentExecutionGraph.getTerminationFuture(); + }, + getMainThreadExecutor()); + + final CompletableFuture<Void> suspendedFuture = terminationFuture.thenAccept( + (JobStatus jobStatus) -> { + if (jobStatus != JobStatus.SUSPENDED) { + final String msg = String.format("Job %s rescaling failed because we could not suspend the execution graph.", jobGraph.getName()); + log.info(msg); + throw new CompletionException(new JobModificationException(msg)); + } + }); + + // 6. resume the new execution graph from the taken savepoint + final CompletableFuture<Acknowledge> rescalingFuture = suspendedFuture.thenCombineAsync( + executionGraphFuture, + (Void ignored, ExecutionGraph restoredExecutionGraph) -> { + // check if the ExecutionGraph is still the same + //noinspection ObjectEquality --- End diff -- There is. The `a == b` comparison is what triggers the `ObjectEquality` inspection warning, but here we intentionally want to check for referential identity, so the warning is suppressed.
---