On Tue, Jan 15, 2019 at 1:19 AM Ankur Goenka <goe...@google.com> wrote:
>
> Thanks Sam for bringing this to the list.
>
> As preparation_ids are not reusable, having preparation_id and job_id be the
> same makes sense to me for Flink.

I think we should change the protocol and only have one kind of ID. As well
as solving the problem at hand, it also simplifies the API.
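
To make the single-ID idea concrete, here is a rough sketch. It is a toy
in-memory model, not the actual Beam JobService: the class, the method names,
and the state strings are all made up for illustration. The only point is
that prepare hands out the one ID and run reuses it, so a client knows which
ID to watch before the job starts.

```python
import uuid


class SingleIdJobService:
    """Toy model (hypothetical, not Beam code): one ID for prepare and run."""

    def __init__(self):
        # job_id -> list of illustrative state names recorded so far
        self._states = {}

    def prepare(self, pipeline):
        # The ID is minted once, at prepare time.
        job_id = "job-" + uuid.uuid4().hex
        self._states[job_id] = ["STOPPED"]
        return job_id

    def run(self, job_id):
        # No second ID is created; the caller's ID stays valid.
        self._states[job_id].extend(["STARTING", "RUNNING"])
        return job_id

    def state_history(self, job_id):
        return list(self._states[job_id])


svc = SingleIdJobService()
job_id = svc.prepare({"name": "example-pipeline"})
returned_id = svc.run(job_id)
```

With this shape the ID returned by run carries no new information, which is
why the response could drop it or simply echo the prepared one.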

> Another option is to have a subscription for all states/messages on the 
> JobServer.

The problem is that this forces the job service to remember every log message
ever logged, in case someone requests them at some future date.
Best to have a way to register a listener earlier.
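
A small sketch of what I mean, again as a toy with made-up names rather than
the real service: if the ID is known at prepare time, a listener can be
attached before run, and the service only has to fan messages out to
whoever is currently subscribed instead of storing them indefinitely.

```python
class ToyJobService:
    """Hypothetical fan-out service: messages are delivered, never stored."""

    def __init__(self):
        # job_id -> list of callbacks registered for that job
        self._listeners = {}

    def prepare(self, job_id):
        self._listeners[job_id] = []

    def subscribe(self, job_id, callback):
        self._listeners[job_id].append(callback)

    def run(self, job_id, messages):
        # Deliver each message to the listeners present at that moment.
        for message in messages:
            for callback in list(self._listeners[job_id]):
                callback(message)


early, late = [], []
svc = ToyJobService()
svc.prepare("job-1")
svc.subscribe("job-1", early.append)   # registered before the job starts
svc.run("job-1", ["starting", "running"])
svc.subscribe("job-1", late.append)    # registered too late, sees nothing
```

The early listener receives both messages and the late one receives none,
which is the same trade-off as starting "docker events" before the container.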

> This will be similar to "docker". As the container id is created after the
> container creation, the only way to get the container creation event is to
> start "docker events" before starting a container.
>
> On Mon, Jan 14, 2019 at 11:13 AM Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Sam,
>>
>> Good observation. Looks like we should fix that.
>>
>> Looking at InMemoryJobService, it appears that the state can only be
>> retrieved by the client once the job is running with a job/invocation id
>> associated. Indeed, any messages before that point could be lost.
>>
>> For Flink the JobId is generated here:
>> https://github.com/apache/beam/blob/3db71dd9f6f32684903c54b15a5368991cd41f36/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java#L64
>>
>> I don't see any benefit of having two separate IDs, as the IDs are already
>> scoped by preparation and invocation phase.
>>
>> - Would it be possible to just pass the preparation id as the invocation id
>>   at JobInvoker#invoke(..)?
>>
>> - Alternatively, we could have an additional prepare phase for JobInvoker to
>>   get the job id for the invocation, before we start the job.
>>
>> Thanks,
>> Max
>>
>> On 14.01.19 12:39, Sam Rohde wrote:
>> > Hello all,
>> >
>> > While going through the codebase I noticed a problem with the Beam
>> > JobService. In particular, the API allows for the possibility of never
>> > seeing some messages or states with Get(State|Message)Stream. This is
>> > because the Get(State|Message)Stream calls need to have the job id, which
>> > can only be obtained from the RunJobResponse. But in order to see all
>> > messages/states the streams need to be opened before the job starts.
>> >
>> > This is fine in Dataflow as the preparation_id == job_id, but this is not
>> > true in Flink. What do you all think of this? Am I misunderstanding
>> > something?
>> >
>> > Thanks,
>> > Sam
>> >
