Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169917845
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
     
    +   /**
    +    * Rescales every vertex of the job graph to the given parallelism by
    +    * collecting the IDs of all vertices and delegating to
    +    * {@link #rescaleOperators(Collection, int, RescalingBehaviour, Time)}.
    +    *
    +    * @param newParallelism parallelism requested for every vertex of the job
    +    * @param rescalingBehaviour behaviour forwarded unchanged to rescaleOperators
    +    * @param timeout timeout forwarded unchanged to rescaleOperators
    +    * @return the future returned by rescaleOperators for the full set of vertices
    +    */
    +   @Override
    +   public CompletableFuture<Acknowledge> rescaleJob(
    +                   int newParallelism,
    +                   RescalingBehaviour rescalingBehaviour,
    +                   Time timeout) {
    +           // presized with the vertex count because every vertex ID is added below
    +           final ArrayList<JobVertexID> allOperators = new 
ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +           for (JobVertex jobVertex : jobGraph.getVertices()) {
    +                   allOperators.add(jobVertex.getID());
    +           }
    +
    +           return rescaleOperators(allOperators, newParallelism, 
rescalingBehaviour, timeout);
    +   }
    +
    +   @Override
    +   public CompletableFuture<Acknowledge> rescaleOperators(
    +                   Collection<JobVertexID> operators,
    +                   int newParallelism,
    +                   RescalingBehaviour rescalingBehaviour,
    +                   Time timeout) {
    +           // 1. Check whether we can rescale the job & rescale the 
respective vertices
    +           for (JobVertexID jobVertexId : operators) {
    +                   final JobVertex jobVertex = 
jobGraph.findVertexByID(jobVertexId);
    +
    +                   // update max parallelism in case that it has not been 
configure
    +                   final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(jobVertexId);
    +
    +                   if (executionJobVertex != null) {
    +                           
jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +                   }
    +
    +                   try {
    +                           
rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +                   } catch (FlinkException e) {
    +                           final String msg = String.format("Cannot 
rescale job %s.", jobGraph.getName());
    +
    +                           log.info(msg, e);
    +
    +                           return FutureUtils.completedExceptionally(
    +                                   new JobModificationException(msg, e));
    +                   }
    +           }
    +
    +           final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +           final ExecutionGraph newExecutionGraph;
    +
    +           try {
    +                   newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +                           null,
    +                           jobGraph,
    +                           jobMasterConfiguration.getConfiguration(),
    +                           scheduledExecutorService,
    +                           scheduledExecutorService,
    +                           slotPool.getSlotProvider(),
    +                           userCodeLoader,
    +                           
highAvailabilityServices.getCheckpointRecoveryFactory(),
    +                           rpcTimeout,
    +                           currentExecutionGraph.getRestartStrategy(),
    +                           jobMetricGroup,
    +                           1,
    +                           blobServer,
    +                           jobMasterConfiguration.getSlotRequestTimeout(),
    +                           log);
    +           } catch (JobExecutionException | JobException e) {
    +                   return FutureUtils.completedExceptionally(
    +                           new JobModificationException("Could not create 
rescaled ExecutionGraph.", e));
    +           }
    +
    +           // 3. disable checkpoint coordinator to suppress subsequent 
checkpoints
    +           final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
    +           checkpointCoordinator.stopCheckpointScheduler();
    +
    +           // 4. take a savepoint
    +           final CompletableFuture<String> savepointFuture = 
triggerSavepoint(
    +                   jobMasterConfiguration.getTmpDirectory(),
    +                   timeout);
    +
    +           final CompletableFuture<ExecutionGraph> executionGraphFuture = 
savepointFuture
    +                   .thenApplyAsync(
    +                           (String savepointPath) -> {
    +                                   try {
    +                                           
newExecutionGraph.getCheckpointCoordinator().restoreSavepoint(
    +                                                   savepointPath,
    +                                                   false,
    +                                                   
newExecutionGraph.getAllVertices(),
    +                                                   userCodeLoader);
    +                                   } catch (Exception e) {
    +                                           disposeSavepoint(savepointPath);
    +
    +                                           throw new 
CompletionException(new JobModificationException("Could not restore from 
temporary rescaling savepoint.", e));
    +                                   }
    +
    +                                   // delete the savepoint file once we 
reach a terminal state
    +                                   newExecutionGraph.getTerminationFuture()
    +                                           .whenCompleteAsync(
    +                                                   (JobStatus jobStatus, 
Throwable throwable) -> disposeSavepoint(savepointPath),
    +                                                   
scheduledExecutorService);
    +
    +                                   return newExecutionGraph;
    +                           }, scheduledExecutorService)
    +                   .exceptionally(
    +                           (Throwable failure) -> {
    +                                   // in case that we couldn't take a 
savepoint or restore from it, let's restart the checkpoint
    +                                   // coordinator and abort the rescaling 
operation
    +                                   if 
(checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
    +                                           
checkpointCoordinator.startCheckpointScheduler();
    +                                   }
    +
    +                                   throw new CompletionException(failure);
    +                           });
    +
    +           // 5. suspend the current job
    +           final CompletableFuture<JobStatus> terminationFuture = 
executionGraphFuture.thenComposeAsync(
    +                   (ExecutionGraph ignored) -> {
    +                           currentExecutionGraph.suspend(new 
FlinkException("Job is being rescaled."));
    +                           return 
currentExecutionGraph.getTerminationFuture();
    +                   },
    +                   getMainThreadExecutor());
    +
    +           final CompletableFuture<Void> suspendedFuture = 
terminationFuture.thenAccept(
    +                   (JobStatus jobStatus) -> {
    +                           if (jobStatus != JobStatus.SUSPENDED) {
    +                                   final String msg = String.format("Job 
%s rescaling failed because we could not suspend the execution graph.", 
jobGraph.getName());
    +                                   log.info(msg);
    +                                   throw new CompletionException(new 
JobModificationException(msg));
    +                           }
    +                   });
    +
    +           // 6. resume the new execution graph from the taken savepoint
    +           final CompletableFuture<Acknowledge> rescalingFuture = 
suspendedFuture.thenCombineAsync(
    +                   executionGraphFuture,
    +                   (Void ignored, ExecutionGraph restoredExecutionGraph) 
-> {
    +                           // check if the ExecutionGraph is still the same
    +                           //noinspection ObjectEquality
    --- End diff --
    
    I don't think there is any warning to suppress on this line, so the `//noinspection ObjectEquality` comment seems unnecessary. 


---

Reply via email to