aljoscha commented on a change in pull request #9607: [FLINK-13946] Remove job session related code from ExecutionEnvironment URL: https://github.com/apache/flink/pull/9607#discussion_r320774343
########## File path: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ########## @@ -178,65 +139,65 @@ public boolean isRunning() { */ @Override public JobExecutionResult executePlan(Plan plan) throws Exception { - if (plan == null) { - throw new IllegalArgumentException("The plan may not be null."); - } + checkNotNull(plan); - synchronized (this.lock) { + final Configuration jobExecutorServiceConfiguration = configureExecution(plan); - // check if we start a session dedicated for this execution - final boolean shutDownAtEnd; + try (final JobExecutorService executorService = createJobExecutorService(jobExecutorServiceConfiguration)) { - if (jobExecutorService == null) { - shutDownAtEnd = true; + Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration); + OptimizedPlan op = pc.compile(plan); - // configure the number of local slots equal to the parallelism of the local plan - if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) { - int maxParallelism = plan.getMaximumParallelism(); - if (maxParallelism > 0) { - this.taskManagerNumSlots = maxParallelism; - } - } + JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration); + JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); - // start the cluster for us - start(); - } - else { - // we use the existing session - shutDownAtEnd = false; + return executorService.executeJobBlocking(jobGraph); + } + } + + private Configuration configureExecution(final Plan plan) { + setNumberOfTaskSlots(plan); + final Configuration executorConfiguration = createExecutorServiceConfig(); + setPlanParallelism(plan, executorConfiguration); + return executorConfiguration; + } + + private void setNumberOfTaskSlots(final Plan plan) { + if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) { + int maxParallelism = plan.getMaximumParallelism(); + if (maxParallelism > 0) { + this.taskManagerNumSlots = maxParallelism; } + } + } - try { - // TODO: Set job's default parallelism to max number of slots - final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots); - final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers); + private Configuration createExecutorServiceConfig() { + final Configuration newConfiguration = new Configuration(); + newConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots); + newConfiguration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, defaultOverwriteFiles); - Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration); - OptimizedPlan op = pc.compile(plan); + newConfiguration.addAll(baseConfiguration); - JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration); - JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); + return newConfiguration; + } - return jobExecutorService.executeJobBlocking(jobGraph); - } - finally { - if (shutDownAtEnd) { - stop(); - } - } - } + private void setPlanParallelism(final Plan plan, final Configuration executorServiceConfig) { Review comment: Here I also think it's more complicated than it needs to be. We put the values in the configuration just so we can retrieve them here again. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services