I don't think it really makes sense to do this on Combine. And I agree with you, it doesn't make sense on composites either.
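The annotation-on-@ProcessElement approach favored in the quoted thread, with the requirement owned by the primitive's method rather than by a composite, can be illustrated with a minimal plain-Java sketch. The annotations `ProcessElement` and `RequiresStableInput` here are local stand-ins defined for the sketch, not Beam SDK classes, and `StableInputPlanner` is a hypothetical runner-side check. It only reports whether a runner would need to make the function's input stable (for example by materializing it) before invoking the function.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-ins for the proposed annotations; these are NOT Beam SDK classes.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresStableInput {}

// A side-effecting function that declares its requirement on the processing method itself.
class ExactlyOnceSinkFn {
  @ProcessElement
  @RequiresStableInput
  public void process(String element) {
    // Would write the element to an external system here.
  }
}

// A pure function with no stability requirement.
class UppercaseFn {
  @ProcessElement
  public void process(String element) {}
}

// Hypothetical runner-side check: reads the requirement, the runner picks the mechanism.
class StableInputPlanner {
  // True if the class's @ProcessElement method also carries @RequiresStableInput.
  static boolean requiresStableInput(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        return m.isAnnotationPresent(RequiresStableInput.class);
      }
    }
    return false;
  }

  // Placeholder description of the strategy a runner might choose.
  static String plan(Class<?> fnClass) {
    return requiresStableInput(fnClass)
        ? "materialize input, then run " + fnClass.getSimpleName()
        : "fuse " + fnClass.getSimpleName() + " with upstream";
  }

  public static void main(String[] args) {
    System.out.println(plan(ExactlyOnceSinkFn.class));
    System.out.println(plan(UppercaseFn.class));
  }
}
```

The design point is that the function states a requirement and the runner chooses how to meet it; the plan strings stand in for whatever a given runner does (a checkpoint, a shuffle, or nothing if the input is already stable).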
On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner <sweg...@google.com.invalid> wrote:
> Does requires-stable-input only apply to ParDo transforms?
>
> I don't think it would make sense to annotate a composite, because checkpointing should happen as close to the side-effecting operation as possible, since upstream transforms within a composite could introduce non-determinism. So it's the primitive transform that should own the requirement.
>
> Are there other primitives that should be annotated? 'Combine' is interesting because it is optimized in Dataflow (and perhaps other runners) to partially apply before a GroupByKey.

On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau <taki...@google.com.invalid> wrote:
> +1 to the annotation idea, and to having it on processTimer.
>
> -Tyler

On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> +1 to the annotation approach. I outlined how implementing this would work in the Flink runner in the thread about the exactly-once Kafka Sink.

On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> Yes - I don't think we should try and make any deterministic guarantees about what is in a bundle. Stability guarantees are per element only.

On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid> wrote:
> +1 to the annotation-on-ProcessElement approach. ProcessElement is the minimum implementation requirement of a DoFn, and should be where the processing logic which depends on characteristics of the inputs lies. It's a good way of signalling the requirements of the Fn, and letting the runner decide.
>
> I have a minor concern that this may not work as expected for users that try to batch remote calls in `FinishBundle` - we should make sure we document that it is explicitly the input elements that will be replayed, and bundles and other operational details are still arbitrary.

On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid> wrote:
> I think deterministic here means deterministically replayable, i.e., no matter how many times the element is retried, it will always be the same.
>
> I think we should also allow specifying this on processTimer. This would mean that any keyed state written in a previous processElement must be guaranteed durable before processTimer is called.

On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:
> I strongly agree with this proposal. I think moving away from "just insert a GroupByKey for one of the 3 different reasons you may want it" towards APIs that allow code to express the requirements they have and the runner to choose the best way to meet them is a major step forward in terms of portability.
>
> I think "deterministic" may be misleading. The actual contents of the collection aren't deterministic if upstream computations aren't. The property we really need is that once an input may have been observed by the side-effecting code it will never be observed with a different value.
>
> I would propose something like RequiresStableInput, to indicate that the input must be stable as observed by the function.
> I could also see something hinting at the fact we don't recompute the input, such as RequiresMemoizedInput or RequiresNoRecomputation.
>
> -- Ben
>
> P.S. For anyone interested, other uses of GroupByKey that we may want to discuss APIs for would be preventing retry across steps (e.g., preventing fusion) and redistributing inputs across workers.

On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid> wrote:
> This came up again, so I wanted to push it along by proposing a specific API for Java that could have a derived API in Python. I am writing this quickly to get something out there, so I welcome suggestions for revision.
>
> Today a DoFn has a @ProcessElement annotated method with various automated parameters, but most fundamentally this:
>
>     @ProcessElement
>     public void process(ProcessContext ctx) {
>       ctx.element();              // to access the current input element
>       ctx.output(something);      // to write to the default output collection
>       ctx.output(tag, something); // to write to other output collections
>     }
>
> For some time, we have hoped to unpack the context - it is a backwards-compatibility pattern made obsolete by the new DoFn design. So instead it would look like this:
>
>     @ProcessElement
>     public void process(Element element, MainOutput mainOutput, ...) {
>       element.get();                // to access the current input element
>       mainOutput.output(something); // to write to the default output collection
>       other.output(something);      // to write to other output collection
>     }
>
> I've deliberately left out the undecided syntax for side outputs. But it would be nice for the tag to be built into the parameter so it doesn't have to be used when calling output().
>
> One way to extend this to require deterministic input would just be this:
>
>     @ProcessElement
>     @RequiresDeterministicInput
>     public void process(Element element, MainOutput mainOutput, ...) {
>       element.get();                // to access the current input element
>       mainOutput.output(something); // to write to the default output collection
>       other.output(something);      // to write to other output collection
>     }
>
> There are really a lot of places where we could put an annotation or change a type to indicate that the input PCollection should be well-defined/deterministically-replayable. I don't have a really strong opinion.
>
> Kenn

On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> Allowing an annotation on DoFns that produce deterministic output could be added in the future, but doesn't seem like a great option.
>
> 1. It is a correctness issue to assume a DoFn is deterministic and be wrong, so we would need to assume all transform outputs are non-deterministic unless annotated.
> Getting this correct is difficult (for example, GBK is surprisingly non-deterministic except in specific cases).
>
> 2. It is unlikely to be a major performance improvement, given that any non-deterministic transform prior to a sink (sinks being the transforms most likely to require deterministic input) will cause additional work to be needed.
>
> Based on this, it seems like the risk of allowing an annotation is high while the potential for performance improvements is low. The current proposal (not allowing an annotation) makes sense for now, until we can demonstrate that the impact on performance is high in cases that could be avoided with an annotation (in real-world use).
>
> -- Ben

On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> +1 for the general idea of runners handling it over a hard-coded implementation strategy.
>
> For the Write transform I believe you are talking about ApplyShardingKey <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>, which introduces non-deterministic behavior when retried?
>
> *Let a DoFn declare (mechanism not important right now) that it "requires deterministic input"*
>
> *Each runner will need a way to induce deterministic input - the obvious choice being a materialization.*
>
> Does this mean that a runner will always materialize (or whatever the strategy is) an input PCollection to this DoFn even though the PCollection might have been produced by deterministic transforms? Would it make sense to also let DoFns declare if they produce non-deterministic output?
>
> -Vikas

On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> Hey Kenn-
>
> This seems important, but I don't have all the context on what the problem is.
>
> Can you explain this sentence: "Specifically, there is pseudorandom data generated and once it has been observed and used to produce a side effect, it cannot be regenerated without erroneous results."?
>
> Where is the pseudorandom data coming from? Perhaps a concrete example would help?
>
> S

On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid> wrote:
> Problem:
>
> I will drop all nuance and say that the `Write` transform as it exists in the SDK is incorrect until we add some specification and APIs.
> We can't keep shipping an SDK with an unsafe transform in it, and IMO this certainly blocks a stable release.
>
> Specifically, there is pseudorandom data generated and once it has been observed and used to produce a side effect, it cannot be regenerated without erroneous results.
>
> This generalizes: For some side-effecting user-defined functions, it is vital that even across retries/replays they have a consistent view of the contents of their input PCollection, because their effect on the outside world cannot be retracted if/when they fail and are retried. Once the runner ensures a consistent view of the input, it is then their own responsibility to be idempotent.
>
> Ideally we should specify this requirement for the user-defined function without imposing any particular implementation strategy on Beam runners.
>
> Proposal:
>
> 1. Let a DoFn declare (mechanism not important right now) that it "requires deterministic input".
>
> 2. Each runner will need a way to induce deterministic input - the obvious choice being a materialization.
>
> I want to keep the discussion focused, so I'm leaving out any possibilities of taking this further.
>
> Regarding performance: today, places that require this tend to already be paying the cost via GroupByKey / Reshuffle operations, since that was a simple way to induce determinism in batch Dataflow* (it doesn't work for most other runners, nor for streaming Dataflow). This change will replace a hard-coded implementation strategy with a requirement that may be fulfilled in the most efficient way available.
>
> Thoughts?
>
> Kenn (w/ lots of consult from colleagues, especially Ben)
>
> * There is some overlap with the reshuffle/redistribute discussion because of this historical situation, but I would like to leave that broader discussion out of this correctness issue.
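The `Write` problem described in the original message, and the concrete example Stephen asked for, can be made tangible with a small plain-Java simulation. It is a sketch under stated assumptions, not Beam code: an upstream step assigns shard keys from a non-replayable source, and a sink is idempotent per (shard, element) pair, as the proposal expects of side-effecting functions. `StableInputDemo`, `assignShards`, `write` and `runWithRetry` are hypothetical names, and a counter replaces real pseudorandom sharding only so the result is deterministic. A first attempt performs its side effect and then fails; the retry either recomputes the keys or reuses the materialized first-attempt input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntSupplier;

class StableInputDemo {
  // Upstream step: tags each element with a shard key drawn from a non-replayable source.
  static Map<String, Integer> assignShards(List<String> elements, IntSupplier shardSource) {
    Map<String, Integer> keyed = new LinkedHashMap<>();
    for (String e : elements) {
      keyed.put(e, shardSource.getAsInt());
    }
    return keyed;
  }

  // Side-effecting sink: appends each element to its shard "file" in an external store.
  // Idempotent per (shard, element): rewriting the same pair adds nothing.
  static void write(Map<String, Integer> keyed, Map<Integer, List<String>> store) {
    for (Map.Entry<String, Integer> entry : keyed.entrySet()) {
      List<String> file = store.computeIfAbsent(entry.getValue(), k -> new ArrayList<>());
      if (!file.contains(entry.getKey())) {
        file.add(entry.getKey());
      }
    }
  }

  // Total number of records across all shard files.
  static int totalRecords(Map<Integer, List<String>> store) {
    int n = 0;
    for (List<String> file : store.values()) {
      n += file.size();
    }
    return n;
  }

  // One attempt whose side effect lands before it fails, followed by one retry.
  static int runWithRetry(List<String> elements, boolean stableInput) {
    int[] next = {0};
    // Stand-in for pseudorandom sharding: the counter never repeats a value across attempts.
    IntSupplier shardSource = () -> next[0]++;
    Map<Integer, List<String>> store = new TreeMap<>();

    Map<String, Integer> firstAttempt = assignShards(elements, shardSource);
    write(firstAttempt, store); // external effect is visible, then the attempt fails

    // Stable input: the retry sees the materialized first-attempt keys.
    // Otherwise the upstream step is recomputed and produces different keys.
    Map<String, Integer> retryInput =
        stableInput ? firstAttempt : assignShards(elements, shardSource);
    write(retryInput, store);
    return totalRecords(store);
  }

  public static void main(String[] args) {
    List<String> elements = Arrays.asList("a", "b", "c");
    System.out.println("recomputed input: " + runWithRetry(elements, false) + " records");
    System.out.println("stable input:     " + runWithRetry(elements, true) + " records");
  }
}
```

Running `main` prints 6 records for the recomputed case (every element ends up in two different shard files, even though the sink itself is idempotent) and 3 for the stable case. This mirrors the point that the runner must supply a consistent view of the input before the function's own idempotence can help.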