Hi Enrico,

There is an old ticket for this: https://jira.apache.org/jira/browse/BEAM-593

It hasn't been prioritized because submitting an application "detached" is
still possible using the Flink CLI or in portable pipelines. That comes with
the drawbacks you described, e.g. inaccurate pipeline status and limited
metrics retrieval.
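For reference, detached submission via the Flink CLI looks roughly like this
(the JobManager address and jar name below are placeholders):

```shell
# Submit in detached mode: the CLI returns right after submission
# instead of blocking until the job finishes.
flink run --detached -m jobmanager-host:8081 my-beam-pipeline.jar
```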
Things have improved since then with respect to async client communication,
with the introduction of RestClusterClient. The tricky part is still making
this work across all deployment scenarios (local/remote). I think there are
two options:

1) Wrap the entire blocking execution of the Flink API, propagate errors to
   the Beam application, and update its status accordingly.

2) Retrieve the JobGraph from the batch/streaming API. Submit the JobGraph
   to remote clusters with the RestClusterClient; spin up a Flink
   MiniCluster in case of local execution.

On this note, for the next Flink version, there is a plan to add support for
submitting jobs asynchronously directly from the ExecutionEnvironment.

Cheers,
Max

On 16.08.19 01:22, enrico canzonieri wrote:
> Hello,
>
> while launching Flink jobs using the PortableRunner I noticed that jobs
> are marked by Beam as RUNNING before they are actually running on Flink.
> The issue doesn't seem specific to the Flink runner, though:
> https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90
>
> I'd assume the TODO in the code refers exactly to this issue. For Flink
> specifically, I guess one of the problems is that jobs are submitted
> using a blocking API from the BeamFlinkRemoteStreamEnvironment
> <https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361>.
> So we essentially never return from the job submission unless the job
> completes (never, for streaming jobs), is cancelled, or fails. A
> possible solution is to set the Flink client to detached mode, return a
> JobSubmissionResult instead of a JobExecutionResult, and then have a
> Future or event loop that tracks the actual job execution and updates
> the job state accordingly.
>
> Playing around with the code, it seems this would be possible, but it
> could require a large code change, possibly including duplicating the
> entire Flink RemoteStreamEnvironment
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java>
> code inside Beam to customize it even more than it already is.
>
> Is there any ticket tracking this already?
>
> Cheers,
> Enrico
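The Future/event-loop tracking described above could be sketched with plain
JDK primitives. Note this is only an illustration of the pattern: the names
DetachedTracker and JobState are hypothetical and not actual Beam or Flink
API classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch: a detached submission returns immediately, while a
// background Future tracks the actual job and updates the visible state.
class DetachedTracker {
    enum JobState { STARTING, RUNNING, DONE, FAILED }

    private final AtomicReference<JobState> state =
        new AtomicReference<>(JobState.STARTING);

    JobState state() { return state.get(); }

    // "Submit" the job: the caller gets a Future right away instead of
    // blocking until completion; completion flips the state asynchronously.
    CompletableFuture<Void> submit(Runnable job) {
        return CompletableFuture.runAsync(() -> {
            state.set(JobState.RUNNING); // the job is actually running now
            job.run();
        }).whenComplete((ok, err) ->
            state.set(err == null ? JobState.DONE : JobState.FAILED));
    }

    public static void main(String[] args) {
        DetachedTracker t = new DetachedTracker();
        CompletableFuture<Void> f = t.submit(() -> { /* the pipeline */ });
        f.join();                      // joined here only for the demo
        System.out.println(t.state()); // prints DONE
    }
}
```

In the real runner, the Runnable would be the blocking Flink submission
(option 1) or a poll loop against the cluster's REST API (option 2), and the
state transitions would feed Beam's job state instead of an enum field.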
