Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2732#discussion_r89141153
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---
    @@ -172,29 +174,40 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
     
                                // start the cluster for us
                                start();
    -                   }
    -                   else {
    +                   } else {
                                // we use the existing session
                                shutDownAtEnd = false;
                        }
     
    -                   try {
    -                           Configuration configuration = this.flink.configuration();
    +                   Configuration configuration = this.flink.configuration();
     
    -                           Optimizer pc = new Optimizer(new DataStatistics(), configuration);
    -                           OptimizedPlan op = pc.compile(plan);
    +                   Optimizer pc = new Optimizer(new DataStatistics(), configuration);
    +                   OptimizedPlan op = pc.compile(plan);
     
    -                           JobGraphGenerator jgg = new JobGraphGenerator(configuration);
    -                           JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
    +                   JobGraphGenerator jgg = new JobGraphGenerator(configuration);
    +                   JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());
     
    -                           boolean sysoutPrint = isPrintingStatusDuringExecution();
    -                           return flink.submitJobAndWait(jobGraph, sysoutPrint);
    -                   }
    -                   finally {
    -                           if (shutDownAtEnd) {
    -                                   stop();
    +                   boolean sysoutPrint = isPrintingStatusDuringExecution();
    +
    +
    +                   JobListeningContext jobListeningContext = flink.submitJob(jobGraph, sysoutPrint);
    +                   JobClientEager jobClient = new JobClientEager(jobListeningContext);
    +
    +                   Runnable cleanup = new Runnable() {
    +                           @Override
    +                           public void run() {
    +                                   if (shutDownAtEnd) {
    --- End diff --
    
    We could, but it wouldn't make any semantic difference, since the enclosed variable must be final.
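
    The quoted diff is cut off, so the suggestion being answered is not visible here. One plausible reading is that the `shutDownAtEnd` check could move out of the anonymous `Runnable`. The sketch below is only an illustration under that assumption: `CleanupCaptureSketch`, `makeCleanup`, and the `stopCalls` counter are hypothetical names standing in for `LocalExecutor` and its `stop()` method. It shows that a local captured by an anonymous class must be final (or effectively final from Java 8 on), so its value is fixed when the `Runnable` is created and testing it inside `run()` behaves the same as testing it beforehand.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CleanupCaptureSketch {

    // Hypothetical stand-in for the cleanup in the diff: instead of calling
    // LocalExecutor.stop(), it only counts how often "stop" would have run.
    static Runnable makeCleanup(final boolean shutDownAtEnd, final AtomicInteger stopCalls) {
        // Both parameters are captured by the anonymous class below, so the
        // compiler requires them to be final (or effectively final, Java 8+).
        return new Runnable() {
            @Override
            public void run() {
                // The captured value cannot change after the Runnable is created.
                if (shutDownAtEnd) {
                    stopCalls.incrementAndGet();
                }
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger stopCalls = new AtomicInteger();
        makeCleanup(true, stopCalls).run();   // cluster started by us: "stop" is counted
        makeCleanup(false, stopCalls).run();  // existing session: nothing happens
        System.out.println("stop calls: " + stopCalls.get());
    }
}
```

    Reassigning `shutDownAtEnd` after the `Runnable` has been created would not compile, which is why moving the check makes no observable difference.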


