This came up again, so I wanted to push it along by proposing a specific
API for Java that could have a derived API in Python. I am writing this
quickly to get something out there, so I welcome suggestions for revision.

Today a DoFn has a @ProcessElement-annotated method with various automatically
provided parameters, but most fundamentally it is this:

@ProcessElement
public void process(ProcessContext ctx) {
  ctx.element(); // to access the current input element
  ctx.output(something); // to write to default output collection
  ctx.output(tag, something); // to write to other output collections
}

For some time, we have hoped to unpack the context - it is a
backwards-compatibility pattern made obsolete by the new DoFn design. So
instead it would look like this:

@ProcessElement
public void process(Element element, MainOutput mainOutput, ...) {
  element.get(); // to access the current input element
  mainOutput.output(something); // to write to the default output collection
  other.output(something); // to write to another output collection
}

I've deliberately left out the undecided syntax for side outputs. But it
would be nice for the tag to be built into the parameter so it doesn't
have to be passed when calling output().
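
For illustration only, one hypothetical shape for that (the @SideOutput
annotation and the Output type below are placeholder names, not a settled
proposal) could bind the tag to the parameter itself:

@ProcessElement
public void process(Element element, MainOutput mainOutput,
    @SideOutput("other") Output<Something> other) {
  other.output(something); // the tag is implied by the parameter, not passed here
}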

One way to extend this to require deterministic input would simply be this:

@ProcessElement
@RequiresDeterministicInput
public void process(Element element, MainOutput mainOutput, ...) {
  element.get(); // to access the current input element
  mainOutput.output(something); // to write to the default output collection
  other.output(something); // to write to another output collection
}

There are many places where we could put an annotation or change a type to
indicate that the input PCollection should be well-defined /
deterministically replayable. I don't have a strong opinion.
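
For example (these are only sketches of the design space; none of the names
beyond @RequiresDeterministicInput is actually proposed), the requirement
could sit on the method, on the parameter, or in the parameter's type:

// On the method, as above:
@ProcessElement
@RequiresDeterministicInput
public void process(Element element, MainOutput mainOutput) { ... }

// On the parameter (hypothetical):
@ProcessElement
public void process(@Deterministic Element element, MainOutput mainOutput) { ... }

// As a distinct parameter type (hypothetical):
@ProcessElement
public void process(DeterministicElement element, MainOutput mainOutput) { ... }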

Kenn

On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid>
wrote:

> Allowing an annotation on DoFns that produce deterministic output could be
> added in the future, but doesn't seem like a great option.
>
> 1. It is a correctness issue to assume a DoFn is deterministic and be
> wrong, so we would need to assume all transform outputs are
> non-deterministic unless annotated. Getting this correct is difficult (for
> example, GBK is surprisingly non-deterministic except in specific cases).
>
> 2. It is unlikely to be a major performance improvement, given that any
> non-deterministic transform prior to a sink (which are most likely to
> require deterministic input) will cause additional work to be needed.
>
> Based on this, it seems like the risk of allowing an annotation is high
> while the potential for performance improvements is low. The current
> proposal (not allowing an annotation) makes sense for now, until we can
> demonstrate that the impact on performance is high in cases that could be
> avoided with an annotation (in real-world use).
>
> -- Ben
>
> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
>
> +1 for the general idea of runners handling it over hard-coded
> implementation strategy.
>
> For the Write transform I believe you are talking about ApplyShardingKey
> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> which introduces non-deterministic behavior when retried?
>
>
> *Let a DoFn declare (mechanism not important right now) that it "requires
> deterministic input"*
>
> *Each runner will need a way to induce deterministic input - the obvious
> choice being a materialization.*
>
> Does this mean that a runner will always materialize (or whatever the
> strategy is) an input PCollection to this DoFn even though the PCollection
> might have been produced by deterministic transforms? Would it make sense
> to also let DoFns declare if they produce non-deterministic output?
>
> -Vikas
>
>
> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
>
> > Hey Kenn-
> >
> > this seems important, but I don't have all the context on what the
> problem
> > is.
> >
> > Can you explain this sentence "Specifically, there is pseudorandom data
> > generated and once it has been observed and used to produce a side
> effect,
> > it cannot be regenerated without erroneous results." ?
> >
> > Where is the pseudorandom data coming from? Perhaps a concrete example
> > would help?
> >
> > S
> >
> >
> > On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> > > Problem:
> > >
> > > I will drop all nuance and say that the `Write` transform as it exists
> in
> > > the SDK is incorrect until we add some specification and APIs. We can't
> > > keep shipping an SDK with an unsafe transform in it, and IMO this
> > certainly
> > > blocks a stable release.
> > >
> > > Specifically, there is pseudorandom data generated and once it has been
> > > observed and used to produce a side effect, it cannot be regenerated
> > > without erroneous results.
> > >
> > > This generalizes: For some side-effecting user-defined functions, it is
> > > vital that even across retries/replays they have a consistent view of
> the
> > > contents of their input PCollection, because their effect on the
> outside
> > > world cannot be retracted if/when they fail and are retried. Once the
> > > runner ensures a consistent view of the input, it is then their own
> > > responsibility to be idempotent.
> > >
> > > Ideally we should specify this requirement for the user-defined
> function
> > > without imposing any particular implementation strategy on Beam
> runners.
> > >
> > > Proposal:
> > >
> > > 1. Let a DoFn declare (mechanism not important right now) that it
> > "requires
> > > deterministic input".
> > >
> > > 2. Each runner will need a way to induce deterministic input - the
> > obvious
> > > choice being a materialization.
> > >
> > > I want to keep the discussion focused, so I'm leaving out any
> > possibilities
> > > of taking this further.
> > >
> > > Regarding performance: Today places that require this tend to be
> already
> > > paying the cost via GroupByKey / Reshuffle operations, since that was a
> > > simple way to induce determinism in batch Dataflow* (doesn't work for
> > most
> > > other runners nor for streaming Dataflow). This change will replace a
> > > hard-coded implementation strategy with a requirement that may be
> > fulfilled
> > > in the most efficient way available.
> > >
> > > Thoughts?
> > >
> > > Kenn (w/ lots of consult from colleagues, especially Ben)
> > >
> > > * There is some overlap with the reshuffle/redistribute discussion
> > because
> > > of this historical situation, but I would like to leave that broader
> > > discussion out of this correctness issue.
> > >
> >
>
