Well the Fn API is still being designed, so this is something we'd have to think about.
On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw < rober...@google.com.invalid> wrote: > On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax <re...@google.com.invalid> > wrote: > > On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw < > > rober...@google.com.invalid> wrote: > > > >> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax <re...@google.com.invalid> > >> wrote: > >> > On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw < > >> >> The question here is whether the ordering is part of the "content" of > >> >> an iterable. > >> > > >> > My initial instinct was to say yes - but maybe it should not be until > >> Beam > >> > has a first-class notion of sorted values after a GBK? > >> > >> Yeah, I'm not sure on this either. Interestingly, if we consider > >> ordering to be important, than the composite gbk + ungroup will be > >> stable despite its components not being so. > >> > >> >> >> As I mention above, the iterable is semantically [part of] a > single > >> >> >> element. So just to unpack this, to make sure we are talking about > >> the > >> >> same > >> >> >> thing, I think you are talking about GBK as implemented via GBKO + > >> GABW. > >> >> >> > >> >> >> When the output of GABW is required to be stable but the output of > >> GBKO > >> >> is > >> >> >> not stable, we don't have stability for free in all cases by > >> inserting a > >> >> >> GBK, but require something more to make the output of GABW > stable, in > >> >> the > >> >> >> worst case a full materialization. > >> >> >> > >> >> > > >> >> > Correct. My point is that there are alternate, cheaper ways of > doing > >> >> this. > >> >> > If GABW stores state in an ordered list, it can simply checkpoint a > >> >> market > >> >> > into that list to ensure that the output is stabl. > >> >> > >> >> In the presence of non-trivial triggering and/or late data, I'm not > so > >> >> sure this is "easy." E.g. A bundle may fail, and more data may come > in > >> >> from upstream (and get appended to the buffer) before it is retried. > >> >> > >> > > >> > That will still work. If the subsequent ParDo has processed the > Iterable, > >> > that means we'll have successfully checkpointed a marker to the list > >> (using > >> > whatever technique the runner supports). More data coming in will get > >> > appended after the marker, so we can ensure that the retry still sees > the > >> > same elements in the Iterable. > >> > >> I'm thinking of the following. > >> > >> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in > >> the state. A trigger gets set. > >> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but > >> for some reason fails. > >> 3. (k, v3) comes into the GABW and [v3] gets appended to the state. > >> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent > >> downstream. > >> > >> > > If you add the annotation specifying stableinput, then we will not do > this. > > Before we send anything downstream, we will add a marker to the list, and > > only forward data downstream once the marker has been checkpointed. This > > adds a bit of cost and latency of course, but the assumption is that > adding > > this annotation will always add some cost. > > I don't think you can checkpoint anything "before sending data > downstream" if its being executed as part of a fused graph, unless we > add special support for this in the Fn API. I suppose the runner could > pre-emptively modify the state of any GABW operations before firing > triggers... > > >> It is unclear when a marker would be added to the list. Is this in > >> step 2 which, despite failing, still result in modified state [v1, v2, > >> marker]? (And this state modification would have to be committed > >> before attempting the bundle, in case the "failure" was something like > >> a VM shutdown.) And only on success the state is modified to be (say > >> this is accumulating mode) [v1, v2]? > >> > >> I think it could be done, but it may significantly complicate things. > >> >