TisonKun commented on a change in pull request #9607: [FLINK-13946] Remove job 
session related code from ExecutionEnvironment
URL: https://github.com/apache/flink/pull/9607#discussion_r320670163
 
 

 ##########
 File path: 
flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
 ##########
 @@ -178,65 +149,71 @@ public boolean isRunning() {
         */
        @Override
        public JobExecutionResult executePlan(Plan plan) throws Exception {
-               if (plan == null) {
-                       throw new IllegalArgumentException("The plan may not be 
null.");
-               }
+               checkNotNull(plan);
 
-               synchronized (this.lock) {
+               JobExecutorService executorService = null;
+               try {
+                       final Configuration jobExecutorServiceConfiguration = 
configureExecution(plan);
 
-                       // check if we start a session dedicated for this 
execution
-                       final boolean shutDownAtEnd;
+                       executorService = 
startExecutorService(jobExecutorServiceConfiguration);
 
-                       if (jobExecutorService == null) {
-                               shutDownAtEnd = true;
+                       Optimizer pc = new Optimizer(new DataStatistics(), 
jobExecutorServiceConfiguration);
+                       OptimizedPlan op = pc.compile(plan);
 
-                               // configure the number of local slots equal to 
the parallelism of the local plan
-                               if (this.taskManagerNumSlots == 
DEFAULT_TASK_MANAGER_NUM_SLOTS) {
-                                       int maxParallelism = 
plan.getMaximumParallelism();
-                                       if (maxParallelism > 0) {
-                                               this.taskManagerNumSlots = 
maxParallelism;
-                                       }
-                               }
+                       JobGraphGenerator jgg = new 
JobGraphGenerator(jobExecutorServiceConfiguration);
+                       JobGraph jobGraph = jgg.compileJobGraph(op, 
plan.getJobId());
 
-                               // start the cluster for us
-                               start();
+                       return executorService.executeJobBlocking(jobGraph);
+               } finally {
+                       if (executorService != null) {
+                               stopExecutorService(executorService);
                        }
-                       else {
-                               // we use the existing session
-                               shutDownAtEnd = false;
+               }
+       }
+
+       private Configuration configureExecution(final Plan executable) {
 
 Review comment:
   Why is the parameter named `executable` here, when it is called `plan` 
everywhere else in this file?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to