+1 to the annotation idea, and to having it on processTimer.

-Tyler

On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> +1 to the annotation approach. I outlined how implementing this would work
> in the Flink runner in the Thread about the exactly-once Kafka Sink.
>
> > On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> >
> > Yes - I don't think we should try to make any determinism guarantees
> > about what is in a bundle. Stability guarantees are per-element only.
> >
> > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid>
> > wrote:
> >
> >> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
> >> minimum implementation requirement of a DoFn, and should be where the
> >> processing logic that depends on characteristics of the inputs lives.
> >> It's a good way of signalling the requirements of the Fn and letting
> >> the runner decide.
> >>
> >> I have a minor concern that this may not work as expected for users who
> >> try to batch remote calls in `FinishBundle` - we should make sure we
> >> document that it is explicitly the input elements that will be replayed,
> >> and that bundles and other operational details are still arbitrary.
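> >>
> >> As a rough illustration of the pattern that needs that caveat (the
> >> annotation spelling and sendRpc are placeholders, not settled API;
> >> imports omitted), a stable-input DoFn that batches in FinishBundle
> >> might look like:
> >>
> >> private static class BatchingRpcFn extends DoFn<String, Void> {
> >>   private transient List<String> batch;
> >>
> >>   @StartBundle
> >>   public void startBundle() {
> >>     batch = new ArrayList<>();
> >>   }
> >>
> >>   @RequiresStableInput  // placeholder spelling of the proposed annotation
> >>   @ProcessElement
> >>   public void process(ProcessContext ctx) {
> >>     batch.add(ctx.element());  // each element value is stable across retries
> >>   }
> >>
> >>   @FinishBundle
> >>   public void finishBundle() {
> >>     // The grouping of elements into this bundle may differ on replay,
> >>     // so this remote call must tolerate re-batching.
> >>     sendRpc(batch);  // hypothetical external call
> >>   }
> >> }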
> >>
> >>
> >>
> >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid>
> >> wrote:
> >>
> >>> I think deterministic here means deterministically replayable, i.e. no
> >>> matter how many times the element is retried, it will always be the
> >>> same.
> >>>
> >>> I think we should also allow specifying this on processTimer. This
> >>> would mean that any keyed state written in a previous processElement
> >>> must be guaranteed durable before processTimer is called.
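> >>>
> >>> A rough sketch of this, assuming the annotation can go on both methods
> >>> (the RequiresStableInput spelling, callExternalService, and the
> >>> state/timer names are placeholders; imports omitted):
> >>>
> >>> private static class TimerFlushFn extends DoFn<KV<String, String>, Void> {
> >>>   @StateId("buffer")
> >>>   private final StateSpec<BagState<String>> bufferSpec =
> >>>       StateSpecs.bag(StringUtf8Coder.of());
> >>>
> >>>   @TimerId("flush")
> >>>   private final TimerSpec flushSpec =
> >>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> >>>
> >>>   @RequiresStableInput  // placeholder for the proposed annotation
> >>>   @ProcessElement
> >>>   public void process(ProcessContext ctx,
> >>>       @StateId("buffer") BagState<String> buffer,
> >>>       @TimerId("flush") Timer flush) {
> >>>     buffer.add(ctx.element().getValue());
> >>>     flush.offset(Duration.standardMinutes(1)).setRelative();
> >>>   }
> >>>
> >>>   @RequiresStableInput  // i.e. state written above is durable before this fires
> >>>   @OnTimer("flush")
> >>>   public void onFlush(@StateId("buffer") BagState<String> buffer) {
> >>>     callExternalService(buffer.read());  // side effect sees a stable view of state
> >>>   }
> >>> }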
> >>>
> >>>
> >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org>
> >>> wrote:
> >>>
> >>>> I strongly agree with this proposal. I think moving away from "just
> >>>> insert a GroupByKey for one of the 3 different reasons you may want
> >>>> it" towards APIs that allow code to express the requirements they
> >>>> have, and the runner to choose the best way to meet them, is a major
> >>>> step forward in terms of portability.
> >>>>
> >>>> I think "deterministic" may be misleading. The actual contents of the
> >>>> collection aren't deterministic if upstream computations aren't. The
> >>>> property we really need is that once an input may have been observed
> by
> >>> the
> >>>> side-effecting code it will never be observed with a different value.
> >>>>
> >>>> I would propose something like RequiresStableInput, to indicate that
> >>>> the input must be stable as observed by the function. I could also see
> >>>> something hinting at the fact that we don't recompute the input, such
> >>>> as RequiresMemoizedInput or RequiresNoRecomputation.
> >>>>
> >>>> -- Ben
> >>>>
> >>>> P.S. For anyone interested, other uses of GroupByKey that we may want
> >>>> to discuss APIs for would be preventing retry across steps (e.g.,
> >>>> preventing fusion) and redistributing inputs across workers.
> >>>>
> >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid
> >>>
> >>>> wrote:
> >>>>
> >>>>> This came up again, so I wanted to push it along by proposing a
> >>>>> specific API for Java that could have a derived API in Python. I am
> >>>>> writing this quickly to get something out there, so I welcome
> >>>>> suggestions for revision.
> >>>>>
> >>>>> Today a DoFn has a @ProcessElement-annotated method with various
> >>>>> automated parameters, but most fundamentally this:
> >>>>>
> >>>>> @ProcessElement
> >>>>> public void process(ProcessContext ctx) {
> >>>>>   ctx.element();  // to access the current input element
> >>>>>   ctx.output(something);  // to write to default output collection
> >>>>>   ctx.output(tag, something);  // to write to other output collections
> >>>>> }
> >>>>>
> >>>>> For some time, we have hoped to unpack the context - it is a
> >>>>> backwards-compatibility pattern made obsolete by the new DoFn design.
> >>>>> So instead it would look like this:
> >>>>>
> >>>>> @ProcessElement
> >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> >>>>>   element.get();  // to access the current input element
> >>>>>   mainOutput.output(something);  // to write to the default output collection
> >>>>>   other.output(something);  // to write to another output collection
> >>>>> }
> >>>>>
> >>>>> I've deliberately left out the undecided syntax for side outputs. But
> >>>>> it would be nice for the tag to be built into the parameter so it
> >>>>> doesn't have to be passed when calling output().
> >>>>>
> >>>>> One way to enhance this to require deterministic input would just be
> >>>>> this:
> >>>>>
> >>>>> @ProcessElement
> >>>>> @RequiresDeterministicInput
> >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> >>>>>   element.get();  // to access the current input element
> >>>>>   mainOutput.output(something);  // to write to the default output collection
> >>>>>   other.output(something);  // to write to another output collection
> >>>>> }
> >>>>>
> >>>>> There are really a lot of places where we could put an annotation or
> >>>>> change a type to indicate that the input PCollection should be
> >>>>> well-defined/deterministically-replayable. I don't have a really
> >>>>> strong opinion.
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers
> >>>> <bchamb...@google.com.invalid
> >>>>>>
> >>>>> wrote:
> >>>>>
> >>>>>> Allowing an annotation on DoFns that produce deterministic output
> >>>>>> could be added in the future, but doesn't seem like a great option.
> >>>>>>
> >>>>>> 1. It is a correctness issue to assume a DoFn is deterministic and
> >>>>>> be wrong, so we would need to assume all transform outputs are
> >>>>>> non-deterministic unless annotated. Getting this correct is difficult
> >>>>>> (for example, GBK is surprisingly non-deterministic except in
> >>>>>> specific cases).
> >>>>>>
> >>>>>> 2. It is unlikely to be a major performance improvement, given that
> >>>>>> any non-deterministic transform prior to a sink (which is most likely
> >>>>>> to require deterministic input) will cause additional work to be
> >>>>>> needed.
> >>>>>>
> >>>>>> Based on this, it seems like the risk of allowing an annotation is
> >>>>>> high while the potential for performance improvements is low. The
> >>>>>> current proposal (not allowing an annotation) makes sense for now,
> >>>>>> until we can demonstrate that the impact on performance is high in
> >>>>>> cases that could be avoided with an annotation (in real-world use).
> >>>>>>
> >>>>>> -- Ben
> >>>>>>
> >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com>
> >> wrote:
> >>>>>>
> >>>>>> +1 for the general idea of runners handling it over a hard-coded
> >>>>>> implementation strategy.
> >>>>>>
> >>>>>> For the Write transform, I believe you are talking about
> >>>>>> ApplyShardingKey
> >>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>,
> >>>>>> which introduces non-deterministic behavior when retried?
> >>>>>>
> >>>>>>
> >>>>>> *Let a DoFn declare (mechanism not important right now) that it
> >>>>>> "requires deterministic input"*
> >>>>>>
> >>>>>> *Each runner will need a way to induce deterministic input - the
> >>>>>> obvious choice being a materialization.*
> >>>>>>
> >>>>>> Does this mean that a runner will always materialize (or whatever
> >>>>>> the strategy is) an input PCollection to this DoFn even though the
> >>>>>> PCollection might have been produced by deterministic transforms?
> >>>>>> Would it make sense to also let DoFns declare if they produce
> >>>>>> non-deterministic output?
> >>>>>>
> >>>>>> -Vikas
> >>>>>>
> >>>>>>
> >>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid>
> >>>> wrote:
> >>>>>>
> >>>>>>> Hey Kenn-
> >>>>>>>
> >>>>>>> This seems important, but I don't have all the context on what the
> >>>>>>> problem is.
> >>>>>>>
> >>>>>>> Can you explain this sentence: "Specifically, there is pseudorandom
> >>>>>>> data generated and once it has been observed and used to produce a
> >>>>>>> side effect, it cannot be regenerated without erroneous results."?
> >>>>>>>
> >>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete
> >>>>>>> example would help?
> >>>>>>>
> >>>>>>> S
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles
> >>>> <k...@google.com.invalid
> >>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Problem:
> >>>>>>>>
> >>>>>>>> I will drop all nuance and say that the `Write` transform as it
> >>>>>>>> exists in the SDK is incorrect until we add some specification and
> >>>>>>>> APIs. We can't keep shipping an SDK with an unsafe transform in it,
> >>>>>>>> and IMO this certainly blocks a stable release.
> >>>>>>>>
> >>>>>>>> Specifically, there is pseudorandom data generated and once it has
> >>>>>>>> been observed and used to produce a side effect, it cannot be
> >>>>>>>> regenerated without erroneous results.
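> >>>>>>>>
> >>>>>>>> Concretely, the problematic shape is roughly this (a sketch of the
> >>>>>>>> random sharding step inside Write, not its exact code; numShards is
> >>>>>>>> assumed to be a field):
> >>>>>>>>
> >>>>>>>> @ProcessElement
> >>>>>>>> public void process(ProcessContext ctx) {
> >>>>>>>>   // A fresh pseudorandom shard is picked on every (re)execution, so
> >>>>>>>>   // a retry can route the same element to a different shard than
> >>>>>>>>   // the one whose output was already written.
> >>>>>>>>   int shard = ThreadLocalRandom.current().nextInt(numShards);
> >>>>>>>>   ctx.output(KV.of(shard, ctx.element()));
> >>>>>>>> }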
> >>>>>>>>
> >>>>>>>> This generalizes: For some side-effecting user-defined functions,
> >>>>>>>> it is vital that even across retries/replays they have a consistent
> >>>>>>>> view of the contents of their input PCollection, because their
> >>>>>>>> effect on the outside world cannot be retracted if/when they fail
> >>>>>>>> and are retried. Once the runner ensures a consistent view of the
> >>>>>>>> input, it is then their own responsibility to be idempotent.
> >>>>>>>>
> >>>>>>>> Ideally we should specify this requirement for the user-defined
> >>>>>>>> function without imposing any particular implementation strategy
> >>>>>>>> on Beam runners.
> >>>>>>>>
> >>>>>>>> Proposal:
> >>>>>>>>
> >>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that it
> >>>>>>>> "requires deterministic input".
> >>>>>>>>
> >>>>>>>> 2. Each runner will need a way to induce deterministic input - the
> >>>>>>>> obvious choice being a materialization.
> >>>>>>>>
> >>>>>>>> I want to keep the discussion focused, so I'm leaving out any
> >>>>>>>> possibilities of taking this further.
> >>>>>>>>
> >>>>>>>> Regarding performance: today, places that require this tend to be
> >>>>>>>> already paying the cost via GroupByKey / Reshuffle operations,
> >>>>>>>> since that was a simple way to induce determinism in batch
> >>>>>>>> Dataflow* (doesn't work for most other runners nor for streaming
> >>>>>>>> Dataflow). This change will replace a hard-coded implementation
> >>>>>>>> strategy with a requirement that may be fulfilled in the most
> >>>>>>>> efficient way available.
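> >>>>>>>>
> >>>>>>>> For reference, that hard-coded strategy looks roughly like this as
> >>>>>>>> a pipeline fragment (MyRecord and SideEffectingSinkFn are
> >>>>>>>> placeholders; imports omitted):
> >>>>>>>>
> >>>>>>>> // Pair each element with a random key, group, then ungroup: the
> >>>>>>>> // GroupByKey materializes the data so retries of the sink observe
> >>>>>>>> // the same elements.
> >>>>>>>> PCollection<MyRecord> stable = input
> >>>>>>>>     .apply(ParDo.of(new DoFn<MyRecord, KV<Integer, MyRecord>>() {
> >>>>>>>>       @ProcessElement
> >>>>>>>>       public void process(ProcessContext c) {
> >>>>>>>>         c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
> >>>>>>>>       }
> >>>>>>>>     }))
> >>>>>>>>     .apply(GroupByKey.<Integer, MyRecord>create())
> >>>>>>>>     .apply(Values.<Iterable<MyRecord>>create())
> >>>>>>>>     .apply(Flatten.<MyRecord>iterables());
> >>>>>>>> stable.apply(ParDo.of(new SideEffectingSinkFn()));  // placeholder sink DoFn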
> >>>>>>>>
> >>>>>>>> Thoughts?
> >>>>>>>>
> >>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
> >>>>>>>>
> >>>>>>>> * There is some overlap with the reshuffle/redistribute discussion
> >>>>>>>> because of this historical situation, but I would like to leave
> >>>>>>>> that broader discussion out of this correctness issue.
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
