[ https://issues.apache.org/jira/browse/BEAM-12477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399379#comment-17399379 ]
Beam JIRA Bot commented on BEAM-12477:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it
has been labeled "stale-P2". If this issue is still affecting you, we care!
Please comment and remove the label. Otherwise, in 14 days the issue will be
moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed
explanation of what these priorities mean.
> pipeline.run() blocks when using FlinkRunner
> --------------------------------------------
>
> Key: BEAM-12477
> URL: https://issues.apache.org/jira/browse/BEAM-12477
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.29.0
> Reporter: Stefan Wachter
> Priority: P2
> Labels: stale-P2
>
> {{pipeline.run()}} is documented to be asynchronous (cf.
> [create-your-pipeline|https://beam.apache.org/documentation/pipelines/create-your-pipeline/]).
> However, when using FlinkRunner (embedded or remote), the call appears to block
> until the pipeline finishes.
> Digging into the Flink code, I found that both {{LocalStreamEnvironment}} and
> {{RemoteStreamEnvironment}} set {{execution.attached}} to true. This causes
> {{StreamExecutionEnvironment.execute}} to block later on:
> {code}
> public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
>     final JobClient jobClient = executeAsync(streamGraph);
>     try {
>         final JobExecutionResult jobExecutionResult;
>         if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
>             jobExecutionResult = jobClient.getJobExecutionResult().get();
>             // <==== execution is blocked here
>         } else {
>             jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
>         }
>         jobListeners.forEach(
>                 jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
>         return jobExecutionResult;
>     } catch (Throwable t) {
>         // get() on the JobExecutionResult Future will throw an ExecutionException. This
>         // behaviour was largely not there in Flink versions before the PipelineExecutor
>         // refactoring so we should strip that exception.
>         Throwable strippedException = ExceptionUtils.stripExecutionException(t);
>         jobListeners.forEach(
>                 jobListener -> {
>                     jobListener.onJobExecuted(null, strippedException);
>                 });
>         ExceptionUtils.rethrowException(strippedException);
>         // never reached, only make javac happy
>         return null;
>     }
> }
> {code}
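
The attached/detached branch quoted above can be reproduced in isolation with
plain java.util.concurrent, which may help when reasoning about why the call
blocks. This is only a sketch and does not use Flink or Beam: the class
AttachedVsDetached, the methods executeAsync and execute, and the status
strings are hypothetical stand-ins for JobClient, the job result future and
DetachedJobExecutionResult. It uses join() where the Flink code uses get(),
purely to avoid checked exceptions in the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names exist in Flink or Beam.
class AttachedVsDetached {

    // Stand-in for executeAsync(streamGraph): submits the "job" on another
    // thread and returns immediately with a future for its result.
    static CompletableFuture<String> executeAsync(long jobMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(jobMillis); // simulated job runtime
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "FINISHED";
        });
    }

    // Stand-in for execute(streamGraph): the 'attached' flag decides whether
    // the caller waits for the job or gets a placeholder result right away.
    static String execute(boolean attached, long jobMillis) {
        CompletableFuture<String> job = executeAsync(jobMillis);
        if (attached) {
            // Blocks the calling thread until the job completes.
            return job.join();
        }
        // Detached: return without waiting; the job keeps running.
        return "DETACHED";
    }

    public static void main(String[] args) {
        System.out.println("detached returned: " + execute(false, 500));
        System.out.println("attached returned: " + execute(true, 500));
    }
}
```

In this sketch the detached call returns before the simulated job has
finished, while the attached call only returns once it has. That matches the
behaviour described in the report when {{execution.attached}} is true.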
--
This message was sent by Atlassian Jira
(v8.3.4#803005)