GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5487#discussion_r169916387
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -447,6 +453,165 @@ public void postStop() throws Exception {
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
     
    +   @Override
    +   public CompletableFuture<Acknowledge> rescaleJob(
    +                   int newParallelism,
    +                   RescalingBehaviour rescalingBehaviour,
    +                   Time timeout) {
    +           final ArrayList<JobVertexID> allOperators = new ArrayList<>(jobGraph.getNumberOfVertices());
    +
    +           for (JobVertex jobVertex : jobGraph.getVertices()) {
    +                   allOperators.add(jobVertex.getID());
    +           }
    +
    +           return rescaleOperators(allOperators, newParallelism, rescalingBehaviour, timeout);
    +   }
    +
    +   @Override
    +   public CompletableFuture<Acknowledge> rescaleOperators(
    +                   Collection<JobVertexID> operators,
    +                   int newParallelism,
    +                   RescalingBehaviour rescalingBehaviour,
    +                   Time timeout) {
    +           // 1. Check whether we can rescale the job & rescale the respective vertices
    +           for (JobVertexID jobVertexId : operators) {
    +                   final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
    +
    +                   // update max parallelism in case that it has not been configured
    +                   final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +                   if (executionJobVertex != null) {
    +                           jobVertex.setMaxParallelism(executionJobVertex.getMaxParallelism());
    +                   }
    +
    +                   try {
    +                           rescalingBehaviour.acceptWithException(jobVertex, newParallelism);
    +                   } catch (FlinkException e) {
    +                           final String msg = String.format("Cannot rescale job %s.", jobGraph.getName());
    +
    +                           log.info(msg, e);
    +
    +                           return FutureUtils.completedExceptionally(
    +                                   new JobModificationException(msg, e));
    +                   }
    +           }
    +
    +           final ExecutionGraph currentExecutionGraph = executionGraph;
    +
    +           final ExecutionGraph newExecutionGraph;
    +
    +           try {
    +                   newExecutionGraph = ExecutionGraphBuilder.buildGraph(
    +                           null,
    +                           jobGraph,
    +                           jobMasterConfiguration.getConfiguration(),
    +                           scheduledExecutorService,
    +                           scheduledExecutorService,
    +                           slotPool.getSlotProvider(),
    +                           userCodeLoader,
    +                           highAvailabilityServices.getCheckpointRecoveryFactory(),
    +                           rpcTimeout,
    +                           currentExecutionGraph.getRestartStrategy(),
    +                           jobMetricGroup,
    +                           1,
    +                           blobServer,
    +                           jobMasterConfiguration.getSlotRequestTimeout(),
    +                           log);
    +           } catch (JobExecutionException | JobException e) {
    +                   return FutureUtils.completedExceptionally(
    +                           new JobModificationException("Could not create rescaled ExecutionGraph.", e));
    +           }
    +
    +           // 3. disable checkpoint coordinator to suppress subsequent checkpoints
    +           final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +           checkpointCoordinator.stopCheckpointScheduler();
    +
    +           // 4. take a savepoint
    +           final CompletableFuture<String> savepointFuture = triggerSavepoint(
    +                   jobMasterConfiguration.getTmpDirectory(),
    --- End diff --
    
    Why is it OK to use a tmp directory as the savepoint target directory? Shouldn't the directory be accessible from all hosts?
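To make the concern concrete: a savepoint written to a host-local tmp directory is generally not reachable by TaskManagers or a JobMaster running on other machines. One rough guard would be to flag target directories that lack a distributed file system scheme. The sketch below is hypothetical (`SavepointTargetCheck` and `isLocalOnly` are not Flink APIs) and only inspects the URI scheme, so it is a heuristic: a `file://` path on a shared NFS mount would be flagged even though every host can see it.

```java
import java.net.URI;

// Hypothetical helper, not part of Flink's API: flags savepoint target
// directories that are likely only visible on the local host.
public final class SavepointTargetCheck {

    private SavepointTargetCheck() {}

    /**
     * Returns true if the directory URI has no scheme or the "file" scheme.
     * Such a path (e.g. a JobMaster's tmp directory) is usually host-local,
     * unless it happens to be a shared mount, which this check cannot detect.
     */
    public static boolean isLocalOnly(String directory) {
        URI uri = URI.create(directory);
        String scheme = uri.getScheme();
        return scheme == null || "file".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        System.out.println(isLocalOnly("/tmp/flink-savepoints"));           // true
        System.out.println(isLocalOnly("file:///tmp/flink-savepoints"));    // true
        System.out.println(isLocalOnly("hdfs://namenode:8020/savepoints")); // false
        System.out.println(isLocalOnly("s3://bucket/savepoints"));          // false
    }
}
```

A check along these lines could log a warning instead of failing outright, since only the deployment knows whether a `file://` path is actually shared.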
