I don't think it really makes sense to to do this on Combine. And I agree
with you, it doesn't make sense on composites either.

On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner <sweg...@google.com.invalid>
wrote:

> Does requires-stable-input only apply to ParDo transforms?
>
> I don't think it would make sense to annotate to composite, because
> checkpointing should happen as close to the side-effecting operation as
> possible, since upstream transforms within a composite could introduce
> non-determinism. So it's the primitive transform that should own the
> requirement.
>
> Are there other primitives that should be annotated? 'Combine' is
> interesting because it optimized in Dataflow (and perhaps other runners) to
> partially apply before a GroupByKey.
>
> On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau <taki...@google.com.invalid>
> wrote:
>
> > +1 to the annotation idea, and to having it on processTimer.
> >
> > -Tyler
> >
> > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > +1 to the annotation approach. I outlined how implementing this would
> > work
> > > in the Flink runner in the Thread about the exactly-once Kafka Sink.
> > >
> > > > On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID>
> wrote:
> > > >
> > > > Yes - I don't think we should try and make any deterministic
> guarantees
> > > > about what is in a bundle. Stability guarantees are per element only.
> > > >
> > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid
> >
> > > > wrote:
> > > >
> > > >> +1 to the annotation-on-ProcessElement approach. ProcessElement is
> the
> > > >> minimum implementation requirement of a DoFn, and should be where
> the
> > > >> processing logic which depends on characteristics of the inputs lie.
> > > It's a
> > > >> good way of signalling the requirements of the Fn, and letting the
> > > runner
> > > >> decide.
> > > >>
> > > >> I have a minor concern that this may not work as expected for users
> > that
> > > >> try to batch remote calls in `FinishBundle` - we should make sure we
> > > >> document that it is explicitly the input elements that will be
> > replayed,
> > > >> and bundles and other operational are still arbitrary.
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax
> <re...@google.com.invalid
> > >
> > > >> wrote:
> > > >>
> > > >>> I think deterministic here means deterministically replayable. i.e.
> > no
> > > >>> matter how many times the element is retried, it will always be the
> > > same.
> > > >>>
> > > >>> I think we should also allow specifying this on processTimer. This
> > > would
> > > >>> mean that any keyed state written in a previous processElement must
> > be
> > > >>> guaranteed durable before processTimer is called.
> > > >>>
> > > >>>
> > > >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <
> bchamb...@apache.org>
> > > >>> wrote:
> > > >>>
> > > >>>> I strongly agree with this proposal. I think moving away from
> "just
> > > >>> insert
> > > >>>> a GroupByKey for one of the 3 different reasons you may want it"
> > > >> towards
> > > >>>> APIs that allow code to express the requirements they have and the
> > > >> runner
> > > >>>> to choose the best way to meet this is a major step forwards in
> > terms
> > > >> of
> > > >>>> portability.
> > > >>>>
> > > >>>> I think "deterministic" may be misleading. The actual contents of
> > the
> > > >>>> collection aren't deterministic if upstream computations aren't.
> The
> > > >>>> property we really need is that once an input may have been
> observed
> > > by
> > > >>> the
> > > >>>> side-effecting code it will never be observed with a different
> > value.
> > > >>>>
> > > >>>> I would propose something RequiresStableInput, to indicate that
> the
> > > >> input
> > > >>>> must be stable as observed by the function. I could also see
> > something
> > > >>>> hinting at the fact we don't recompute the input, such as
> > > >>>> RequiresMemoizedInput or RequiresNoRecomputation.
> > > >>>>
> > > >>>> -- Ben
> > > >>>>
> > > >>>> P.S For anyone interested other uses of GroupByKey that we may
> want
> > to
> > > >>>> discuss APIs for would be preventing retry across steps (eg.,
> > > >> preventing
> > > >>>> fusion) and redistributing inputs across workers.
> > > >>>>
> > > >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles
> > <k...@google.com.invalid
> > > >>>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> This came up again, so I wanted to push it along by proposing a
> > > >>> specific
> > > >>>>> API for Java that could have a derived API in Python. I am
> writing
> > > >> this
> > > >>>>> quickly to get something out there, so I welcome suggestions for
> > > >>>> revision.
> > > >>>>>
> > > >>>>> Today a DoFn has a @ProcessElement annotated method with various
> > > >>>> automated
> > > >>>>> parameters, but most fundamentally this:
> > > >>>>>
> > > >>>>> @ProcessElement
> > > >>>>> public void process(ProcessContext ctx) {
> > > >>>>>  ctx.element() // to access the current input element
> > > >>>>>  ctx.output(something) // to write to default output collection
> > > >>>>>  ctx.output(tag, something) // to write to other output
> collections
> > > >>>>> }
> > > >>>>>
> > > >>>>> For some time, we have hoped to unpack the context - it is a
> > > >>>>> backwards-compatibility pattern made obsolete by the new DoFn
> > design.
> > > >>> So
> > > >>>>> instead it would look like this:
> > > >>>>>
> > > >>>>> @ProcessElement
> > > >>>>> public void process(Element element, MainOutput mainOutput, ...)
> {
> > > >>>>>  element.get() // to access the current input element
> > > >>>>>  mainOutput.output(something) // to write to the default output
> > > >>>> collection
> > > >>>>>  other.output(something) // to write to other output collection
> > > >>>>> }
> > > >>>>>
> > > >>>>> I've deliberately left out the undecided syntax for side outputs.
> > But
> > > >>> it
> > > >>>>> would be nice for the tag to be built in to the parameter so it
> > > >> doesn't
> > > >>>>> have to be used when calling output().
> > > >>>>>
> > > >>>>> One way to enhance this to deterministic input would just be
> this:
> > > >>>>>
> > > >>>>> @ProcessElement
> > > >>>>> @RequiresDeterministicInput
> > > >>>>> public void process(Element element, MainOutput mainOutput, ...)
> {
> > > >>>>>  element.get() // to access the current input element
> > > >>>>>  mainOutput.output(something) // to write to the default output
> > > >>>> collection
> > > >>>>>  other.output(something) // to write to other output collection
> > > >>>>> }
> > > >>>>>
> > > >>>>> There are really a lot of places where we could put an annotation
> > or
> > > >>>> change
> > > >>>>> a type to indicate that the input PCollection should be
> > > >>>>> well-defined/deterministically-replayable. I don't have a really
> > > >>> strong
> > > >>>>> opinion.
> > > >>>>>
> > > >>>>> Kenn
> > > >>>>>
> > > >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers
> > > >>>> <bchamb...@google.com.invalid
> > > >>>>>>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Allowing an annotation on DoFn's that produce deterministic
> output
> > > >>>> could
> > > >>>>> be
> > > >>>>>> added in the future, but doesn't seem like a great option.
> > > >>>>>>
> > > >>>>>> 1. It is a correctness issue to assume a DoFn is deterministic
> and
> > > >> be
> > > >>>>>> wrong, so we would need to assume all transform outputs are
> > > >>>>>> non-deterministic unless annotated. Getting this correct is
> > > >> difficult
> > > >>>>> (for
> > > >>>>>> example, GBK is surprisingly non-deterministic except in
> specific
> > > >>>> cases).
> > > >>>>>>
> > > >>>>>> 2. It is unlikely to be a major performance improvement, given
> > that
> > > >>> any
> > > >>>>>> non-deterministic transform prior to a sink (which are most
> likely
> > > >> to
> > > >>>>>> require deterministic input) will cause additional work to be
> > > >> needed.
> > > >>>>>>
> > > >>>>>> Based on this, it seems like the risk of allowing an annotation
> is
> > > >>> high
> > > >>>>>> while the potential for performance improvements is low. The
> > > >> current
> > > >>>>>> proposal (not allowing an annotation) makes sense for now, until
> > we
> > > >>> can
> > > >>>>>> demonstrate that the impact on performance is high in cases that
> > > >>> could
> > > >>>> be
> > > >>>>>> avoided with an annotation (in real-world use).
> > > >>>>>>
> > > >>>>>> -- Ben
> > > >>>>>>
> > > >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com>
> > > >> wrote:
> > > >>>>>>
> > > >>>>>> +1 for the general idea of runners handling it over hard-coded
> > > >>>>>> implementation strategy.
> > > >>>>>>
> > > >>>>>> For the Write transform I believe you are talking about
> > > >>>> ApplyShardingKey
> > > >>>>>> <
> > > >>>>>>
> > https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4
> > > >>>>>> c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/
> > > >>>>>> io/Write.java#L304
> > > >>>>>>>
> > > >>>>>> which
> > > >>>>>> introduces non deterministic behavior when retried?
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *Let a DoFn declare (mechanism not important right now) that it
> > > >>>>>> "requiresdeterministic input"*
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *Each runner will need a way to induce deterministic input - the
> > > >>>>>> obviouschoice being a materialization.*
> > > >>>>>>
> > > >>>>>> Does this mean that a runner will always materialize (or
> whatever
> > > >> the
> > > >>>>>> strategy is) an input PCollection to this DoFn even though the
> > > >>>>> PCollection
> > > >>>>>> might have been produced by deterministic transforms? Would it
> > make
> > > >>>> sense
> > > >>>>>> to also let DoFns declare if they produce non-deterministic
> > output?
> > > >>>>>>
> > > >>>>>> -Vikas
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid
> >
> > > >>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hey Kenn-
> > > >>>>>>>
> > > >>>>>>> this seems important, but I don't have all the context on what
> > > >> the
> > > >>>>>> problem
> > > >>>>>>> is.
> > > >>>>>>>
> > > >>>>>>> Can you explain this sentence "Specifically, there is
> > > >> pseudorandom
> > > >>>> data
> > > >>>>>>> generated and once it has been observed and used to produce a
> > > >> side
> > > >>>>>> effect,
> > > >>>>>>> it cannot be regenerated without erroneous results." ?
> > > >>>>>>>
> > > >>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete
> > > >>>> example
> > > >>>>>>> would help?
> > > >>>>>>>
> > > >>>>>>> S
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles
> > > >>>> <k...@google.com.invalid
> > > >>>>>>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Problem:
> > > >>>>>>>>
> > > >>>>>>>> I will drop all nuance and say that the `Write` transform as
> it
> > > >>>>> exists
> > > >>>>>> in
> > > >>>>>>>> the SDK is incorrect until we add some specification and APIs.
> > > >> We
> > > >>>>> can't
> > > >>>>>>>> keep shipping an SDK with an unsafe transform in it, and IMO
> > > >> this
> > > >>>>>>> certainly
> > > >>>>>>>> blocks a stable release.
> > > >>>>>>>>
> > > >>>>>>>> Specifically, there is pseudorandom data generated and once it
> > > >>> has
> > > >>>>> been
> > > >>>>>>>> observed and used to produce a side effect, it cannot be
> > > >>>> regenerated
> > > >>>>>>>> without erroneous results.
> > > >>>>>>>>
> > > >>>>>>>> This generalizes: For some side-effecting user-defined
> > > >> functions,
> > > >>>> it
> > > >>>>> is
> > > >>>>>>>> vital that even across retries/replays they have a consistent
> > > >>> view
> > > >>>> of
> > > >>>>>> the
> > > >>>>>>>> contents of their input PCollection, because their effect on
> > > >> the
> > > >>>>>> outside
> > > >>>>>>>> world cannot be retracted if/when they fail and are retried.
> > > >> Once
> > > >>>> the
> > > >>>>>>>> runner ensures a consistent view of the input, it is then
> their
> > > >>> own
> > > >>>>>>>> responsibility to be idempotent.
> > > >>>>>>>>
> > > >>>>>>>> Ideally we should specify this requirement for the
> user-defined
> > > >>>>>> function
> > > >>>>>>>> without imposing any particular implementation strategy on
> Beam
> > > >>>>>> runners.
> > > >>>>>>>>
> > > >>>>>>>> Proposal:
> > > >>>>>>>>
> > > >>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that
> > > >> it
> > > >>>>>>> "requires
> > > >>>>>>>> deterministic input".
> > > >>>>>>>>
> > > >>>>>>>> 2. Each runner will need a way to induce deterministic input -
> > > >>> the
> > > >>>>>>> obvious
> > > >>>>>>>> choice being a materialization.
> > > >>>>>>>>
> > > >>>>>>>> I want to keep the discussion focused, so I'm leaving out any
> > > >>>>>>> possibilities
> > > >>>>>>>> of taking this further.
> > > >>>>>>>>
> > > >>>>>>>> Regarding performance: Today places that require this tend to
> > > >> be
> > > >>>>>> already
> > > >>>>>>>> paying the cost via GroupByKey / Reshuffle operations, since
> > > >> that
> > > >>>>> was a
> > > >>>>>>>> simple way to induce determinism in batch Dataflow* (doesn't
> > > >> work
> > > >>>> for
> > > >>>>>>> most
> > > >>>>>>>> other runners nor for streaming Dataflow). This change will
> > > >>>> replace a
> > > >>>>>>>> hard-coded implementation strategy with a requirement that may
> > > >> be
> > > >>>>>>> fulfilled
> > > >>>>>>>> in the most efficient way available.
> > > >>>>>>>>
> > > >>>>>>>> Thoughts?
> > > >>>>>>>>
> > > >>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
> > > >>>>>>>>
> > > >>>>>>>> * There is some overlap with the reshuffle/redistribute
> > > >>> discussion
> > > >>>>>>> because
> > > >>>>>>>> of this historical situation, but I would like to leave that
> > > >>>> broader
> > > >>>>>>>> discussion out of this correctness issue.
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>

Reply via email to