I think "deterministic" here means deterministically replayable, i.e., no
matter how many times the element is retried, it will always be the same.

I think we should also allow specifying this on processTimer. This would
mean that any keyed state written in a previous processElement must be
guaranteed durable before processTimer is called.
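
To make that concrete, here is a rough sketch of how the same marker might sit
on both callbacks and how a runner might discover it. Everything in it is an
assumption for illustration: the annotation is a stand-in defined right here
(borrowing the RequiresStableInput name suggested below), and ExampleFn and
StableInputInspector are hypothetical classes, not part of the SDK.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: how a runner might find callbacks that declare the requirement.
class StableInputInspector {
  // Returns the sorted names of declared methods that carry the marker.
  static List<String> methodsRequiringStableInput(Class<?> fnClass) {
    List<String> names = new ArrayList<>();
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(RequiresStableInput.class)) {
        names.add(m.getName());
      }
    }
    Collections.sort(names);
    return names;
  }

  public static void main(String[] args) {
    System.out.println(methodsRequiringStableInput(ExampleFn.class));
  }
}

// Stand-in marker defined for this sketch only (assumed name, assumed shape).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresStableInput {}

// Hypothetical user function with an element callback and a timer callback.
class ExampleFn {
  @RequiresStableInput
  public void processElement(String element) {
    // Side-effecting work on the element would go here.
  }

  @RequiresStableInput
  public void processTimer(long timestampMillis) {
    // Would read keyed state written by processElement, so that state
    // has to be durable before this callback runs.
  }

  public void finishBundle() {
    // Declares no stability requirement.
  }
}
```

A runner that sees the marker on processTimer would then know it has to make
the keyed state durable before firing the timer, however it chooses to do so.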


On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:

> I strongly agree with this proposal. I think moving away from "just insert
> a GroupByKey for one of the 3 different reasons you may want it" towards
> APIs that allow code to express the requirements they have and the runner
> to choose the best way to meet this is a major step forwards in terms of
> portability.
>
> I think "deterministic" may be misleading. The actual contents of the
> collection aren't deterministic if upstream computations aren't. The
> property we really need is that once an input may have been observed by the
> side-effecting code it will never be observed with a different value.
>
> I would propose something like RequiresStableInput, to indicate that the input
> must be stable as observed by the function. I could also see something
> hinting at the fact we don't recompute the input, such as
> RequiresMemoizedInput or RequiresNoRecomputation.
>
> -- Ben
>
> P.S. For anyone interested, other uses of GroupByKey that we may want to
> discuss APIs for would be preventing retry across steps (e.g., preventing
> fusion) and redistributing inputs across workers.
>
> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > This came up again, so I wanted to push it along by proposing a specific
> > API for Java that could have a derived API in Python. I am writing this
> > quickly to get something out there, so I welcome suggestions for
> > revision.
> >
> > Today a DoFn has a @ProcessElement annotated method with various automated
> > parameters, but most fundamentally this:
> >
> > @ProcessElement
> > public void process(ProcessContext ctx) {
> >   ctx.element() // to access the current input element
> >   ctx.output(something) // to write to default output collection
> >   ctx.output(tag, something) // to write to other output collections
> > }
> >
> > For some time, we have hoped to unpack the context - it is a
> > backwards-compatibility pattern made obsolete by the new DoFn design. So
> > instead it would look like this:
> >
> > @ProcessElement
> > public void process(Element element, MainOutput mainOutput, ...) {
> >   element.get() // to access the current input element
> >   mainOutput.output(something) // to write to the default output collection
> >   other.output(something) // to write to other output collection
> > }
> >
> > I've deliberately left out the undecided syntax for side outputs. But it
> > would be nice for the tag to be built in to the parameter so it doesn't
> > have to be used when calling output().
> >
> > One way to extend this to require deterministic input would be just this:
> >
> > @ProcessElement
> > @RequiresDeterministicInput
> > public void process(Element element, MainOutput mainOutput, ...) {
> >   element.get() // to access the current input element
> >   mainOutput.output(something) // to write to the default output collection
> >   other.output(something) // to write to other output collection
> > }
> >
> > There are really a lot of places where we could put an annotation or change
> > a type to indicate that the input PCollection should be
> > well-defined/deterministically-replayable. I don't have a really strong
> > opinion.
> >
> > Kenn
> >
> > On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid>
> > wrote:
> >
> > > Allowing an annotation on DoFns that produce deterministic output could be
> > > added in the future, but doesn't seem like a great option.
> > >
> > > 1. It is a correctness issue to assume a DoFn is deterministic and be
> > > wrong, so we would need to assume all transform outputs are
> > > non-deterministic unless annotated. Getting this correct is difficult (for
> > > example, GBK is surprisingly non-deterministic except in specific cases).
> > >
> > > 2. It is unlikely to be a major performance improvement, given that any
> > > non-deterministic transform prior to a sink (which is most likely to
> > > require deterministic input) will cause additional work to be needed.
> > >
> > > Based on this, it seems like the risk of allowing an annotation is high
> > > while the potential for performance improvements is low. The current
> > > proposal (not allowing an annotation) makes sense for now, until we can
> > > demonstrate that the impact on performance is high in cases that could be
> > > avoided with an annotation (in real-world use).
> > >
> > > -- Ben
> > >
> > > On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > >
> > > +1 for the general idea of runners handling it over hard-coded
> > > implementation strategy.
> > >
> > > For the Write transform I believe you are talking about ApplyShardingKey
> > > <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > which introduces non-deterministic behavior when retried?
> > >
> > >
> > > *Let a DoFn declare (mechanism not important right now) that it
> > > "requires deterministic input"*
> > >
> > > *Each runner will need a way to induce deterministic input - the
> > > obvious choice being a materialization.*
> > >
> > > Does this mean that a runner will always materialize (or whatever the
> > > strategy is) an input PCollection to this DoFn even though the PCollection
> > > might have been produced by deterministic transforms? Would it make sense
> > > to also let DoFns declare if they produce non-deterministic output?
> > >
> > > -Vikas
> > >
> > >
> > > On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > >
> > > > Hey Kenn-
> > > >
> > > > this seems important, but I don't have all the context on what the
> > > > problem is.
> > > >
> > > > Can you explain this sentence "Specifically, there is pseudorandom data
> > > > generated and once it has been observed and used to produce a side
> > > > effect, it cannot be regenerated without erroneous results." ?
> > > >
> > > > Where is the pseudorandom data coming from? Perhaps a concrete example
> > > > would help?
> > > >
> > > > S
> > > >
> > > >
> > > > On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid>
> > > > wrote:
> > > >
> > > > > Problem:
> > > > >
> > > > > I will drop all nuance and say that the `Write` transform as it exists
> > > > > in the SDK is incorrect until we add some specification and APIs. We
> > > > > can't keep shipping an SDK with an unsafe transform in it, and IMO this
> > > > > certainly blocks a stable release.
> > > > >
> > > > > Specifically, there is pseudorandom data generated and once it has been
> > > > > observed and used to produce a side effect, it cannot be regenerated
> > > > > without erroneous results.
> > > > >
> > > > > This generalizes: For some side-effecting user-defined functions, it is
> > > > > vital that even across retries/replays they have a consistent view of
> > > > > the contents of their input PCollection, because their effect on the
> > > > > outside world cannot be retracted if/when they fail and are retried.
> > > > > Once the runner ensures a consistent view of the input, it is then
> > > > > their own responsibility to be idempotent.
> > > > >
> > > > > Ideally we should specify this requirement for the user-defined
> > > > > function without imposing any particular implementation strategy on
> > > > > Beam runners.
> > > > >
> > > > > Proposal:
> > > > >
> > > > > 1. Let a DoFn declare (mechanism not important right now) that it
> > > > > "requires deterministic input".
> > > > >
> > > > > 2. Each runner will need a way to induce deterministic input - the
> > > > > obvious choice being a materialization.
> > > > >
> > > > > I want to keep the discussion focused, so I'm leaving out any
> > > > > possibilities of taking this further.
> > > > >
> > > > > Regarding performance: Today places that require this tend to be
> > > > > already paying the cost via GroupByKey / Reshuffle operations, since
> > > > > that was a simple way to induce determinism in batch Dataflow* (doesn't
> > > > > work for most other runners nor for streaming Dataflow). This change
> > > > > will replace a hard-coded implementation strategy with a requirement
> > > > > that may be fulfilled in the most efficient way available.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > >
> > > > > * There is some overlap with the reshuffle/redistribute discussion
> > > > > because of this historical situation, but I would like to leave that
> > > > > broader discussion out of this correctness issue.
> > > > >
> > > >
> > >
> >
>
