I think it must imply fixed content for KV<K, Iterable<V>>s. Making a decision
based on the contents of an Iterable, assuming the Iterable is deterministic,
seems an acceptable use of the API, and that requires the contents to be
identical across failures. This does imply that (assuming this is reading
directly from the output of a GroupByKey) the elements will be
materialized, grouped, materialized again, and then read back, to ensure
that elements are not added on to the end.

I agree that there is no ordering guarantee for the sequence in which
elements are processed.

On Thu, Aug 10, 2017 at 11:03 AM, Reuven Lax <re...@google.com.invalid>
wrote:

> It means that single element replay is stable.
>
> On Thu, Aug 10, 2017 at 10:56 AM, Raghu Angadi <rang...@google.com.invalid>
> wrote:
>
> > Can we define what exactly is meant by deterministic/stable/replayable
> > etc?
> >
> >    - Does it imply a fixed order? If yes, it implies a fixed order of
> >    processElement() invocations, right? Are there any qualifiers (within
> >    a window+key, etc.)?
> >
>
> No, no ordering guarantee.
>
>
> >    - Does it also imply fixed length and content for value iterators?
> >
>
> Good point. With our current runner API, it does not. A KV<K,
> Iterable<V>> has no good way of being deterministic if there is late data.
> We could make it deterministic by forcing the Iterable to be materialized
> into a single element, but that would also mean that the entire Iterable
> must fit in memory (which at least the Dataflow runner does not require).
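A minimal plain-Java sketch of the materialization Reuven describes (not Beam's API): the grouped values are copied into one immutable List, so every replay sees the same contents even if late data later arrives upstream. `GroupedValues.materialize` is a hypothetical helper. The memory cost he mentions is visible in the copy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: turns a lazily produced group of values into a single,
// fixed element. Once materialized, replays cannot observe late additions.
final class GroupedValues {
  private GroupedValues() {}

  static <V> List<V> materialize(Iterable<V> values) {
    List<V> copy = new ArrayList<>();
    for (V v : values) {
      copy.add(v); // the entire group must fit in memory at this point
    }
    // Read-only view, so downstream code cannot alter the fixed contents.
    return Collections.unmodifiableList(copy);
  }
}
```

Values appended to the source after `materialize` returns are not visible in the returned list.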
>
>
> >    - Some examples to clarify nuances would be very useful.
> >
> > State durability semantics for timers that Reuven proposed seem to be
> > unrelated to stable input (at the model level). It might be simpler to add
> > these semantics first. A lot of deterministic side-effect issues can be
> > handled by durable state in timers. One thing I like about the timers
> > approach is that it makes the cost more transparent to the user, since the
> > state is explicitly stored.
> >
> >
> > On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers <bchamb...@google.com.invalid>
> > wrote:
> >
> > > I think it only makes sense in places where a user might reasonably
> > > require stable input to ensure idempotency of side-effects. It also only
> > > makes sense in places where a runner could reasonably provide such a
> > > guarantee.
> > >
> > > A given Combine is unlikely to have side effects, so it is less likely to
> > > benefit from stability of the input. Further, the reason writing a Combine
> > > is desirable is that its execution can be split up and moved to the
> > > mapper side (before the GroupByKey). But this division is inherently
> > > non-deterministic, so it seems unlikely to benefit from stability. And
> > > many cases where I could see wanting side-effects would end up in
> > > extractOutput, for which there is an easy (arguably better) solution:
> > > have extractOutput return the accumulators and do the side-effects in a
> > > DoFn afterwards.
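A plain-Java model of Ben's suggestion, assuming a summing combiner. The method names mirror Beam's CombineFn, but `SumCombiner` and `SideEffectStep` are hypothetical and do not depend on Beam. The runner may partition inputs arbitrarily before merging, so `extractOutput` stays pure and the side effect runs in a separate step on the final value.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a combiner; the method names mirror Beam's CombineFn,
// but this class is illustrative only and does not depend on Beam.
final class SumCombiner {
  long createAccumulator() { return 0L; }

  long addInput(long acc, long input) { return acc + input; }

  long mergeAccumulators(List<Long> accs) {
    long sum = 0L;
    for (long a : accs) sum += a;
    return sum;
  }

  // Pure: no side effects here, since how inputs were split is arbitrary.
  long extractOutput(long acc) { return acc; }

  // Combines inputs using an arbitrary partition into partial accumulators,
  // as a runner might do on the mapper side before the GroupByKey.
  long combine(List<List<Long>> partitions) {
    List<Long> partials = new ArrayList<>();
    for (List<Long> part : partitions) {
      long acc = createAccumulator();
      for (long x : part) acc = addInput(acc, x);
      partials.add(acc);
    }
    return extractOutput(mergeAccumulators(partials));
  }
}

// Hypothetical downstream step: the only place with a side effect, applied to
// the final combined value rather than inside the combiner.
final class SideEffectStep {
  final List<Long> written = new ArrayList<>();

  void sideEffect(long result) { written.add(result); }
}
```

`combine` returns the same sum however the inputs are split. Only that final value is handed to `SideEffectStep`.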
> > >
> > > For composites, it is a bit trickier. I could see a case for supporting it
> > > on composites, but we would need to make it very clear that it only
> > > affected the input to the composite. If any of the operations within the
> > > composite were non-deterministic, then their outputs could be unstable,
> > > leading to instability in later parts of the composite. Further, it
> > > doesn't seem to offer much. The composite itself doesn't perform
> > > side-effects, so there is no benefit to having the annotation there.
> > > Instead, we allow the annotation to be put where it is relevant and
> > > important: on the DoFns that actually have side-effects that require
> > > stability.
> > >
> > > On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax <re...@google.com.invalid>
> > > wrote:
> > >
> > > > I don't think it really makes sense to do this on Combine. And I agree
> > > > with you, it doesn't make sense on composites either.
> > > >
> > > > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner <sweg...@google.com.invalid>
> > > > wrote:
> > > >
> > > > > Does requires-stable-input only apply to ParDo transforms?
> > > > >
> > > > > I don't think it would make sense to annotate a composite, because
> > > > > checkpointing should happen as close to the side-effecting operation as
> > > > > possible, since upstream transforms within a composite could introduce
> > > > > non-determinism. So it's the primitive transform that should own the
> > > > > requirement.
> > > > >
> > > > > Are there other primitives that should be annotated? 'Combine' is
> > > > > interesting because it is optimized in Dataflow (and perhaps other
> > > > > runners) to partially apply before a GroupByKey.
> > > > >
> > > > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau <taki...@google.com.invalid>
> > > > > wrote:
> > > > >
> > > > > > +1 to the annotation idea, and to having it on processTimer.
> > > > > >
> > > > > > -Tyler
> > > > > >
> > > > > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > +1 to the annotation approach. I outlined how implementing this would
> > > > > > > work in the Flink runner in the thread about the exactly-once Kafka
> > > > > > > sink.
> > > > > > >
> > > > > > > > On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> > > > > > > >
> > > > > > > > Yes - I don't think we should try to make any deterministic
> > > > > > > > guarantees about what is in a bundle. Stability guarantees are per
> > > > > > > > element only.
> > > > > > > >
> > > > > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
> > > > > > > >> minimum implementation requirement of a DoFn, and it should be where
> > > > > > > >> the processing logic that depends on characteristics of the inputs
> > > > > > > >> lies. It's a good way of signalling the requirements of the Fn and
> > > > > > > >> letting the runner decide.
> > > > > > > >>
> > > > > > > >> I have a minor concern that this may not work as expected for users
> > > > > > > >> that try to batch remote calls in `FinishBundle`. We should make sure
> > > > > > > >> we document that it is explicitly the input elements that will be
> > > > > > > >> replayed, and that bundles and other operational details are still
> > > > > > > >> arbitrary.
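Thomas's caveat, sketched in plain Java with hypothetical names. The same elements may be bundled differently on a retry, so a batch flushed at the end of a bundle is not itself stable, even when each element is. The writer below keys each external write on the element rather than on the bundle. A retry with different bundling therefore overwrites rows instead of duplicating them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical external store with upsert semantics, keyed by record id.
final class KeyedStore {
  final Map<String, String> rows = new TreeMap<>();

  void upsert(String id, String value) { rows.put(id, value); }
}

// Buffers elements during a bundle and flushes them when the bundle finishes,
// similar in spirit to batching remote calls in a FinishBundle method.
final class BatchingWriter {
  private final KeyedStore store;
  private final List<String> buffer = new ArrayList<>();
  int flushes = 0;

  BatchingWriter(KeyedStore store) { this.store = store; }

  void processElement(String element) { buffer.add(element); }

  void finishBundle() {
    for (String element : buffer) {
      // Key on the element (stable across retries), never on the bundle or
      // batch boundaries (arbitrary across retries).
      store.upsert(element, element.toUpperCase());
    }
    buffer.clear();
    flushes++;
  }

  // Simulates a runner delivering the given bundles in order.
  void runBundles(List<List<String>> bundles) {
    for (List<String> bundle : bundles) {
      for (String e : bundle) processElement(e);
      finishBundle();
    }
  }
}
```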
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> I think deterministic here means deterministically replayable, i.e. no
> > > > > > > >>> matter how many times the element is retried, it will always be the
> > > > > > > >>> same.
> > > > > > > >>>
> > > > > > > >>> I think we should also allow specifying this on processTimer. This
> > > > > > > >>> would mean that any keyed state written in a previous processElement
> > > > > > > >>> must be guaranteed durable before processTimer is called.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>>> I strongly agree with this proposal. I think moving away from "just
> > > > > > > >>>> insert a GroupByKey for one of the 3 different reasons you may want
> > > > > > > >>>> it" towards APIs that allow code to express the requirements it has,
> > > > > > > >>>> and the runner to choose the best way to meet them, is a major step
> > > > > > > >>>> forwards in terms of portability.
> > > > > > > >>>>
> > > > > > > >>>> I think "deterministic" may be misleading. The actual contents of the
> > > > > > > >>>> collection aren't deterministic if upstream computations aren't. The
> > > > > > > >>>> property we really need is that once an input may have been observed
> > > > > > > >>>> by the side-effecting code, it will never be observed with a
> > > > > > > >>>> different value.
> > > > > > > >>>>
> > > > > > > >>>> I would propose something like RequiresStableInput, to indicate that
> > > > > > > >>>> the input must be stable as observed by the function. I could also
> > > > > > > >>>> see something hinting at the fact that we don't recompute the input,
> > > > > > > >>>> such as RequiresMemoizedInput or RequiresNoRecomputation.
> > > > > > > >>>>
> > > > > > > >>>> -- Ben
> > > > > > > >>>>
> > > > > > > >>>> P.S. For anyone interested, other uses of GroupByKey that we may want
> > > > > > > >>>> to discuss APIs for would be preventing retry across steps (e.g.,
> > > > > > > >>>> preventing fusion) and redistributing inputs across workers.
> > > > > > > >>>>
> > > > > > > >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid>
> > > > > > > >>>> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> This came up again, so I wanted to push it along by proposing a
> > > > > > > >>>>> specific API for Java that could have a derived API in Python. I am
> > > > > > > >>>>> writing this quickly to get something out there, so I welcome
> > > > > > > >>>>> suggestions for revision.
> > > > > > > >>>>>
> > > > > > > >>>>> Today a DoFn has a @ProcessElement annotated method with various
> > > > > > > >>>>> automated parameters, but most fundamentally this:
> > > > > > > >>>>>
> > > > > > > >>>>> @ProcessElement
> > > > > > > >>>>> public void process(ProcessContext ctx) {
> > > > > > > >>>>>  ctx.element(); // to access the current input element
> > > > > > > >>>>>  ctx.output(something); // to write to default output collection
> > > > > > > >>>>>  ctx.output(tag, something); // to write to other output collections
> > > > > > > >>>>> }
> > > > > > > >>>>>
> > > > > > > >>>>> For some time, we have hoped to unpack the context - it is a
> > > > > > > >>>>> backwards-compatibility pattern made obsolete by the new DoFn design.
> > > > > > > >>>>> So instead it would look like this:
> > > > > > > >>>>>
> > > > > > > >>>>> @ProcessElement
> > > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > > >>>>>  element.get(); // to access the current input element
> > > > > > > >>>>>  mainOutput.output(something); // to write to the default output collection
> > > > > > > >>>>>  other.output(something); // to write to other output collection
> > > > > > > >>>>> }
> > > > > > > >>>>>
> > > > > > > >>>>> I've deliberately left out the undecided syntax for side outputs. But
> > > > > > > >>>>> it would be nice for the tag to be built into the parameter so it
> > > > > > > >>>>> doesn't have to be used when calling output().
> > > > > > > >>>>>
> > > > > > > >>>>> One way to enhance this to deterministic input would just be this:
> > > > > > > >>>>>
> > > > > > > >>>>> @ProcessElement
> > > > > > > >>>>> @RequiresDeterministicInput
> > > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > > >>>>>  element.get(); // to access the current input element
> > > > > > > >>>>>  mainOutput.output(something); // to write to the default output collection
> > > > > > > >>>>>  other.output(something); // to write to other output collection
> > > > > > > >>>>> }
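This sketch shows, using plain Java reflection, how a runner might consume such an annotation. Both annotations are declared locally as stand-ins; they are not Beam's classes. `StableInputCheck.needsStableInput` is a hypothetical check a runner could use to decide where to insert a materialization.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Locally declared stand-ins for the proposed annotations (illustrative only).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresDeterministicInput {}

// A side-effecting fn that asks for a stable view of its input.
class WriteToExternalSystemFn {
  @ProcessElement
  @RequiresDeterministicInput
  public void process(String element) { /* write element externally */ }
}

// A pure fn that makes no such request.
class UppercaseFn {
  @ProcessElement
  public void process(String element) { /* emit element.toUpperCase() */ }
}

final class StableInputCheck {
  private StableInputCheck() {}

  // True if the fn's @ProcessElement method requests deterministic input; a
  // runner could then materialize (checkpoint) that fn's input.
  static boolean needsStableInput(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)
          && m.isAnnotationPresent(RequiresDeterministicInput.class)) {
        return true;
      }
    }
    return false;
  }
}
```

This keeps the requirement on the method that performs the side effect, and the runner decides how to satisfy it.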
> > > > > > > >>>>>
> > > > > > > >>>>> There are really a lot of places where we could put an annotation or
> > > > > > > >>>>> change a type to indicate that the input PCollection should be
> > > > > > > >>>>> well-defined/deterministically-replayable. I don't have a really
> > > > > > > >>>>> strong opinion.
> > > > > > > >>>>>
> > > > > > > >>>>> Kenn
> > > > > > > >>>>>
> > > > > > > >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid>
> > > > > > > >>>>> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> Allowing an annotation on DoFns that produce deterministic output
> > > > > > > >>>>>> could be added in the future, but doesn't seem like a great option.
> > > > > > > >>>>>>
> > > > > > > >>>>>> 1. It is a correctness issue to assume a DoFn is deterministic and be
> > > > > > > >>>>>> wrong, so we would need to assume all transform outputs are
> > > > > > > >>>>>> non-deterministic unless annotated. Getting this correct is difficult
> > > > > > > >>>>>> (for example, GBK is surprisingly non-deterministic except in
> > > > > > > >>>>>> specific cases).
> > > > > > > >>>>>>
> > > > > > > >>>>>> 2. It is unlikely to be a major performance improvement, given that
> > > > > > > >>>>>> any non-deterministic transform prior to a sink (sinks being the most
> > > > > > > >>>>>> likely to require deterministic input) will cause additional work to
> > > > > > > >>>>>> be needed.
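The GroupByKey example in point 1 can be made concrete. The order of values within a group typically depends on the order in which they arrive, so two executions over the same input can produce differently ordered Iterables. `Grouping.groupByKey` below is a hypothetical in-memory stand-in, not Beam's transform.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for grouping: values are appended in arrival order, so the
// resulting per-key lists depend on how upstream work happened to interleave.
final class Grouping {
  private Grouping() {}

  static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> arrivals) {
    Map<String, List<Integer>> groups = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> kv : arrivals) {
      groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    return groups;
  }
}
```

Both runs contain the same values, but a consumer that depends on their order sees different input on each execution.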
> > > > > > > >>>>>>
> > > > > > > >>>>>> Based on this, it seems like the risk of allowing an annotation is
> > > > > > > >>>>>> high while the potential for performance improvements is low. The
> > > > > > > >>>>>> current proposal (not allowing an annotation) makes sense for now,
> > > > > > > >>>>>> until we can demonstrate that the impact on performance is high in
> > > > > > > >>>>>> cases that could be avoided with an annotation (in real-world use).
> > > > > > > >>>>>>
> > > > > > > >>>>>> -- Ben
> > > > > > > >>>>>>
> > > > > > > >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>> +1 for the general idea of runners handling it over a hard-coded
> > > > > > > >>>>>> implementation strategy.
> > > > > > > >>>>>>
> > > > > > > >>>>>> For the Write transform I believe you are talking about
> > > > > > > >>>>>> ApplyShardingKey
> > > > > > > >>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > > > > > >>>>>> which introduces non-deterministic behavior when retried?
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> *Let a DoFn declare (mechanism not important right now) that it
> > > > > > > >>>>>> "requires deterministic input"*
> > > > > > > >>>>>>
> > > > > > > >>>>>> *Each runner will need a way to induce deterministic input - the
> > > > > > > >>>>>> obvious choice being a materialization.*
> > > > > > > >>>>>>
> > > > > > > >>>>>> Does this mean that a runner will always materialize (or whatever the
> > > > > > > >>>>>> strategy is) an input PCollection to this DoFn even though the
> > > > > > > >>>>>> PCollection might have been produced by deterministic transforms?
> > > > > > > >>>>>> Would it make sense to also let DoFns declare if they produce
> > > > > > > >>>>>> non-deterministic output?
> > > > > > > >>>>>>
> > > > > > > >>>>>> -Vikas
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Hey Kenn-
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> This seems important, but I don't have all the context on what the
> > > > > > > >>>>>>> problem is.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Can you explain this sentence: "Specifically, there is pseudorandom
> > > > > > > >>>>>>> data generated and once it has been observed and used to produce a
> > > > > > > >>>>>>> side effect, it cannot be regenerated without erroneous results."?
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete
> > > > > > > >>>>>>> example would help?
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> S
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid>
> > > > > > > >>>>>>> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>> Problem:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> I will drop all nuance and say that the `Write` transform as it
> > > > > > > >>>>>>>> exists in the SDK is incorrect until we add some specification and
> > > > > > > >>>>>>>> APIs. We can't keep shipping an SDK with an unsafe transform in it,
> > > > > > > >>>>>>>> and IMO this certainly blocks a stable release.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Specifically, there is pseudorandom data generated and once it has
> > > > > > > >>>>>>>> been observed and used to produce a side effect, it cannot be
> > > > > > > >>>>>>>> regenerated without erroneous results.
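Stephen asks for a concrete example. Here is one as a plain-Java sketch under stated assumptions. A hypothetical `ShardedSink` assigns each element a random shard (similar in spirit to the ApplyShardingKey step vikas asks about) and writes it there. If a retry recomputes the assignment, the element can land in a second shard and be written twice. Reading the assignment from a record that survives the failure (standing in for a runner's materialization) keeps the retry idempotent. The shard count and the in-memory "files" are illustrative.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical sink: each shard is a set of written elements ("files").
final class ShardedSink {
  static final int NUM_SHARDS = 4;
  final Map<Integer, Set<String>> shards = new HashMap<>();

  void write(int shard, String element) {
    // Writing the same element to the same shard again is idempotent.
    shards.computeIfAbsent(shard, s -> new HashSet<>()).add(element);
  }

  // Number of shards that contain the element (more than 1 means duplicates).
  int copiesOf(String element) {
    int n = 0;
    for (Set<String> files : shards.values()) {
      if (files.contains(element)) n++;
    }
    return n;
  }
}

final class ShardAssignment {
  private final Random random;
  // Stands in for a materialized (durable) copy of the keyed input.
  private final Map<String, Integer> materialized = new HashMap<>();

  ShardAssignment(long seed) { this.random = new Random(seed); }

  // Recomputed on every attempt: a retry may pick a different shard.
  int recompute(String element) { return random.nextInt(ShardedSink.NUM_SHARDS); }

  // Stable: the first assignment is recorded and reused on every retry.
  int stable(String element) {
    return materialized.computeIfAbsent(element, e -> recompute(e));
  }
}
```

Over many simulated retries, `recompute` spreads one element across several shards, while `stable` always writes it to the same one.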
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> This generalizes: for some side-effecting user-defined functions, it
> > > > > > > >>>>>>>> is vital that even across retries/replays they have a consistent view
> > > > > > > >>>>>>>> of the contents of their input PCollection, because their effect on
> > > > > > > >>>>>>>> the outside world cannot be retracted if/when they fail and are
> > > > > > > >>>>>>>> retried. Once the runner ensures a consistent view of the input, it
> > > > > > > >>>>>>>> is then their own responsibility to be idempotent.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Ideally we should specify this requirement for the user-defined
> > > > > > > >>>>>>>> function without imposing any particular implementation strategy on
> > > > > > > >>>>>>>> Beam runners.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Proposal:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that it
> > > > > > > >>>>>>>> "requires deterministic input".
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 2. Each runner will need a way to induce deterministic input - the
> > > > > > > >>>>>>>> obvious choice being a materialization.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> I want to keep the discussion focused, so I'm leaving out any
> > > > > > > >>>>>>>> possibilities of taking this further.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Regarding performance: today, places that require this tend to be
> > > > > > > >>>>>>>> already paying the cost via GroupByKey / Reshuffle operations, since
> > > > > > > >>>>>>>> that was a simple way to induce determinism in batch Dataflow* (it
> > > > > > > >>>>>>>> doesn't work for most other runners, nor for streaming Dataflow).
> > > > > > > >>>>>>>> This change will replace a hard-coded implementation strategy with a
> > > > > > > >>>>>>>> requirement that may be fulfilled in the most efficient way
> > > > > > > >>>>>>>> available.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Thoughts?
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> * There is some overlap with the reshuffle/redistribute discussion
> > > > > > > >>>>>>>> because of this historical situation, but I would like to leave that
> > > > > > > >>>>>>>> broader discussion out of this correctness issue.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>
> > > > > > > >>>>
> > > > > > > >>>
> > > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
