This gets into the gory details of the Flink client API and there is plenty
room for improvement. There is relevant discussion happening on the Flink
dev list right now.

But before dwelling into that and how a workaround could look like in Beam,
do you really want to rely on the status from the Beam job server? In our
deployment we only use the job server to submit the pipeline and then
terminate it. Any monitoring is based on Flink itself.

You can obtain the job status through the Flink REST API and the metrics
through Flink as well. To assert that a job was successfully launched, it
isn't sufficient to check that it is RUNNING in any case. There is still
the possibility that it never really starts due to resource issues,
recovery from savepoint not completing and so on. To get an accurate
"LAUNCHED OK" signal, it is necessary to check Flink metrics.

Cheers,
Thomas



On Fri, Aug 16, 2019 at 3:03 AM Maximilian Michels <[email protected]> wrote:

> Hi Enrico,
>
> There is an old ticket for this:
> https://jira.apache.org/jira/browse/BEAM-593 It hasn't been prioritized
> because submitting application "detached" is still possible using the
> Flink CLI or in portable pipelines. It comes with some drawbacks that
> you explained, e.g. inaccurate pipeline status, metrics retrieval.
>
> Things have improved since with respect to async client communication
> with the introduction of RestClusterClient. The tricky part is still to
> make this work across all deployment scenarios (local/remote). I think
> there are two options:
>
> 1) Wrap the entire blocking execution of the Flink API and simply
> propagate errors to the Beam application and update its status
>
> 2) Retrieve the JobGraph from the batch/streaming API. Submit the
> JobGraph with the RestClusterClient to remote clusters. Spin up a Flink
> MiniCluster in case of local execution.
>
> On this note, for the next Flink version, there is a plan to add support
> for submitting jobs async directly from the ExecutionEnvironment.
>
> Cheers,
> Max
>
> On 16.08.19 01:22, enrico canzonieri wrote:
> > Hello,
> >
> > while launching Flink jobs using the PortableRunner I noticed that jobs
> > are marked by Beam as state RUNNING before actually running on Flink.
> > The issue doesn't seem specific to the Flink Runner though:
> >
> https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90
> >
> > I'd assume the TODO in the code is referring exactly to this issue. For
> > Flink specifically I guess that one of the problems is that jobs are
> > submitted using a blocking API from the BeamFlinkRemoteStreamEnvironment
> > <
> https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361
> >.
> > So we essentially never return from the job submission, unless the job
> > is completed (never for streaming) or it's cancelled or failed. A
> > possible solution to this is to set the Flink client as detached and
> > return a JobSubmissionResult instead of a JobExecutionResult and then
> > have a Future or event loop that tracks the actual job execution and
> > changes the job state accordingly.
> >
> > Playing around with the code it seems that this would be possible but it
> > could require a large code change, including possibly the duplication of
> > the entire Flink RemoteStreamEnvironment
> > <
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
> >
> > code inside Beam to customize it even more than it is already.
> >
> > Is there any ticket tracking this already?
> >
> > Cheers,
> > Enrico
>

Reply via email to