Hello,

while launching Flink jobs using the PortableRunner I noticed that jobs are marked by Beam as RUNNING before they are actually running on Flink. The issue doesn't seem to be specific to the Flink runner, though: https://github.com/apache/beam/blob/e5e9cab009ef81045b587b2b582c72cf2522df9b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java#L90
I'd assume the TODO in that code refers exactly to this issue. For Flink specifically, I guess one of the problems is that jobs are submitted through a blocking API from the BeamFlinkRemoteStreamEnvironment <https://github.com/apache/beam/blob/ae83448597f64474c3f5754d7b8e3f6b02347a6b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L361>, so we essentially never return from the job submission unless the job completes (which never happens for streaming jobs), is cancelled, or fails.

A possible solution would be to set the Flink client to detached mode and return a JobSubmissionResult instead of a JobExecutionResult, and then have a Future or event loop that tracks the actual job execution and updates the job state accordingly. Playing around with the code, this seems doable, but it could require a large change, possibly including duplicating the entire Flink RemoteStreamEnvironment <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java> inside Beam so it can be customized even more than it already is.
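To make the idea concrete, here is a very rough sketch of the kind of tracking loop I have in mind. The submitDetached / pollStatus / onStateChange hooks are only placeholders standing in for whatever the detached ClusterClient submission, the job status query, and the Beam JobInvocation state callback actually look like; they are not real API names.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
    import java.util.function.Supplier;

    /** Sketch: submit detached, then poll the cluster for the real job status. */
    public class DetachedSubmissionSketch {

      /** Simplified stand-in for the Beam job states (JobApi.JobState.Enum). */
      enum State { STARTING, RUNNING, DONE, FAILED, CANCELLED }

      /**
       * @param submitDetached submits the job without waiting for completion
       *        (e.g. a detached ClusterClient submission) and returns immediately.
       * @param pollStatus queries the cluster for the current job status.
       * @param onStateChange updates the Beam-side job state.
       */
      static CompletableFuture<Void> submitAndTrack(
          Runnable submitDetached,
          Supplier<State> pollStatus,
          Consumer<State> onStateChange) {

        onStateChange.accept(State.STARTING);
        submitDetached.run(); // returns right away in detached mode

        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Void> terminated = new CompletableFuture<>();

        // Event loop: poll the actual job status and propagate it to Beam,
        // completing the future once the job reaches a terminal state.
        poller.scheduleAtFixedRate(() -> {
          State state = pollStatus.get();
          onStateChange.accept(state);
          if (state == State.DONE || state == State.FAILED || state == State.CANCELLED) {
            terminated.complete(null);
            poller.shutdown();
          }
        }, 0, 1, TimeUnit.SECONDS);

        return terminated;
      }
    }

The polling could of course be replaced by whatever callbacks the detached client exposes; the point is just that the submission returns immediately and the RUNNING transition is driven by the observed job status rather than assumed at submission time.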
Is there any ticket tracking this already?

Cheers,
Enrico