As I said, a minor concern; we should be explicit in our documentation that
it is only the input _elements_ that are
deterministic/stable/replayable/etc, and not operational concerns
surrounding them (such as bundling). I'd generally avoid making the actual
annotation more verbose.
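
To make the distinction concrete, here is a minimal sketch in plain Java with no Beam dependency. The names `BundlingSketch` and `processAttempt`, the bundle sizes, and the map standing in for a remote store are all hypothetical. It replays the same stable elements under two different bundlings, flushing each bundle as one batched call. Because each write is idempotent per element, the external state ends up the same even though the batches differ between attempts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a DoFn that buffers elements and flushes them in
// a finish-bundle step. Nothing here is Beam API.
final class BundlingSketch {

  // Simulates one attempt: split the (stable) input into bundles of the given
  // size, buffer each bundle, and flush it as one batched "remote call".
  // Returns the number of batched calls made.
  static int processAttempt(List<String> elements, int bundleSize, Map<String, String> store) {
    int calls = 0;
    List<String> buffer = new ArrayList<>();
    for (String element : elements) {
      buffer.add(element);
      if (buffer.size() == bundleSize) {
        flush(buffer, store);
        calls++;
      }
    }
    if (!buffer.isEmpty()) {
      flush(buffer, store);
      calls++;
    }
    return calls;
  }

  // Idempotent per element: the key is derived from the element itself, so
  // writing the same element twice leaves the store unchanged.
  private static void flush(List<String> buffer, Map<String, String> store) {
    for (String element : buffer) {
      store.put("id-" + element, element);
    }
    buffer.clear();
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("a", "b", "c", "d", "e");
    Map<String, String> store = new TreeMap<>();
    int firstCalls = processAttempt(input, 2, store); // bundles: [a,b] [c,d] [e]
    int retryCalls = processAttempt(input, 3, store); // bundles: [a,b,c] [d,e]
    System.out.println(firstCalls + " calls, then " + retryCalls + " calls; store = " + store);
  }
}
```

The number and contents of the batched calls change between the two attempts, which is the part the documentation should call arbitrary; only the elements themselves are the same.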

On Wed, Aug 9, 2017 at 1:49 PM, Kenneth Knowles <k...@google.com.invalid>
wrote:

> On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid>
> wrote:
>
> > I have a minor concern that this may not work as expected for users that
> > try to batch remote calls in `FinishBundle` - we should make sure we
> > document that it is explicitly the input elements that will be replayed,
> > and bundles and other operational concerns are still arbitrary.
> >
>
> I think it is safe since writing has to be idempotent per-element anyhow,
> and a DoFn must be discarded on failure and a new version used for any
> retries.
>
> Was your concern something else? Were you concerned that the phrasing may
> imply a deterministic ordering of calls to the DoFn's methods? Perhaps
> @RequiresStableInputContents to make clear it has nothing to do with
> replaying method calls?
>
> Kenn
>
>
> >
> >
> > On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid>
> > wrote:
> >
> > > I think deterministic here means deterministically replayable, i.e. no
> > > matter how many times the element is retried, it will always be the same.
> > >
> > > I think we should also allow specifying this on processTimer. This would
> > > mean that any keyed state written in a previous processElement must be
> > > guaranteed durable before processTimer is called.
> > >
> > >
> > > On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org>
> > > wrote:
> > >
> > > > I strongly agree with this proposal. I think moving away from "just insert
> > > > a GroupByKey for one of the 3 different reasons you may want it" towards
> > > > APIs that allow code to express the requirements they have and the runner
> > > > to choose the best way to meet this is a major step forwards in terms of
> > > > portability.
> > > >
> > > > I think "deterministic" may be misleading. The actual contents of the
> > > > collection aren't deterministic if upstream computations aren't. The
> > > > property we really need is that once an input may have been observed by
> > > > the side-effecting code it will never be observed with a different value.
> > > >
> > > > I would propose something like RequiresStableInput, to indicate that the
> > > > input must be stable as observed by the function. I could also see
> > > > something hinting at the fact we don't recompute the input, such as
> > > > RequiresMemoizedInput or RequiresNoRecomputation.
> > > >
> > > > -- Ben
> > > >
> > > > P.S. For anyone interested, other uses of GroupByKey that we may want to
> > > > discuss APIs for would be preventing retry across steps (e.g.,
> > > > preventing fusion) and redistributing inputs across workers.
> > > >
> > > > On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid>
> > > > wrote:
> > > >
> > > > > This came up again, so I wanted to push it along by proposing a specific
> > > > > API for Java that could have a derived API in Python. I am writing this
> > > > > quickly to get something out there, so I welcome suggestions for revision.
> > > > >
> > > > > Today a DoFn has a @ProcessElement annotated method with various automated
> > > > > parameters, but most fundamentally this:
> > > > >
> > > > > @ProcessElement
> > > > > public void process(ProcessContext ctx) {
> > > > >   ctx.element() // to access the current input element
> > > > >   ctx.output(something) // to write to default output collection
> > > > >   ctx.output(tag, something) // to write to other output collections
> > > > > }
> > > > >
> > > > > For some time, we have hoped to unpack the context - it is a
> > > > > backwards-compatibility pattern made obsolete by the new DoFn design. So
> > > > > instead it would look like this:
> > > > >
> > > > > @ProcessElement
> > > > > public void process(Element element, MainOutput mainOutput, ...) {
> > > > >   element.get() // to access the current input element
> > > > >   mainOutput.output(something) // to write to the default output collection
> > > > >   other.output(something) // to write to other output collection
> > > > > }
> > > > >
> > > > > I've deliberately left out the undecided syntax for side outputs. But it
> > > > > would be nice for the tag to be built in to the parameter so it doesn't
> > > > > have to be used when calling output().
> > > > >
> > > > > One way to enhance this to deterministic input would just be this:
> > > > >
> > > > > @ProcessElement
> > > > > @RequiresDeterministicInput
> > > > > public void process(Element element, MainOutput mainOutput, ...) {
> > > > >   element.get() // to access the current input element
> > > > >   mainOutput.output(something) // to write to the default output collection
> > > > >   other.output(something) // to write to other output collection
> > > > > }
> > > > >
> > > > > There are really a lot of places where we could put an annotation or change
> > > > > a type to indicate that the input PCollection should be
> > > > > well-defined/deterministically-replayable. I don't have a really strong
> > > > > opinion.
> > > > >
> > > > > Kenn
> > > > >
> > > > > On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid>
> > > > > wrote:
> > > > >
> > > > > > Allowing an annotation on DoFns that produce deterministic output could be
> > > > > > added in the future, but doesn't seem like a great option.
> > > > > >
> > > > > > 1. It is a correctness issue to assume a DoFn is deterministic and be
> > > > > > wrong, so we would need to assume all transform outputs are
> > > > > > non-deterministic unless annotated. Getting this correct is difficult (for
> > > > > > example, GBK is surprisingly non-deterministic except in specific cases).
> > > > > >
> > > > > > 2. It is unlikely to be a major performance improvement, given that any
> > > > > > non-deterministic transform prior to a sink (which are most likely to
> > > > > > require deterministic input) will cause additional work to be needed.
> > > > > >
> > > > > > Based on this, it seems like the risk of allowing an annotation is high
> > > > > > while the potential for performance improvements is low. The current
> > > > > > proposal (not allowing an annotation) makes sense for now, until we can
> > > > > > demonstrate that the impact on performance is high in cases that could be
> > > > > > avoided with an annotation (in real-world use).
> > > > > >
> > > > > > -- Ben
> > > > > >
> > > > > > On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > > > > >
> > > > > > +1 for the general idea of runners handling it over hard-coded
> > > > > > implementation strategy.
> > > > > >
> > > > > > For the Write transform I believe you are talking about ApplyShardingKey
> > > > > > <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > > > > which introduces non-deterministic behavior when retried?
> > > > > >
> > > > > >
> > > > > > *Let a DoFn declare (mechanism not important right now) that it
> > > > > > "requires deterministic input"*
> > > > > >
> > > > > > *Each runner will need a way to induce deterministic input - the
> > > > > > obvious choice being a materialization.*
> > > > > >
> > > > > > Does this mean that a runner will always materialize (or whatever the
> > > > > > strategy is) an input PCollection to this DoFn even though the PCollection
> > > > > > might have been produced by deterministic transforms? Would it make sense
> > > > > > to also let DoFns declare if they produce non-deterministic output?
> > > > > >
> > > > > > -Vikas
> > > > > >
> > > > > >
> > > > > > On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > > > > >
> > > > > > > Hey Kenn-
> > > > > > >
> > > > > > > this seems important, but I don't have all the context on what the
> > > > > > > problem is.
> > > > > > >
> > > > > > > Can you explain this sentence "Specifically, there is pseudorandom data
> > > > > > > generated and once it has been observed and used to produce a side effect,
> > > > > > > it cannot be regenerated without erroneous results." ?
> > > > > > >
> > > > > > > Where is the pseudorandom data coming from? Perhaps a concrete example
> > > > > > > would help?
> > > > > > >
> > > > > > > S
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Problem:
> > > > > > > >
> > > > > > > > I will drop all nuance and say that the `Write` transform as it exists in
> > > > > > > > the SDK is incorrect until we add some specification and APIs. We can't
> > > > > > > > keep shipping an SDK with an unsafe transform in it, and IMO this certainly
> > > > > > > > blocks a stable release.
> > > > > > > >
> > > > > > > > Specifically, there is pseudorandom data generated and once it has been
> > > > > > > > observed and used to produce a side effect, it cannot be regenerated
> > > > > > > > without erroneous results.
> > > > > > > >
> > > > > > > > This generalizes: For some side-effecting user-defined functions, it is
> > > > > > > > vital that even across retries/replays they have a consistent view of the
> > > > > > > > contents of their input PCollection, because their effect on the outside
> > > > > > > > world cannot be retracted if/when they fail and are retried. Once the
> > > > > > > > runner ensures a consistent view of the input, it is then their own
> > > > > > > > responsibility to be idempotent.
> > > > > > > >
> > > > > > > > Ideally we should specify this requirement for the user-defined function
> > > > > > > > without imposing any particular implementation strategy on Beam runners.
> > > > > > > >
> > > > > > > > Proposal:
> > > > > > > >
> > > > > > > > 1. Let a DoFn declare (mechanism not important right now) that it "requires
> > > > > > > > deterministic input".
> > > > > > > >
> > > > > > > > 2. Each runner will need a way to induce deterministic input - the obvious
> > > > > > > > choice being a materialization.
> > > > > > > >
> > > > > > > > I want to keep the discussion focused, so I'm leaving out any possibilities
> > > > > > > > of taking this further.
> > > > > > > >
> > > > > > > > Regarding performance: Today places that require this tend to be already
> > > > > > > > paying the cost via GroupByKey / Reshuffle operations, since that was a
> > > > > > > > simple way to induce determinism in batch Dataflow* (doesn't work for most
> > > > > > > > other runners nor for streaming Dataflow). This change will replace a
> > > > > > > > hard-coded implementation strategy with a requirement that may be fulfilled
> > > > > > > > in the most efficient way available.
> > > > > > > >
> > > > > > > > Thoughts?
> > > > > > > >
> > > > > > > > Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > > > > >
> > > > > > > > * There is some overlap with the reshuffle/redistribute discussion because
> > > > > > > > of this historical situation, but I would like to leave that broader
> > > > > > > > discussion out of this correctness issue.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
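
As a rough illustration of the two-part proposal at the root of this thread, the sketch below uses plain Java reflection rather than any Beam API. The annotation is named `RequiresStableInput` after Ben's suggestion; `StableInputSketch`, `Writer`, `PlainFn`, and `prepareInput` are hypothetical names invented for this example. A toy "runner" checks whether the function's method carries the annotation and, only if it does, materializes the upstream output once so that every retry observes the same values. Materialization is just one possible strategy; the proposal deliberately leaves the choice to the runner.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch; none of these types are Beam API.
final class StableInputSketch {

  // Marker a function uses to declare that it needs stable input.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresStableInput {}

  // A side-effecting function that declares the requirement.
  static final class Writer {
    @RequiresStableInput
    public void process(Integer element) {
      System.out.println("side effect for " + element);
    }
  }

  // A function with no such requirement.
  static final class PlainFn {
    public void process(Integer element) {}
  }

  // True if any method declared on the class carries the annotation.
  static boolean requiresStableInput(Class<?> fnClass) {
    for (Method method : fnClass.getDeclaredMethods()) {
      if (method.isAnnotationPresent(RequiresStableInput.class)) {
        return true;
      }
    }
    return false;
  }

  // Returns the input as each attempt will see it. Without the annotation the
  // upstream is recomputed per attempt; with it, the upstream is computed once
  // and the materialized copy is handed to every attempt.
  static Supplier<List<Integer>> prepareInput(
      Class<?> fnClass, Supplier<List<Integer>> upstream) {
    if (!requiresStableInput(fnClass)) {
      return upstream;
    }
    List<Integer> snapshot =
        Collections.unmodifiableList(new ArrayList<>(upstream.get()));
    return () -> snapshot;
  }

  public static void main(String[] args) {
    // Stand-in for a non-deterministic upstream: a new value on every call.
    AtomicInteger seed = new AtomicInteger();
    Supplier<List<Integer>> upstream = () -> Arrays.asList(seed.incrementAndGet());

    Supplier<List<Integer>> stable = prepareInput(Writer.class, upstream);
    Supplier<List<Integer>> plain = prepareInput(PlainFn.class, upstream);
    System.out.println("stable: " + stable.get() + " then " + stable.get());
    System.out.println("plain:  " + plain.get() + " then " + plain.get());
  }
}
```

The cost of the snapshot is paid only for functions that ask for it, which is the performance argument made in the original message.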
