On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax <[email protected]> wrote:
> On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
>> The question here is whether the ordering is part of the "content" of
>> an iterable.
>
> My initial instinct was to say yes - but maybe it should not be until Beam
> has a first-class notion of sorted values after a GBK?

Yeah, I'm not sure on this either. Interestingly, if we consider
ordering to be important, then the composite GBK + ungroup will be
stable despite its components not being so.

>> >> As I mentioned above, the iterable is semantically [part of] a single
>> >> element. So just to unpack this, to make sure we are talking about the
>> >> same thing, I think you are talking about GBK as implemented via
>> >> GBKO + GABW.
>> >>
>> >> When the output of GABW is required to be stable but the output of GBKO
>> >> is not stable, we don't get stability for free in all cases by
>> >> inserting a GBK; something more is required to make the output of GABW
>> >> stable, in the worst case a full materialization.
>> >>
>> >
>> > Correct. My point is that there are alternative, cheaper ways of doing
>> > this. If GABW stores state in an ordered list, it can simply checkpoint
>> > a marker into that list to ensure that the output is stable.
>>
>> In the presence of non-trivial triggering and/or late data, I'm not so
>> sure this is "easy." For example, a bundle may fail, and more data may
>> come in from upstream (and get appended to the buffer) before it is retried.
>>
>
> That will still work. If the subsequent ParDo has processed the Iterable,
> that means we'll have successfully checkpointed a marker to the list (using
> whatever technique the runner supports). More data coming in will get
> appended after the marker, so we can ensure that the retry still sees the
> same elements in the Iterable.
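A minimal Python sketch of the marker idea, as one might model it. `MarkedBuffer`, `fire`, and `ack` are hypothetical names, not Beam APIs; the sketch assumes the marker is durably committed before the downstream bundle runs and that panes are accumulating.

```python
class MarkedBuffer:
    """Hypothetical per-key GABW state: an append-only list plus a marker.

    The marker records how much of the list the in-flight firing covers.
    Assumption: it is durably committed before the downstream bundle runs.
    """

    def __init__(self):
        self.values = []
        self.marker = None  # None means no firing is currently in flight

    def append(self, value):
        # New or late data always lands after any existing marker.
        self.values.append(value)

    def fire(self):
        # First attempt: checkpoint the current length as the marker.
        # Retry: reuse the committed marker so the Iterable is unchanged.
        if self.marker is None:
            self.marker = len(self.values)
        return list(self.values[: self.marker])

    def ack(self):
        # Downstream succeeded; in accumulating mode the values are kept,
        # only the in-flight marker is cleared.
        self.marker = None


buf = MarkedBuffer()
buf.append("v1")
buf.append("v2")
first_attempt = buf.fire()  # suppose this bundle fails downstream
buf.append("v3")            # more data arrives before the retry
retry = buf.fire()          # still only the elements before the marker
```

Data appended between a failed attempt and its retry is excluded from the retry and only shows up in the firing after `ack`.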

I'm thinking of the following.

1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in
the state. A trigger gets set.
2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
for some reason fails.
3. (k, v3) comes into the GABW and [v3] gets appended to the state.
4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent
downstream.
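The four steps above can be replayed with a small Python simulation. It is a hypothetical model (plain lists, no Beam APIs) of a GABW buffer that has no marker, in accumulating mode. It shows that the firing after the failure emits a different Iterable from the failed attempt.

```python
def run_scenario():
    """Replays steps 1-4 against a GABW buffer that has no marker."""
    state = []     # per-key buffered values (accumulating mode)
    attempts = []  # Iterables handed downstream, one per firing

    # Step 1: (k, v1) and (k, v2) arrive and are buffered; a trigger is set.
    state += ["v1", "v2"]
    # Step 2: the trigger fires; the bundle fails, but we record what
    # was emitted so it can be compared with the later firing.
    attempts.append(list(state))
    # Step 3: (k, v3) arrives before the retry and is appended.
    state.append("v3")
    # Step 4: the trigger fires again and emits the whole buffer.
    attempts.append(list(state))
    return attempts


failed, retried = run_scenario()
```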

It is unclear when a marker would be added to the list. Is it in
step 2, which, despite failing, still results in the modified state
[v1, v2, marker]? (This state modification would have to be committed
before attempting the bundle, in case the "failure" was something like
a VM shutdown.) And only on success is the state modified to be (say
this is accumulating mode) [v1, v2]?

I think it could be done, but it may significantly complicate things.
