Hi Enrico,

There is an old ticket for this:
https://jira.apache.org/jira/browse/BEAM-593. It hasn't been prioritized
because submitting an application "detached" is still possible using the
Flink CLI or in portable pipelines. That approach comes with the
drawbacks you described, e.g. an inaccurate pipeline status and problems
with metrics retrieval.

Things have improved since then with respect to async client
communication, with the introduction of the RestClusterClient. The
tricky part is still making this work across all deployment scenarios
(local/remote). I think there are two options:

1) Wrap the entire blocking execution of the Flink API, simply
propagating errors to the Beam application and updating its status

2) Retrieve the JobGraph from the batch/streaming API, submit it to
remote clusters with the RestClusterClient, and spin up a Flink
MiniCluster for local execution (a rough sketch follows below).
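
For 2), here is a rough sketch of what the two submission paths could
look like. It assumes the Flink 1.8 client API, where
RestClusterClient#submitJob(JobGraph) and MiniCluster#submitJob return
futures that complete on job acceptance rather than job completion; the
exact signatures may differ between Flink versions, and the cluster id
is an arbitrary placeholder:

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobSubmissionResult;
    import org.apache.flink.client.program.rest.RestClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DetachedSubmissionSketch {

      // Remote case: translate the pipeline into a JobGraph and hand it
      // to the REST client; the future completes when the job is accepted.
      static CompletableFuture<JobSubmissionResult> submitRemote(
          StreamExecutionEnvironment env, String host, int port) throws Exception {
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Configuration config = new Configuration();
        config.setString(RestOptions.ADDRESS, host);
        config.setInteger(RestOptions.PORT, port);
        // "beam" is just a placeholder cluster id for this sketch.
        RestClusterClient<String> client = new RestClusterClient<>(config, "beam");
        return client.submitJob(jobGraph);
      }

      // Local case: spin up a MiniCluster and submit the same JobGraph.
      static CompletableFuture<JobSubmissionResult> submitLocal(
          StreamExecutionEnvironment env, int parallelism) throws Exception {
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        MiniCluster miniCluster = new MiniCluster(
            new MiniClusterConfiguration.Builder()
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(parallelism)
                .build());
        miniCluster.start();
        return miniCluster.submitJob(jobGraph);
      }
    }

Since both paths return a future, Beam could update the job status
independently of job completion. This only covers the streaming path;
the batch path would need the equivalent Plan-to-JobGraph translation.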

On this note, for the next Flink version, there is a plan to add support
for submitting jobs asynchronously directly from the ExecutionEnvironment.

Cheers,
Max

On 16.08.19 01:22, enrico canzonieri wrote:
> Hello,
> 
> While launching Flink jobs using the PortableRunner, I noticed that jobs
> are marked by Beam as RUNNING before they are actually running on Flink.
> The issue doesn't seem specific to the Flink Runner, though:
> https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90
> 
> I'd assume the TODO in the code refers exactly to this issue. For Flink
> specifically, I guess one of the problems is that jobs are submitted
> using a blocking API from the BeamFlinkRemoteStreamEnvironment
> <https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361>.
> So we essentially never return from the job submission unless the job
> completes (never, for streaming), is cancelled, or fails. A possible
> solution is to set the Flink client to detached mode, return a
> JobSubmissionResult instead of a JobExecutionResult, and then have a
> Future or event loop that tracks the actual job execution and updates
> the job state accordingly (sketched below).
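> 
> To make that concrete, a minimal sketch of the tracking future, assuming
> Flink 1.8's async RestClusterClient methods submitJob(JobGraph) and
> requestJobResult(JobID); markState is a hypothetical hook into Beam's
> JobInvocation state, not an existing method:
> 
>     JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>     client.submitJob(jobGraph)
>         .thenCompose(submission -> {
>           // Only now has the cluster accepted the job, i.e. RUNNING.
>           markState(JobState.Enum.RUNNING);
>           // Completes once the job reaches a globally terminal state.
>           return client.requestJobResult(submission.getJobID());
>         })
>         .whenComplete((jobResult, error) -> {
>           if (error != null || !jobResult.isSuccess()) {
>             markState(JobState.Enum.FAILED);
>           } else {
>             markState(JobState.Enum.DONE);
>           }
>         });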
> 
> Playing around with the code, it seems this would be possible, but it
> could require a large code change, possibly including duplicating the
> entire Flink RemoteStreamEnvironment
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java>
> code inside Beam to customize it even further than it already is.
> 
> Is there any ticket tracking this already?
> 
> Cheers,
> Enrico
