damccorm opened a new issue, #20992:
URL: https://github.com/apache/beam/issues/20992

   `pipeline.run()` is documented to be asynchronous (cf. 
[create-your-pipeline](https://beam.apache.org/documentation/pipelines/create-your-pipeline/)).
 However, when using the FlinkRunner (embedded or remote), the call appears to 
block until the pipeline finishes.
   
   Digging into the Flink code, I found that both `LocalStreamEnvironment` and 
`RemoteStreamEnvironment` set `execution.attached` to true. As a result, 
`StreamExecutionEnvironment.execute` later blocks while waiting for the job result:
   
   ```
   public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
       final JobClient jobClient = executeAsync(streamGraph);

       try {
           final JobExecutionResult jobExecutionResult;

           if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
               jobExecutionResult = jobClient.getJobExecutionResult().get(); // <==== execution is blocked here
           } else {
               jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
           }

           jobListeners.forEach(
                   jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

           return jobExecutionResult;
       } catch (Throwable t) {
           // get() on the JobExecutionResult Future will throw an ExecutionException. This
           // behaviour was largely not there in Flink versions before the PipelineExecutor
           // refactoring so we should strip that exception.
           Throwable strippedException = ExceptionUtils.stripExecutionException(t);

           jobListeners.forEach(
                   jobListener -> {
                       jobListener.onJobExecuted(null, strippedException);
                   });
           ExceptionUtils.rethrowException(strippedException);

           // never reached, only make javac happy
           return null;
       }
   }
   ```
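
To make the attached/detached distinction concrete, here is a minimal, JDK-only sketch of the same branching. It is not Flink or Beam code: `AttachedVsDetached`, `Result`, and this `execute` method are hypothetical names made up for illustration, and a `CompletableFuture<String>` stands in for the future returned by `jobClient.getJobExecutionResult()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class AttachedVsDetached {

    /** Hypothetical stand-in for a job result; not a Flink class. */
    static final class Result {
        final boolean detached;
        final String value; // null when the caller did not wait for the job

        Result(boolean detached, String value) {
            this.detached = detached;
            this.value = value;
        }
    }

    /**
     * Mirrors the branch quoted above: in attached mode the caller waits on
     * the future, in detached mode it returns right away.
     */
    static Result execute(CompletableFuture<String> job, boolean attached) throws Exception {
        if (attached) {
            return new Result(false, job.get()); // blocks until the job completes
        }
        return new Result(true, null); // returns immediately, job keeps running
    }

    public static void main(String[] args) throws Exception {
        // The simulated job cannot finish until the latch is released.
        CountDownLatch release = new CountDownLatch(1);
        CompletableFuture<String> job = CompletableFuture.supplyAsync(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "DONE";
        });

        // Detached: the call returns while the job is still running.
        Result detached = execute(job, false);
        System.out.println("detached=" + detached.detached + ", job done? " + job.isDone());

        // Attached: the call only returns once the job has completed.
        release.countDown();
        Result attached = execute(job, true);
        System.out.println("attached result: " + attached.value);
    }
}
```

In this sketch, the behaviour reported for the FlinkRunner corresponds to the `attached == true` branch, whereas the documented asynchronous `pipeline.run()` would correspond to the detached branch, with the caller choosing separately when to wait.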
   
   
   Imported from Jira 
[BEAM-12477](https://issues.apache.org/jira/browse/BEAM-12477). Original Jira 
may contain additional context.
   Reported by: [email protected].


