On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax <re...@google.com.invalid> wrote:
> On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
>> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> > On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
>> >> The question here is whether the ordering is part of the "content" of
>> >> an iterable.
>> >
>> > My initial instinct was to say yes - but maybe it should not be until
>> Beam
>> > has a first-class notion of sorted values after a GBK?
>>
>> Yeah, I'm not sure on this either. Interestingly, if we consider
>> ordering to be important, than the composite gbk + ungroup will be
>> stable despite its components not being so.
>>
>> >> >> As I mention above, the iterable is semantically [part of] a single
>> >> >> element. So just to unpack this, to make sure we are talking about
>> the
>> >> same
>> >> >> thing, I think you are talking about GBK as implemented via GBKO +
>> GABW.
>> >> >>
>> >> >> When the output of GABW is required to be stable but the output of
>> GBKO
>> >> is
>> >> >> not stable, we don't have stability for free in all cases by
>> inserting a
>> >> >> GBK, but require something more to make the output of GABW stable, in
>> >> the
>> >> >> worst case a full materialization.
>> >> >>
>> >> >
>> >> > Correct. My point is that there are alternate, cheaper ways of doing
>> >> this.
>> >> > If GABW stores state in an ordered list, it can simply checkpoint a
>> >> market
>> >> > into that list to ensure that the output is stabl.
>> >>
>> >> In the presence of non-trivial triggering and/or late data, I'm not so
>> >> sure this is "easy." E.g. A bundle may fail, and more data may come in
>> >> from upstream (and get appended to the buffer) before it is retried.
>> >>
>> >
>> > That will still work. If the subsequent ParDo has processed the Iterable,
>> > that means we'll have successfully checkpointed a marker to the list
>> (using
>> > whatever technique the runner supports). More data coming in will get
>> > appended after the marker, so we can ensure that the retry still sees the
>> > same elements in the Iterable.
>>
>> I'm thinking of the following.
>>
>> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in
>> the state. A trigger gets set.
>> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
>> for some reason fails.
>> 3. (k, v3) comes into the GABW and [v3] gets appended to the state.
>> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent
>> downstream.
>>
>>
> If you add the annotation specifying stableinput, then we will not do this.
> Before we send anything downstream, we will add a marker to the list, and
> only forward data downstream once the marker has been checkpointed. This
> adds a bit of cost and latency of course, but the assumption is that adding
> this annotation will always add some cost.

I don't think you can checkpoint anything "before sending data
downstream" if its being executed as part of a fused graph, unless we
add special support for this in the Fn API. I suppose the runner could
pre-emptively modify the state of any GABW operations before firing
triggers...

>> It is unclear when a marker would be added to the list. Is this in
>> step 2 which, despite failing, still result in modified state [v1, v2,
>> marker]? (And this state modification would have to be committed
>> before attempting the bundle, in case the "failure" was something like
>> a VM shutdown.) And only on success the state is modified to be (say
>> this is accumulating mode) [v1, v2]?
>>
>> I think it could be done, but it may significantly complicate things.
>>

Reply via email to