Hi Sam,
Good observation. Looks like we should fix that.
Looking at InMemoryJobService, it appears that the state can only be retrieved
by the client once the job is running and has a job/invocation id associated
with it. Indeed, any messages emitted before that point could be lost.
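To make the race concrete, here is a minimal Java sketch. StateStream and
RaceDemo are hypothetical stand-ins, not Beam classes, and the sketch assumes
for illustration a stream that forwards states only to listeners that are
already registered and keeps no history (not necessarily how
InMemoryJobService is implemented). A client that can only subscribe once it
has the id from Run() misses the early transitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified state stream: it forwards each state only to
// listeners that are registered at that moment and keeps no history.
class StateStream {
  private final List<Consumer<String>> listeners = new ArrayList<>();

  void subscribe(Consumer<String> listener) {
    listeners.add(listener);
  }

  void publish(String state) {
    for (Consumer<String> l : listeners) {
      l.accept(state);
    }
  }
}

class RaceDemo {
  // Simulates a client that can only subscribe after Run() returned the job id.
  static List<String> observedStates() {
    StateStream stream = new StateStream();
    List<String> seen = new ArrayList<>();
    // The job starts as part of Run(); these transitions fire before the
    // client has an id to subscribe with, so nobody receives them.
    stream.publish("STARTING");
    stream.publish("RUNNING");
    // The client now reads the id from the RunJobResponse and subscribes.
    stream.subscribe(seen::add);
    stream.publish("DONE");
    return seen;
  }
}
```

Here the client ends up seeing only DONE; STARTING and RUNNING are gone.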
For Flink, the JobId is generated here:
https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64
I don't see any benefit in having two separate IDs, as the IDs are already
scoped by the preparation and invocation phases.
- Would it be possible to just pass the preparation id as the invocation id at
JobInvoker#invoke(..)?
- Alternatively, we could add a prepare phase to JobInvoker that obtains the
job id for the invocation before we start the job.
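A rough Java sketch of the first option. JobServiceSketch, prepare(),
getStateStream() and run() are hypothetical names chosen for illustration;
they do not mirror the actual JobService or JobInvoker signatures. The point
it shows is that if run() reuses the preparation id as the invocation id, a
client can open the state stream right after Prepare() and before Run(), and
therefore see every transition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified job service illustrating option 1: the
// preparation id doubles as the invocation id.
class JobServiceSketch {
  // Listeners per id; the entry is created at prepare time, before the job runs.
  private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();
  private int counter = 0;

  String prepare() {
    String preparationId = "prep-" + (counter++);
    listeners.put(preparationId, new ArrayList<>());
    return preparationId;
  }

  // Clients may subscribe with the preparation id before calling run().
  void getStateStream(String id, Consumer<String> listener) {
    List<Consumer<String>> ls = listeners.get(id);
    if (ls == null) {
      throw new IllegalArgumentException("Unknown id: " + id);
    }
    ls.add(listener);
  }

  // Reuses the preparation id as the invocation id instead of
  // generating a fresh one, then emits the job's state transitions.
  String run(String preparationId) {
    String invocationId = preparationId;
    publish(invocationId, "STARTING");
    publish(invocationId, "RUNNING");
    publish(invocationId, "DONE");
    return invocationId;
  }

  private void publish(String id, String state) {
    for (Consumer<String> l : listeners.get(id)) {
      l.accept(state);
    }
  }
}
```

With this shape the client calls prepare(), opens the stream with the returned
id, and only then calls run(). The second option would instead keep separate
ids but add a step that hands out the invocation id before the job starts.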
Thanks,
Max
On 14.01.19 12:39, Sam Rohde wrote:
Hello all,
While going through the codebase, I noticed a problem with the Beam JobService.
In particular, the API makes it possible for a client to never see some
messages or states via Get(State|Message)Stream. This is because the
Get(State|Message)Stream calls require the job id, which can only be
obtained from the RunJobResponse. But to see all messages/states, the
streams need to be opened before the job starts.
This is fine in Dataflow, where preparation_id == job_id, but that is not the
case in Flink. What do you all think of this? Am I misunderstanding something?
Thanks,
Sam