[ https://issues.apache.org/jira/browse/BEAM-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-12477:
---------------------------------
Labels: stale-P2 (was: )
> pipeline.run() blocks when using FlinkRunner
> --------------------------------------------
>
> Key: BEAM-12477
> URL: https://issues.apache.org/jira/browse/BEAM-12477
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.29.0
> Reporter: Stefan Wachter
> Priority: P2
> Labels: stale-P2
>
> {{pipeline.run()}} is documented to be asynchronous (cf.
> [create-your-pipeline|https://beam.apache.org/documentation/pipelines/create-your-pipeline/]).
> However, when using FlinkRunner (embedded or remote), the call appears to
> block until the pipeline finishes.
> Digging into the Flink code, I found that both {{LocalStreamEnvironment}} and
> {{RemoteStreamEnvironment}} set {{execution.attached}} to true. As a result,
> {{StreamExecutionEnvironment.execute}} blocks later on:
> {code}
> public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
>     final JobClient jobClient = executeAsync(streamGraph);
>     try {
>         final JobExecutionResult jobExecutionResult;
>         if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
>             jobExecutionResult = jobClient.getJobExecutionResult().get();
>             // <==== execution is blocked here
>         } else {
>             jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
>         }
>         jobListeners.forEach(
>                 jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
>         return jobExecutionResult;
>     } catch (Throwable t) {
>         // get() on the JobExecutionResult Future will throw an ExecutionException. This
>         // behaviour was largely not there in Flink versions before the PipelineExecutor
>         // refactoring so we should strip that exception.
>         Throwable strippedException = ExceptionUtils.stripExecutionException(t);
>         jobListeners.forEach(
>                 jobListener -> {
>                     jobListener.onJobExecuted(null, strippedException);
>                 });
>         ExceptionUtils.rethrowException(strippedException);
>         // never reached, only make javac happy
>         return null;
>     }
> }
> {code}
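
The blocking behaviour described above can be reproduced in isolation with a small sketch that uses only the JDK. It is not Flink or Beam code: the class {{AttachedVsDetached}}, the methods {{submitJob}} and {{execute}}, and the string results are hypothetical stand-ins, assumed here purely to illustrate how branching on an "attached" flag decides whether the caller waits on the job's future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class AttachedVsDetached {

    /** Hypothetical stand-in for a submitted job: it finishes once the latch is released. */
    public static CompletableFuture<String> submitJob(CountDownLatch finish) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                finish.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "FINISHED";
        });
    }

    /** Mirrors the branch on the attached flag: wait on the future, or return at once. */
    public static String execute(CompletableFuture<String> job, boolean attached)
            throws Exception {
        if (attached) {
            // Attached mode: the caller blocks here until the job completes.
            return job.get();
        }
        // Detached mode: return a placeholder without waiting for the job.
        return "DETACHED";
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch finish = new CountDownLatch(1);
        CompletableFuture<String> job = submitJob(finish);

        // Detached: returns immediately while the job is still running.
        System.out.println(execute(job, false));
        System.out.println("job done? " + job.isDone());

        // Let the job finish, then show that attached mode returns its result.
        finish.countDown();
        System.out.println(execute(job, true));
    }
}
```

In this sketch the detached call returns while the job is still pending, whereas the attached call only returns after the latch lets the job complete, which is the same shape as the {{DeploymentOptions.ATTACHED}} branch quoted above.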