+1 to the annotation idea, and to having it on processTimer; a rough sketch of how that might read is below. -Tyler
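As a straw-man only: the annotation name (@RequiresStableInput below) and whether it can be placed on a timer callback at all are exactly what is being discussed, so treat every name here (the annotation, BufferedSinkFn, writeToExternalService) as a placeholder rather than settled API. The state/timer plumbing is the existing DoFn API; the two annotations are the new part.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class BufferedSinkFn extends DoFn<KV<String, String>, Void> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  @RequiresStableInput // placeholder: the input element must be stable across retries
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flushTimer) {
    buffer.add(ctx.element().getValue());
    flushTimer.offset(Duration.standardSeconds(30)).setRelative();
  }

  @OnTimer("flush")
  @RequiresStableInput // placeholder: state written in process() is durable before this fires
  public void onFlush(@StateId("buffer") BagState<String> buffer) {
    for (String payload : buffer.read()) {
      writeToExternalService(payload); // non-retractable side effect
    }
    buffer.clear();
  }

  private void writeToExternalService(String payload) {
    // stand-in for a call to an external system
  }
}

The reading for the timer case would be Reuven's: any keyed state written in a previous processElement is guaranteed durable before the timer callback observes it, so the external call never sees state that could later be recomputed differently.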
On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> +1 to the annotation approach. I outlined how implementing this would
> work in the Flink runner in the thread about the exactly-once Kafka sink.
>
> > On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> >
> > Yes - I don't think we should try to make any deterministic guarantees
> > about what is in a bundle. Stability guarantees are per element only.
> >
> > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid> wrote:
> >
> >> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
> >> minimum implementation requirement of a DoFn, and should be where the
> >> processing logic which depends on characteristics of the inputs lives.
> >> It's a good way of signalling the requirements of the Fn and letting
> >> the runner decide.
> >>
> >> I have a minor concern that this may not work as expected for users who
> >> try to batch remote calls in `FinishBundle` - we should make sure we
> >> document that it is explicitly the input elements that will be replayed,
> >> and that bundles and other operational details are still arbitrary.
> >>
> >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid> wrote:
> >>
> >>> I think deterministic here means deterministically replayable, i.e. no
> >>> matter how many times the element is retried, it will always be the same.
> >>>
> >>> I think we should also allow specifying this on processTimer. This would
> >>> mean that any keyed state written in a previous processElement must be
> >>> guaranteed durable before processTimer is called.
> >>>
> >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:
> >>>
> >>>> I strongly agree with this proposal. I think moving away from "just
> >>>> insert a GroupByKey for one of the 3 different reasons you may want it"
> >>>> towards APIs that allow code to express the requirements they have and
> >>>> the runner to choose the best way to meet them is a major step forward
> >>>> in terms of portability.
> >>>>
> >>>> I think "deterministic" may be misleading. The actual contents of the
> >>>> collection aren't deterministic if upstream computations aren't. The
> >>>> property we really need is that once an input may have been observed by
> >>>> the side-effecting code, it will never be observed with a different value.
> >>>>
> >>>> I would propose something like RequiresStableInput, to indicate that the
> >>>> input must be stable as observed by the function. I could also see
> >>>> something hinting at the fact that we don't recompute the input, such as
> >>>> RequiresMemoizedInput or RequiresNoRecomputation.
> >>>>
> >>>> -- Ben
> >>>>
> >>>> P.S. For anyone interested, other uses of GroupByKey that we may want to
> >>>> discuss APIs for would be preventing retry across steps (e.g., preventing
> >>>> fusion) and redistributing inputs across workers.
> >>>>
> >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid>
> >>>> wrote:
> >>>>
> >>>>> This came up again, so I wanted to push it along by proposing a specific
> >>>>> API for Java that could have a derived API in Python. I am writing this
> >>>>> quickly to get something out there, so I welcome suggestions for revision.
> >>>>>
> >>>>> Today a DoFn has a @ProcessElement annotated method with various
> >>>>> automated parameters, but most fundamentally this:
> >>>>>
> >>>>> @ProcessElement
> >>>>> public void process(ProcessContext ctx) {
> >>>>>   ctx.element() // to access the current input element
> >>>>>   ctx.output(something) // to write to default output collection
> >>>>>   ctx.output(tag, something) // to write to other output collections
> >>>>> }
> >>>>>
> >>>>> For some time, we have hoped to unpack the context - it is a
> >>>>> backwards-compatibility pattern made obsolete by the new DoFn design.
> >>>>> So instead it would look like this:
> >>>>>
> >>>>> @ProcessElement
> >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> >>>>>   element.get() // to access the current input element
> >>>>>   mainOutput.output(something) // to write to the default output collection
> >>>>>   other.output(something) // to write to other output collections
> >>>>> }
> >>>>>
> >>>>> I've deliberately left out the undecided syntax for side outputs. But it
> >>>>> would be nice for the tag to be built in to the parameter so it doesn't
> >>>>> have to be passed when calling output().
> >>>>>
> >>>>> One way to extend this to require deterministic input would just be this:
> >>>>>
> >>>>> @ProcessElement
> >>>>> @RequiresDeterministicInput
> >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> >>>>>   element.get() // to access the current input element
> >>>>>   mainOutput.output(something) // to write to the default output collection
> >>>>>   other.output(something) // to write to other output collections
> >>>>> }
> >>>>>
> >>>>> There are really a lot of places where we could put an annotation or
> >>>>> change a type to indicate that the input PCollection should be
> >>>>> well-defined/deterministically-replayable. I don't have a really strong
> >>>>> opinion.
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers
> >>>>> <bchamb...@google.com.invalid> wrote:
> >>>>>
> >>>>>> Allowing an annotation on DoFns that produce deterministic output could
> >>>>>> be added in the future, but doesn't seem like a great option.
> >>>>>>
> >>>>>> 1. It is a correctness issue to assume a DoFn is deterministic and be
> >>>>>> wrong, so we would need to assume all transform outputs are
> >>>>>> non-deterministic unless annotated. Getting this correct is difficult
> >>>>>> (for example, GBK is surprisingly non-deterministic except in specific
> >>>>>> cases).
> >>>>>>
> >>>>>> 2. It is unlikely to be a major performance improvement, given that any
> >>>>>> non-deterministic transform prior to a sink (sinks being the ones most
> >>>>>> likely to require deterministic input) will cause additional work to be
> >>>>>> needed.
> >>>>>>
> >>>>>> Based on this, it seems like the risk of allowing an annotation is high
> >>>>>> while the potential for performance improvements is low. The current
> >>>>>> proposal (not allowing an annotation) makes sense for now, until we can
> >>>>>> demonstrate that the impact on performance is high in cases that could
> >>>>>> be avoided with an annotation (in real-world use).
> >>>>>>
> >>>>>> -- Ben
> >>>>>>
> >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> >>>>>>
> >>>>>> +1 for the general idea of runners handling it over a hard-coded
> >>>>>> implementation strategy.
> >>>>>>
> >>>>>> For the Write transform, I believe you are talking about ApplyShardingKey
> >>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>,
> >>>>>> which introduces non-deterministic behavior when retried?
> >>>>>>
> >>>>>> *Let a DoFn declare (mechanism not important right now) that it "requires
> >>>>>> deterministic input"*
> >>>>>>
> >>>>>> *Each runner will need a way to induce deterministic input - the obvious
> >>>>>> choice being a materialization.*
> >>>>>>
> >>>>>> Does this mean that a runner will always materialize (or whatever the
> >>>>>> strategy is) an input PCollection to this DoFn even though the
> >>>>>> PCollection might have been produced by deterministic transforms? Would
> >>>>>> it make sense to also let DoFns declare if they produce
> >>>>>> non-deterministic output?
> >>>>>>
> >>>>>> -Vikas
> >>>>>>
> >>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> >>>>>>
> >>>>>>> Hey Kenn-
> >>>>>>>
> >>>>>>> This seems important, but I don't have all the context on what the
> >>>>>>> problem is.
> >>>>>>>
> >>>>>>> Can you explain this sentence: "Specifically, there is pseudorandom
> >>>>>>> data generated and once it has been observed and used to produce a
> >>>>>>> side effect, it cannot be regenerated without erroneous results."?
> >>>>>>>
> >>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete example
> >>>>>>> would help?
> >>>>>>>
> >>>>>>> S
> >>>>>>>
> >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Problem:
> >>>>>>>>
> >>>>>>>> I will drop all nuance and say that the `Write` transform as it exists
> >>>>>>>> in the SDK is incorrect until we add some specification and APIs. We
> >>>>>>>> can't keep shipping an SDK with an unsafe transform in it, and IMO
> >>>>>>>> this certainly blocks a stable release.
> >>>>>>>>
> >>>>>>>> Specifically, there is pseudorandom data generated and once it has
> >>>>>>>> been observed and used to produce a side effect, it cannot be
> >>>>>>>> regenerated without erroneous results.
> >>>>>>>>
> >>>>>>>> This generalizes: for some side-effecting user-defined functions, it
> >>>>>>>> is vital that even across retries/replays they have a consistent view
> >>>>>>>> of the contents of their input PCollection, because their effect on
> >>>>>>>> the outside world cannot be retracted if/when they fail and are
> >>>>>>>> retried. Once the runner ensures a consistent view of the input, it is
> >>>>>>>> then their own responsibility to be idempotent.
> >>>>>>>>
> >>>>>>>> Ideally we should specify this requirement for the user-defined
> >>>>>>>> function without imposing any particular implementation strategy on
> >>>>>>>> Beam runners.
> >>>>>>>>
> >>>>>>>> Proposal:
> >>>>>>>>
> >>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that it
> >>>>>>>> "requires deterministic input".
> >>>>>>>>
> >>>>>>>> 2. Each runner will need a way to induce deterministic input - the
> >>>>>>>> obvious choice being a materialization.
> >>>>>>>>
> >>>>>>>> I want to keep the discussion focused, so I'm leaving out any
> >>>>>>>> possibilities of taking this further.
> >>>>>>>>
> >>>>>>>> Regarding performance: today, places that require this tend to
> >>>>>>>> already be paying the cost via GroupByKey / Reshuffle operations,
> >>>>>>>> since that was a simple way to induce determinism in batch Dataflow*
> >>>>>>>> (it doesn't work for most other runners nor for streaming Dataflow).
> >>>>>>>> This change will replace a hard-coded implementation strategy with a
> >>>>>>>> requirement that may be fulfilled in the most efficient way available.
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>> Kenn (w/ lots of consultation with colleagues, especially Ben)
> >>>>>>>>
> >>>>>>>> * There is some overlap with the reshuffle/redistribute discussion
> >>>>>>>> because of this historical situation, but I would like to leave that
> >>>>>>>> broader discussion out of this correctness issue.
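For reference, the hard-coded strategy Kenn mentions - inducing a stable, replayable input by forcing a materialization through a shuffle - looks roughly like the sketch below. The transform and step names (StabilizeInput, PairWithArbitraryKey, ExplodeGroups) are made up, explicit windowing concerns are glossed over, and as Kenn notes this only actually buys stability on some runners (e.g. batch Dataflow). A runner that understood a "requires deterministic input" declaration could insert something equivalent, or use its own checkpointing, only where a DoFn asks for it.

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Made-up name: force a materialization by shuffling on an arbitrary key. */
class StabilizeInput<T> extends PTransform<PCollection<T>, PCollection<T>> {

  @Override
  public PCollection<T> expand(PCollection<T> input) {
    PCollection<KV<Integer, T>> keyed =
        input
            .apply("PairWithArbitraryKey", ParDo.of(new DoFn<T, KV<Integer, T>>() {
              @ProcessElement
              public void process(ProcessContext ctx) {
                // The key is random, which is fine here: it exists only to spread
                // elements across the shuffle that checkpoints them.
                ctx.output(KV.of(ThreadLocalRandom.current().nextInt(256), ctx.element()));
              }
            }))
            .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()));

    return keyed
        .apply(GroupByKey.<Integer, T>create())
        .apply("ExplodeGroups", ParDo.of(new DoFn<KV<Integer, Iterable<T>>, T>() {
          @ProcessElement
          public void process(ProcessContext ctx) {
            // Re-emit the grouped elements; downstream now observes the
            // materialized values rather than recomputed upstream output.
            for (T value : ctx.element().getValue()) {
              ctx.output(value);
            }
          }
        }))
        .setCoder(input.getCoder());
  }
}

Roughly, the contrast with Write's ApplyShardingKey is the point of the thread: there the random key is itself observed by the side-effecting writer downstream, so re-running the key assignment after a failure produces different shards - exactly the pseudorandom data Kenn describes. Here the random key is discarded after the GroupByKey, so its nondeterminism never escapes the materialization.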