+1 to the annotation approach. I outlined how implementing this would work in the Flink runner in the thread about the exactly-once Kafka sink.
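To make the failure mode Kenn describes below concrete, here is a minimal self-contained simulation (my own sketch, not Beam code — `writeShard`, the counter standing in for a pseudorandom shard key, and the set-backed sink are all hypothetical): if the input to a side-effecting step is recomputed on retry, a non-deterministic upstream value reaches the sink twice with different contents; if the runner materializes the element, the retry replays the same value and an idempotent sink absorbs it.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class StableInputSketch {
    // Simulated external sink: one write per key. It is idempotent only if
    // the retried element carries the same key as the first attempt.
    static final Set<String> sink = new HashSet<>();

    static void writeShard(String shardKey) {
        sink.add(shardKey); // idempotent per key
    }

    public static void main(String[] args) {
        // A counter stands in for a pseudorandom shard-key generator that
        // yields a different value each time the input is recomputed.
        int[] attempts = {0};
        Supplier<String> unstable = () -> "shard-" + attempts[0]++;

        // Without stable input: the retry recomputes the element, so the
        // sink observes two distinct keys for one logical element.
        sink.clear();
        writeShard(unstable.get()); // first attempt
        writeShard(unstable.get()); // retry recomputes the input
        System.out.println("unstable input -> " + sink.size() + " writes");

        // With stable input: the runner materializes the element once; the
        // retry replays the same value and the idempotent sink absorbs it.
        String materialized = unstable.get(); // computed once, then durable
        sink.clear();
        writeShard(materialized); // first attempt
        writeShard(materialized); // retry replays the same element
        System.out.println("stable input -> " + sink.size() + " write(s)");
    }
}
```

Running this prints `unstable input -> 2 writes` then `stable input -> 1 write(s)` — the duplicate, divergent side effect is exactly what "requires stable input" rules out.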
> On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
>
> Yes - I don't think we should try and make any deterministic guarantees
> about what is in a bundle. Stability guarantees are per element only.
>
> On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid> wrote:
>
>> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
>> minimum implementation requirement of a DoFn, and should be where the
>> processing logic that depends on characteristics of the inputs lives.
>> It's a good way of signalling the requirements of the Fn, and letting
>> the runner decide.
>>
>> I have a minor concern that this may not work as expected for users who
>> try to batch remote calls in `FinishBundle` - we should make sure we
>> document that it is explicitly the input elements that will be replayed,
>> and that bundles and other operational details are still arbitrary.
>>
>> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid> wrote:
>>
>>> I think deterministic here means deterministically replayable, i.e. no
>>> matter how many times the element is retried, it will always be the
>>> same.
>>>
>>> I think we should also allow specifying this on processTimer. This
>>> would mean that any keyed state written in a previous processElement
>>> must be guaranteed durable before processTimer is called.
>>>
>>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:
>>>
>>>> I strongly agree with this proposal. I think moving away from "just
>>>> insert a GroupByKey for one of the 3 different reasons you may want
>>>> it" towards APIs that allow code to express the requirements they have
>>>> and the runner to choose the best way to meet them is a major step
>>>> forwards in terms of portability.
>>>>
>>>> I think "deterministic" may be misleading. The actual contents of the
>>>> collection aren't deterministic if upstream computations aren't.
>>>> The property we really need is that once an input may have been
>>>> observed by the side-effecting code it will never be observed with a
>>>> different value.
>>>>
>>>> I would propose something like RequiresStableInput, to indicate that
>>>> the input must be stable as observed by the function. I could also see
>>>> something hinting at the fact that we don't recompute the input, such
>>>> as RequiresMemoizedInput or RequiresNoRecomputation.
>>>>
>>>> -- Ben
>>>>
>>>> P.S. For anyone interested, other uses of GroupByKey that we may want
>>>> to discuss APIs for would be preventing retry across steps (e.g.,
>>>> preventing fusion) and redistributing inputs across workers.
>>>>
>>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid> wrote:
>>>>
>>>>> This came up again, so I wanted to push it along by proposing a
>>>>> specific API for Java that could have a derived API in Python. I am
>>>>> writing this quickly to get something out there, so I welcome
>>>>> suggestions for revision.
>>>>>
>>>>> Today a DoFn has a @ProcessElement annotated method with various
>>>>> automated parameters, but most fundamentally this:
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(ProcessContext ctx) {
>>>>>     ctx.element() // to access the current input element
>>>>>     ctx.output(something) // to write to default output collection
>>>>>     ctx.output(tag, something) // to write to other output collections
>>>>>   }
>>>>>
>>>>> For some time, we have hoped to unpack the context - it is a
>>>>> backwards-compatibility pattern made obsolete by the new DoFn design.
>>>>> So instead it would look like this:
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(Element element, MainOutput mainOutput, ...) {
>>>>>     element.get() // to access the current input element
>>>>>     mainOutput.output(something) // to write to the default output collection
>>>>>     other.output(something) // to write to other output collection
>>>>>   }
>>>>>
>>>>> I've deliberately left out the undecided syntax for side outputs. But
>>>>> it would be nice for the tag to be built in to the parameter so it
>>>>> doesn't have to be used when calling output().
>>>>>
>>>>> One way to enhance this to deterministic input would just be this:
>>>>>
>>>>>   @ProcessElement
>>>>>   @RequiresDeterministicInput
>>>>>   public void process(Element element, MainOutput mainOutput, ...) {
>>>>>     element.get() // to access the current input element
>>>>>     mainOutput.output(something) // to write to the default output collection
>>>>>     other.output(something) // to write to other output collection
>>>>>   }
>>>>>
>>>>> There are really a lot of places where we could put an annotation or
>>>>> change a type to indicate that the input PCollection should be
>>>>> well-defined/deterministically-replayable. I don't have a really
>>>>> strong opinion.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
>>>>>
>>>>>> Allowing an annotation on DoFns that produce deterministic output
>>>>>> could be added in the future, but doesn't seem like a great option.
>>>>>>
>>>>>> 1. It is a correctness issue to assume a DoFn is deterministic and
>>>>>> be wrong, so we would need to assume all transform outputs are
>>>>>> non-deterministic unless annotated. Getting this correct is
>>>>>> difficult (for example, GBK is surprisingly non-deterministic except
>>>>>> in specific cases).
>>>>>>
>>>>>> 2. It is unlikely to be a major performance improvement, given that
>>>>>> any non-deterministic transform prior to a sink (which are most
>>>>>> likely to require deterministic input) will cause additional work to
>>>>>> be needed.
>>>>>>
>>>>>> Based on this, it seems like the risk of allowing an annotation is
>>>>>> high while the potential for performance improvements is low. The
>>>>>> current proposal (not allowing an annotation) makes sense for now,
>>>>>> until we can demonstrate that the impact on performance is high in
>>>>>> cases that could be avoided with an annotation (in real-world use).
>>>>>>
>>>>>> -- Ben
>>>>>>
>>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
>>>>>>
>>>>>> +1 for the general idea of runners handling it over a hard-coded
>>>>>> implementation strategy.
>>>>>>
>>>>>> For the Write transform I believe you are talking about ApplyShardingKey
>>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
>>>>>> which introduces non-deterministic behavior when retried?
>>>>>>
>>>>>> *Let a DoFn declare (mechanism not important right now) that it
>>>>>> "requires deterministic input"*
>>>>>>
>>>>>> *Each runner will need a way to induce deterministic input - the
>>>>>> obvious choice being a materialization.*
>>>>>>
>>>>>> Does this mean that a runner will always materialize (or whatever
>>>>>> the strategy is) an input PCollection to this DoFn even though the
>>>>>> PCollection might have been produced by deterministic transforms?
>>>>>> Would it make sense to also let DoFns declare if they produce
>>>>>> non-deterministic output?
>>>>>> -Vikas
>>>>>>
>>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
>>>>>>
>>>>>>> Hey Kenn-
>>>>>>>
>>>>>>> this seems important, but I don't have all the context on what the
>>>>>>> problem is.
>>>>>>>
>>>>>>> Can you explain this sentence: "Specifically, there is pseudorandom
>>>>>>> data generated and once it has been observed and used to produce a
>>>>>>> side effect, it cannot be regenerated without erroneous results."?
>>>>>>>
>>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete
>>>>>>> example would help?
>>>>>>>
>>>>>>> S
>>>>>>>
>>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid> wrote:
>>>>>>>
>>>>>>>> Problem:
>>>>>>>>
>>>>>>>> I will drop all nuance and say that the `Write` transform as it
>>>>>>>> exists in the SDK is incorrect until we add some specification and
>>>>>>>> APIs. We can't keep shipping an SDK with an unsafe transform in
>>>>>>>> it, and IMO this certainly blocks a stable release.
>>>>>>>>
>>>>>>>> Specifically, there is pseudorandom data generated and once it has
>>>>>>>> been observed and used to produce a side effect, it cannot be
>>>>>>>> regenerated without erroneous results.
>>>>>>>>
>>>>>>>> This generalizes: for some side-effecting user-defined functions,
>>>>>>>> it is vital that even across retries/replays they have a
>>>>>>>> consistent view of the contents of their input PCollection,
>>>>>>>> because their effect on the outside world cannot be retracted
>>>>>>>> if/when they fail and are retried. Once the runner ensures a
>>>>>>>> consistent view of the input, it is then their own responsibility
>>>>>>>> to be idempotent.
>>>>>>>>
>>>>>>>> Ideally we should specify this requirement for the user-defined
>>>>>>>> function without imposing any particular implementation strategy
>>>>>>>> on Beam runners.
>>>>>>>>
>>>>>>>> Proposal:
>>>>>>>>
>>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that it
>>>>>>>> "requires deterministic input".
>>>>>>>>
>>>>>>>> 2. Each runner will need a way to induce deterministic input - the
>>>>>>>> obvious choice being a materialization.
>>>>>>>>
>>>>>>>> I want to keep the discussion focused, so I'm leaving out any
>>>>>>>> possibilities of taking this further.
>>>>>>>>
>>>>>>>> Regarding performance: today, places that require this tend to be
>>>>>>>> already paying the cost via GroupByKey / Reshuffle operations,
>>>>>>>> since that was a simple way to induce determinism in batch
>>>>>>>> Dataflow* (doesn't work for most other runners nor for streaming
>>>>>>>> Dataflow). This change will replace a hard-coded implementation
>>>>>>>> strategy with a requirement that may be fulfilled in the most
>>>>>>>> efficient way available.
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
>>>>>>>>
>>>>>>>> * There is some overlap with the reshuffle/redistribute discussion
>>>>>>>> because of this historical situation, but I would like to leave
>>>>>>>> that broader discussion out of this correctness issue.