On Thu, Aug 10, 2017 at 1:07 PM, Kenneth Knowles <k...@google.com.invalid> wrote:

> > > >    - Does it also imply fixed length and content for value iterators?
> > > >
>
> The concept of "value iterator" brings up a nit.
>
> First, there is no such concept in the Beam model, and I don't think there
> should be. I don't think we should special-case GBK if we can avoid it. If
> a PCollection contains elements of type KV<K, Iterable<V>> we should have
> the same definition of "stable" whether or not it came from GBK. So I
> think, for some definition of "fixed content", it should imply fixed length
> and content.
>

I agree. Of course it might take longer to implement this well. It's
perfectly acceptable for this to temporarily not work in this scenario, as
long as we know how to make it work.


>
> But actually what is the definition of whether two Iterable<V> values have
> the same content, since we don't require a deterministic coder for the
> values? I think this question applies to this proposal in general.
> Obviously we expect that if the runner is replaying from persisted
> serialized data, then it suffices, but that is too operational for a good
> definition IMO.
>

I don't quite understand what you mean here. The user has a definition of
equality which is independent of coders (e.g. equals() in Java). Even
non-deterministic coders are expected to deserialize to objects that the
user finds equivalent. Or, to word things a bit more formally: the set of
encoded objects can be partitioned into equivalence classes, based on the
user's notion of equality. A deterministic coder is simply one
in which each equivalence class has cardinality one.
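To make the equivalence-class framing concrete, here is a small Java sketch. It is not Beam's Coder API: CoderEquivalenceSketch and its methods are hypothetical toy "coders" for a Set<String>, and the sketch assumes the strings contain no commas. Two sets that are equal under equals() belong to one equivalence class, yet the iteration-order coder gives them different bytes; the sorting coder maps the whole class to a single encoding, which is what "deterministic" means here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy "coders" for Set<String>; not Beam's Coder API.
// Assumes the strings themselves never contain a comma.
final class CoderEquivalenceSketch {

  // Non-deterministic: writes elements in iteration order, so two sets that are
  // equal under equals() (one equivalence class) can get different encodings.
  static byte[] encodeInIterationOrder(Set<String> set) {
    return String.join(",", set).getBytes(StandardCharsets.UTF_8);
  }

  // Deterministic: sorts before writing, so each equivalence class has exactly
  // one encoding.
  static byte[] encodeSorted(Set<String> set) {
    return String.join(",", new TreeSet<>(set)).getBytes(StandardCharsets.UTF_8);
  }

  // Either encoding decodes to a set the user considers equal to the original.
  static Set<String> decode(byte[] bytes) {
    String s = new String(bytes, StandardCharsets.UTF_8);
    Set<String> out = new LinkedHashSet<>();
    if (!s.isEmpty()) {
      out.addAll(Arrays.asList(s.split(",")));
    }
    return out;
  }

  public static void main(String[] args) {
    Set<String> a = new LinkedHashSet<>(List.of("x", "y"));
    Set<String> b = new LinkedHashSet<>(List.of("y", "x"));
    System.out.println(a.equals(b));                                     // true
    System.out.println(Arrays.equals(encodeInIterationOrder(a),
                                     encodeInIterationOrder(b)));        // false
    System.out.println(Arrays.equals(encodeSorted(a), encodeSorted(b))); // true
    System.out.println(decode(encodeInIterationOrder(b)).equals(a));     // true
  }
}
```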

> > > > the KV<K, Iterable<V>> has no good way of being deterministic if there
> > > > is late data. We could do so by forcing the Iterable to be materialized
> > > > into a single element, but that would also mean that the entire
> > > > Iterable must fit in memory (which at least the Dataflow runner does
> > > > not require).
> >
>
> As I mentioned above, the iterable is semantically [part of] a single
> element. So just to unpack this, to make sure we are talking about the same
> thing, I think you are talking about GBK as implemented via GBKO + GABW.
>
> When the output of GABW is required to be stable but the output of GBKO is
> not, we don't get stability for free in all cases just by inserting a GBK;
> we require something more to make the output of GABW stable, in the worst
> case a full materialization.
>

Correct. My point is that there are alternate, cheaper ways of doing this.
If GABW stores state in an ordered list, it can simply checkpoint a marker
into that list to ensure that the output is stable.
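A minimal sketch of the marker idea, assuming a single key, a single pane, and an in-memory stand-in for durable state. CheckpointedBufferSketch and its methods are hypothetical and are not how GABW is actually implemented in any runner. Once the marker is recorded, a retry re-emits exactly the same prefix of the ordered list, even if late data has been appended since.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one key's GABW state: an append-only ordered list of
// buffered values plus a checkpointed marker recording how much of the list the
// emitted pane covered. One pane only, for brevity; durability is simulated.
final class CheckpointedBufferSketch {
  private final List<String> buffer = new ArrayList<>(); // ordered, append-only
  private int marker = -1;                               // -1: no pane fired yet

  void add(String value) {
    buffer.add(value);
  }

  // First firing: checkpoint the marker (a real runner would persist it durably
  // before emitting) and emit the prefix it covers.
  List<String> firePane() {
    marker = buffer.size();
    return replayPane();
  }

  // Retry: re-emit exactly the checkpointed prefix, ignoring anything appended
  // later (for example, late data).
  List<String> replayPane() {
    if (marker < 0) {
      throw new IllegalStateException("no pane has been fired yet");
    }
    return new ArrayList<>(buffer.subList(0, marker));
  }

  public static void main(String[] args) {
    CheckpointedBufferSketch state = new CheckpointedBufferSketch();
    state.add("a");
    state.add("b");
    List<String> emitted = state.firePane();
    state.add("late");
    System.out.println(emitted + " / replay: " + state.replayPane()); // [a, b] / replay: [a, b]
  }
}
```

The design choice is that only an integer marker needs to be made durable, not a copy of the pane's contents, which is why it can be cheaper than a full materialization.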


>
> In a perfect (future) world, if we were in accumulating mode (aka each new
> element is a full replacement value) then we would know that the sink will
> just overwrite prior values, so it is safe to be unstable w.r.t. triggerings
> (but not other side effects). We could materialize at most the pane index
> (aka sequence number for the write).
>
> For discarding mode (aka each new element is incremental output) I think
> you need full stability of the pane index / sequence number and its
> associated element. There might be different ways to implement this, such
> as the ones suggested so far, depending on the runner.
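A rough sketch of why the two modes differ, using hypothetical in-memory sinks rather than any Beam API (PaneIndexSinkSketch, AccumulatingSink and DiscardingSink are illustrative names). In accumulating mode the sink only needs the pane index to keep the latest full value, so a replayed older pane is harmless. In discarding mode the sink dedupes increments on (key, pane index), which is only correct if the increment for a given pane index never changes across retries.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory sinks (not a Beam API) contrasting the two modes.
final class PaneIndexSinkSketch {

  // Accumulating mode: every pane carries the full value, so keeping the value
  // with the highest pane index is enough; a replayed older pane is ignored.
  static final class AccumulatingSink {
    final Map<String, Long> latestPane = new HashMap<>();
    final Map<String, Long> value = new HashMap<>();

    void write(String key, long paneIndex, long fullValue) {
      Long seen = latestPane.get(key);
      if (seen == null || paneIndex >= seen) {
        latestPane.put(key, paneIndex);
        value.put(key, fullValue);
      }
    }
  }

  // Discarding mode: every pane is an increment, so the sink must apply each
  // (key, paneIndex) at most once. That is only correct if the increment for a
  // given pane index is the same on every retry.
  static final class DiscardingSink {
    final Set<String> applied = new HashSet<>();
    final Map<String, Long> total = new HashMap<>();

    void write(String key, long paneIndex, long increment) {
      // Assumes keys never contain '#', so the composite id is unambiguous.
      if (applied.add(key + "#" + paneIndex)) {
        total.merge(key, increment, Long::sum);
      }
    }
  }

  public static void main(String[] args) {
    AccumulatingSink acc = new AccumulatingSink();
    acc.write("k", 0, 3);
    acc.write("k", 1, 5);
    acc.write("k", 0, 4); // replay of pane 0, possibly with a different value
    System.out.println(acc.value.get("k")); // 5

    DiscardingSink dis = new DiscardingSink();
    dis.write("k", 0, 3);
    dis.write("k", 1, 2);
    dis.write("k", 1, 2); // retry of pane 1 with the same (stable) increment
    System.out.println(dis.total.get("k")); // 5
  }
}
```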
>
> Kenn
>
> > >
> > > >
> > > > >    - Some examples to clarify nuances would be very useful.
> > > > >
> > > > > The state durability semantics for timers that Reuven proposed seem to
> > > > > be unrelated to stable input (at the model level). It might be simpler
> > > > > to add these semantics first. A lot of deterministic side-effect issues
> > > > > can be handled by durable state in timers. One thing I like about the
> > > > > timers approach is that it makes the cost more transparent to the user,
> > > > > since the state is explicitly stored.
> > > > >
> > > > >
> > > > > On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> > > > >
> > > > > > I think it only makes sense in places where a user might reasonably
> > > > > > require stable input to ensure idempotency of side-effects. It also
> > > > > > only makes sense in places where a runner could reasonably provide
> > > > > > such a guarantee.
> > > > > >
> > > > > > A given Combine is unlikely to have side effects, so it is less
> > > > > > likely to benefit from stability of the input. Further, the reason
> > > > > > writing a Combine is desirable is that its execution can be split up
> > > > > > and moved to the mapper side (before the GroupByKey). But this
> > > > > > division is inherently non-deterministic, and so it seems unlikely
> > > > > > to benefit from stability. And many cases where I could see wanting
> > > > > > side-effects would end up in extractOutput, for which there is an
> > > > > > easy (arguably better) solution: have extractOutput return the
> > > > > > accumulators and do the side-effects in a DoFn afterwards.
> > > > > >
> > > > > > For composites, it is a bit trickier. I could see a case for
> > > > > > supporting it on composites, but we would need to make it very clear
> > > > > > that it only affected the input to the composite. If any of the
> > > > > > operations within the composite were non-deterministic, then their
> > > > > > outputs could be unstable, leading to instability in later parts of
> > > > > > the composite. Further, it doesn't seem to offer much. The composite
> > > > > > itself doesn't perform side-effects, so there is no benefit to having
> > > > > > the annotation there. Instead, we allow the annotation to be put where
> > > > > > it is relevant and important: on the DoFns that actually have
> > > > > > side-effects that require stability.
> > > > > >
> > > > > > On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax <re...@google.com.invalid> wrote:
> > > > > >
> > > > > > > I don't think it really makes sense to do this on Combine. And I
> > > > > > > agree with you, it doesn't make sense on composites either.
> > > > > > >
> > > > > > > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner <sweg...@google.com.invalid> wrote:
> > > > > > >
> > > > > > > > Does requires-stable-input only apply to ParDo transforms?
> > > > > > > >
> > > > > > > > I don't think it would make sense to annotate a composite,
> > > > > > > > because checkpointing should happen as close to the
> > > > > > > > side-effecting operation as possible, since upstream transforms
> > > > > > > > within a composite could introduce non-determinism. So it's the
> > > > > > > > primitive transform that should own the requirement.
> > > > > > > >
> > > > > > > > Are there other primitives that should be annotated? 'Combine' is
> > > > > > > > interesting because it is optimized in Dataflow (and perhaps
> > > > > > > > other runners) to partially apply before a GroupByKey.
> > > > > > > >
> > > > > > > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau <taki...@google.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > +1 to the annotation idea, and to having it on processTimer.
> > > > > > > > >
> > > > > > > > > -Tyler
> > > > > > > > >
> > > > > > > > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > +1 to the annotation approach. I outlined how implementing
> > > > > > > > > > this would work in the Flink runner in the thread about the
> > > > > > > > > > exactly-once Kafka Sink.
> > > > > > > > > >
> > > > > > > > > > > On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Yes. I don't think we should try to make any deterministic
> > > > > > > > > > > guarantees about what is in a bundle. Stability guarantees
> > > > > > > > > > > are per element only.
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > >> +1 to the annotation-on-ProcessElement approach.
> > > > > > > > > > >> ProcessElement is the minimum implementation requirement
> > > > > > > > > > >> of a DoFn, and should be where the processing logic that
> > > > > > > > > > >> depends on characteristics of the inputs lives. It's a
> > > > > > > > > > >> good way of signalling the requirements of the Fn, and
> > > > > > > > > > >> letting the runner decide.
> > > > > > > > > > >>
> > > > > > > > > > >> I have a minor concern that this may not work as expected
> > > > > > > > > > >> for users that try to batch remote calls in
> > > > > > > > > > >> `FinishBundle`; we should make sure we document that it is
> > > > > > > > > > >> explicitly the input elements that will be replayed, and
> > > > > > > > > > >> that bundles and other operational details are still
> > > > > > > > > > >> arbitrary.
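To illustrate the documentation point, here is a hypothetical plain-Java model (not Beam's DoFn API; BundleBatchingSketch and its methods are illustrative) of a fn that buffers remote writes and flushes them in finishBundle(). It assumes the remote store supports upserts and that a write key can be derived from the element itself. Under those assumptions, replaying the same stable elements with different bundle boundaries leaves the store unchanged; a key derived from the bundle (say, a batch counter) would not have that property.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (not Beam's DoFn API) of a fn that buffers
// remote writes per bundle and flushes them in finishBundle(). The input
// elements are stable, but bundle boundaries may differ on every retry.
final class BundleBatchingSketch {
  // Stand-in for an external store that supports idempotent upserts.
  final Map<String, String> remoteStore = new TreeMap<>();
  private final List<String> batch = new ArrayList<>();

  void startBundle() {
    batch.clear();
  }

  void processElement(String element) {
    batch.add(element);
  }

  void finishBundle() {
    for (String element : batch) {
      // The write key is derived from the element, not from the bundle, so
      // replaying the element in a different bundle overwrites the same entry.
      remoteStore.put("id-" + element, element);
    }
    batch.clear();
  }

  // Drives the fn over the input, cut into bundles of the given size.
  void run(List<String> input, int bundleSize) {
    for (int start = 0; start < input.size(); start += bundleSize) {
      startBundle();
      for (String e : input.subList(start, Math.min(start + bundleSize, input.size()))) {
        processElement(e);
      }
      finishBundle();
    }
  }

  public static void main(String[] args) {
    List<String> input = List.of("a", "b", "c", "d", "e");
    BundleBatchingSketch fn = new BundleBatchingSketch();
    fn.run(input, 2);
    Map<String, String> afterFirstAttempt = new TreeMap<>(fn.remoteStore);
    fn.run(input, 3); // retry: same elements, different bundle boundaries
    System.out.println(fn.remoteStore.equals(afterFirstAttempt)); // true
  }
}
```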
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid> wrote:
> > > > > > > > > > >>
> > > > > > > > > > >>> I think deterministic here means deterministically
> > > > > > > > > > >>> replayable, i.e. no matter how many times the element is
> > > > > > > > > > >>> retried, it will always be the same.
> > > > > > > > > > >>>
> > > > > > > > > > >>> I think we should also allow specifying this on
> > > > > > > > > > >>> processTimer. This would mean that any keyed state written
> > > > > > > > > > >>> in a previous processElement must be guaranteed durable
> > > > > > > > > > >>> before processTimer is called.
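A toy model of the proposed timer rule, assuming a runner that buffers keyed-state writes until it commits them (TimerDurabilitySketch and its methods are hypothetical, not a real runner's internals). Without committing first, a timer can observe state that a later crash discards, so its retry sees a different view; committing before the callback makes the retry observe the same state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical runner model: keyed-state writes from processElement sit in an
// uncommitted buffer until the runner commits them to durable storage.
final class TimerDurabilitySketch {
  private final List<String> durableState = new ArrayList<>();
  private final List<String> pendingState = new ArrayList<>();

  void processElement(String value) {
    pendingState.add(value); // keyed state write, not yet durable
  }

  void commit() {
    durableState.addAll(pendingState);
    pendingState.clear();
  }

  void crash() {
    pendingState.clear(); // uncommitted writes are lost on failure
  }

  // Returns the state the (side-effecting) timer callback observes.
  List<String> fireTimer(boolean commitBeforeTimer) {
    if (commitBeforeTimer) {
      commit(); // the proposed rule: state is durable before processTimer runs
    }
    List<String> view = new ArrayList<>(durableState);
    view.addAll(pendingState);
    return view;
  }

  public static void main(String[] args) {
    for (boolean rule : new boolean[] {false, true}) {
      TimerDurabilitySketch s = new TimerDurabilitySketch();
      s.processElement("a");
      s.processElement("b");
      List<String> first = s.fireTimer(rule);
      s.crash(); // failure after the timer's side effect; the timer is retried
      List<String> retry = s.fireTimer(rule);
      System.out.println("commit first=" + rule + ": " + first + " then " + retry);
    }
  }
}
```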
> > > > > > > > > > >>>
> > > > > > > > > > >>>
> > > > > > > > > > >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:
> > > > > > > > > > >>>
> > > > > > > > > > >>>> I strongly agree with this proposal. I think moving away
> > > > > > > > > > >>>> from "just insert a GroupByKey for one of the 3 different
> > > > > > > > > > >>>> reasons you may want it" towards APIs that allow code to
> > > > > > > > > > >>>> express the requirements they have and the runner to
> > > > > > > > > > >>>> choose the best way to meet them is a major step forward
> > > > > > > > > > >>>> in terms of portability.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> I think "deterministic" may be misleading. The actual
> > > > > > > > > > >>>> contents of the collection aren't deterministic if
> > > > > > > > > > >>>> upstream computations aren't. The property we really need
> > > > > > > > > > >>>> is that once an input may have been observed by the
> > > > > > > > > > >>>> side-effecting code, it will never be observed with a
> > > > > > > > > > >>>> different value.
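The property stated here can be phrased as a check. The sketch below is a hypothetical test helper, not part of Beam: StableObservationChecker is an illustrative name, and it assumes the caller can supply a stable ID for each element. It records the first value observed per element and fails if a later observation (for example, after a recomputation on retry) differs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical test helper (not part of Beam): once an input may have been
// observed by side-effecting code, it must never be observed again with a
// different value. Elements are identified by an assumed stable ID.
final class StableObservationChecker<T> {
  private final Map<String, T> firstObserved = new HashMap<>();

  void observe(String elementId, T value) {
    Objects.requireNonNull(value, "value");
    T previous = firstObserved.putIfAbsent(elementId, value);
    if (previous != null && !previous.equals(value)) {
      throw new IllegalStateException(
          "element " + elementId + " observed as " + previous + " and later as " + value);
    }
  }

  public static void main(String[] args) {
    StableObservationChecker<Integer> checker = new StableObservationChecker<>();
    checker.observe("e1", 7);
    checker.observe("e1", 7); // a replay with the same value is fine
    try {
      checker.observe("e1", 8); // recomputed with a different value
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // element e1 observed as 7 and later as 8
    }
  }
}
```

Note that equality here is the user's equals(), in line with the point elsewhere in the thread that stability need not be defined in terms of encoded bytes.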
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> I would propose something like RequiresStableInput, to
> > > > > > > > > > >>>> indicate that the input must be stable as observed by the
> > > > > > > > > > >>>> function. I could also see something hinting at the fact
> > > > > > > > > > >>>> that we don't recompute the input, such as
> > > > > > > > > > >>>> RequiresMemoizedInput or RequiresNoRecomputation.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> -- Ben
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> P.S. For anyone interested, other uses of GroupByKey that
> > > > > > > > > > >>>> we may want to discuss APIs for would be preventing retry
> > > > > > > > > > >>>> across steps (e.g., preventing fusion) and redistributing
> > > > > > > > > > >>>> inputs across workers.
> > > > > > > > > > >>>>
> > > > > > > > > > >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid> wrote:
> > > > > > > > > > >>>>
> > > > > > > > > > >>>>> This came up again, so I wanted to push it along by
> > > > > > > > > > >>>>> proposing a specific API for Java that could have a
> > > > > > > > > > >>>>> derived API in Python. I am writing this quickly to get
> > > > > > > > > > >>>>> something out there, so I welcome suggestions for
> > > > > > > > > > >>>>> revision.
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Today a DoFn has a @ProcessElement-annotated method with
> > > > > > > > > > >>>>> various automatically supplied parameters, but most
> > > > > > > > > > >>>>> fundamentally this:
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> @ProcessElement
> > > > > > > > > > >>>>> public void process(ProcessContext ctx) {
> > > > > > > > > > >>>>>   ctx.element();               // to access the current input element
> > > > > > > > > > >>>>>   ctx.output(something);       // to write to the default output collection
> > > > > > > > > > >>>>>   ctx.output(tag, something);  // to write to other output collections
> > > > > > > > > > >>>>> }
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> For some time, we have hoped to unpack the context; it is
> > > > > > > > > > >>>>> a backwards-compatibility pattern made obsolete by the new
> > > > > > > > > > >>>>> DoFn design. So instead it would look like this:
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> @ProcessElement
> > > > > > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > > > > > >>>>>   element.get();                 // to access the current input element
> > > > > > > > > > >>>>>   mainOutput.output(something);  // to write to the default output collection
> > > > > > > > > > >>>>>   other.output(something);       // to write to another output collection
> > > > > > > > > > >>>>> }
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> I've deliberately left out the undecided syntax for side
> > > > > > > > > > >>>>> outputs. But it would be nice for the tag to be built into
> > > > > > > > > > >>>>> the parameter so it doesn't have to be used when calling
> > > > > > > > > > >>>>> output().
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> One way to enhance this to require deterministic input
> > > > > > > > > > >>>>> would just be this:
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> @ProcessElement
> > > > > > > > > > >>>>> @RequiresDeterministicInput
> > > > > > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > > > > > >>>>>   element.get();                 // to access the current input element
> > > > > > > > > > >>>>>   mainOutput.output(something);  // to write to the default output collection
> > > > > > > > > > >>>>>   other.output(something);       // to write to another output collection
> > > > > > > > > > >>>>> }
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> There are really a lot of places where we could put an
> > > > > > > > > > >>>>> annotation or change a type to indicate that the input
> > > > > > > > > > >>>>> PCollection should be
> > > > > > > > > > >>>>> well-defined/deterministically-replayable. I don't have a
> > > > > > > > > > >>>>> really strong opinion.
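As a sketch of how a method-level annotation like this could be consumed, the Java below declares local stand-ins named after the proposal and a runner-side reflection check. ProcessElement, RequiresDeterministicInput, PlainFn, SideEffectingFn and AnnotationScanSketch are all hypothetical here, not Beam SDK types. The only point illustrated is the division of labor: the DoFn states a requirement, and the runner detects it and decides how to satisfy it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-ins declared locally so the sketch compiles on its own;
// they are not the Beam SDK's annotations.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresDeterministicInput {}

class PlainFn {
  @ProcessElement
  public void process(String element) {}
}

class SideEffectingFn {
  @ProcessElement
  @RequiresDeterministicInput
  public void process(String element) {
    // e.g. write the element to an external system
  }
}

// How a runner might discover the requirement before choosing its own strategy
// (materialization, checkpointing, ...) to satisfy it.
final class AnnotationScanSketch {
  static boolean requiresDeterministicInput(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        return m.isAnnotationPresent(RequiresDeterministicInput.class);
      }
    }
    return false; // no @ProcessElement method found
  }

  public static void main(String[] args) {
    System.out.println(requiresDeterministicInput(PlainFn.class));         // false
    System.out.println(requiresDeterministicInput(SideEffectingFn.class)); // true
  }
}
```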
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> Kenn
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> > > > > > > > > > >>>>>
> > > > > > > > > > >>>>>> Allowing an annotation on DoFns that produce
> > > > > > > > > > >>>>>> deterministic output could be added in the future, but
> > > > > > > > > > >>>>>> doesn't seem like a great option.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> 1. It is a correctness issue to assume a DoFn is
> > > > > > > > > > >>>>>> deterministic and be wrong, so we would need to assume
> > > > > > > > > > >>>>>> all transform outputs are non-deterministic unless
> > > > > > > > > > >>>>>> annotated. Getting this correct is difficult (for example,
> > > > > > > > > > >>>>>> GBK is surprisingly non-deterministic except in specific
> > > > > > > > > > >>>>>> cases).
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> 2. It is unlikely to be a major performance improvement,
> > > > > > > > > > >>>>>> given that any non-deterministic transform prior to a sink
> > > > > > > > > > >>>>>> (which are most likely to require deterministic input)
> > > > > > > > > > >>>>>> will cause additional work to be needed.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Based on this, it seems like the risk of allowing an
> > > > > > > > > > >>>>>> annotation is high while the potential for performance
> > > > > > > > > > >>>>>> improvements is low. The current proposal (not allowing an
> > > > > > > > > > >>>>>> annotation) makes sense for now, until we can demonstrate
> > > > > > > > > > >>>>>> that the impact on performance is high in cases that could
> > > > > > > > > > >>>>>> be avoided with an annotation (in real-world use).
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> -- Ben
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> +1 for the general idea of runners handling it over a
> > > > > > > > > > >>>>>> hard-coded implementation strategy.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> For the Write transform, I believe you are talking about
> > > > > > > > > > >>>>>> ApplyShardingKey
> > > > > > > > > > >>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>,
> > > > > > > > > > >>>>>> which introduces non-deterministic behavior when retried?
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> *Let a DoFn declare (mechanism not important right now)
> > > > > > > > > > >>>>>> that it "requires deterministic input"*
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> *Each runner will need a way to induce deterministic
> > > > > > > > > > >>>>>> input - the obvious choice being a materialization.*
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Does this mean that a runner will always materialize (or
> > > > > > > > > > >>>>>> whatever the strategy is) an input PCollection to this
> > > > > > > > > > >>>>>> DoFn even though the PCollection might have been produced
> > > > > > > > > > >>>>>> by deterministic transforms? Would it make sense to also
> > > > > > > > > > >>>>>> let DoFns declare if they produce non-deterministic
> > > > > > > > > > >>>>>> output?
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> -Vikas
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>>> Hey Kenn,
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> This seems important, but I don't have all the context
> > > > > > > > > > >>>>>>> on what the problem is.
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Can you explain this sentence: "Specifically, there is
> > > > > > > > > > >>>>>>> pseudorandom data generated and once it has been
> > > > > > > > > > >>>>>>> observed and used to produce a side effect, it cannot be
> > > > > > > > > > >>>>>>> regenerated without erroneous results."?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> Where is the pseudorandom data coming from? Perhaps a
> > > > > > > > > > >>>>>>> concrete example would help?
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> S
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid> wrote:
> > > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>>> Problem:
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> I will drop all nuance and say that the `Write` transform
> > > > > > > > > > >>>>>>>> as it exists in the SDK is incorrect until we add some
> > > > > > > > > > >>>>>>>> specification and APIs. We can't keep shipping an SDK
> > > > > > > > > > >>>>>>>> with an unsafe transform in it, and IMO this certainly
> > > > > > > > > > >>>>>>>> blocks a stable release.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Specifically, there is pseudorandom data generated and
> > > > > > > > > > >>>>>>>> once it has been observed and used to produce a side
> > > > > > > > > > >>>>>>>> effect, it cannot be regenerated without erroneous
> > > > > > > > > > >>>>>>>> results.
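For a concrete picture of this failure mode, here is a hedged Java model. It assumes an upstream step similar in spirit to ApplyShardingKey that picks a random shard per record; RandomShardSketch and its methods are hypothetical, and this is not the actual Write.java code. Recomputing the shard on each attempt can send the same record to a different shard, while replaying a checkpointed value (one possible way a runner could induce stable input) keeps every attempt consistent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical model of the failure mode (not the actual Write.java code): an
// upstream step picks a pseudorandom shard per record, and a side-effecting step
// writes the record into that shard's output.
final class RandomShardSketch {
  static final int NUM_SHARDS = 4;

  // Stand-in for a runner checkpointing upstream output before the side effect.
  private final Map<String, Integer> checkpointedShard = new HashMap<>();

  // Unstable: every recomputation (e.g. on retry) may pick a different shard, so
  // the same record could be written to two different shards across attempts.
  static int recomputeShard(Random rng) {
    return rng.nextInt(NUM_SHARDS);
  }

  // Stable: the first computed shard is persisted and replayed on every retry.
  int stableShard(String recordId, Random rng) {
    return checkpointedShard.computeIfAbsent(recordId, id -> recomputeShard(rng));
  }

  public static void main(String[] args) {
    RandomShardSketch sketch = new RandomShardSketch();
    for (int attempt = 0; attempt < 3; attempt++) {
      // Each attempt gets a differently seeded generator, as a fresh retry would.
      System.out.println("attempt " + attempt
          + ": recomputed=" + recomputeShard(new Random(attempt))
          + ", checkpointed=" + sketch.stableShard("record-1", new Random(100 + attempt)));
    }
  }
}
```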
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> This generalizes: for some side-effecting user-defined
> > > > > > > > > > >>>>>>>> functions, it is vital that even across retries/replays
> > > > > > > > > > >>>>>>>> they have a consistent view of the contents of their
> > > > > > > > > > >>>>>>>> input PCollection, because their effect on the outside
> > > > > > > > > > >>>>>>>> world cannot be retracted if/when they fail and are
> > > > > > > > > > >>>>>>>> retried. Once the runner ensures a consistent view of the
> > > > > > > > > > >>>>>>>> input, it is then their own responsibility to be
> > > > > > > > > > >>>>>>>> idempotent.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Ideally we should specify this requirement for the
> > > > > > > > > > >>>>>>>> user-defined function without imposing any particular
> > > > > > > > > > >>>>>>>> implementation strategy on Beam runners.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Proposal:
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> 1. Let a DoFn declare (mechanism not important right now)
> > > > > > > > > > >>>>>>>> that it "requires deterministic input".
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> 2. Each runner will need a way to induce deterministic
> > > > > > > > > > >>>>>>>> input - the obvious choice being a materialization.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> I want to keep the discussion focused, so I'm leaving out
> > > > > > > > > > >>>>>>>> any possibilities of taking this further.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Regarding performance: today, places that require this
> > > > > > > > > > >>>>>>>> tend to already be paying the cost via GroupByKey /
> > > > > > > > > > >>>>>>>> Reshuffle operations, since that was a simple way to
> > > > > > > > > > >>>>>>>> induce determinism in batch Dataflow* (it doesn't work for
> > > > > > > > > > >>>>>>>> most other runners, nor for streaming Dataflow). This
> > > > > > > > > > >>>>>>>> change will replace a hard-coded implementation strategy
> > > > > > > > > > >>>>>>>> with a requirement that may be fulfilled in the most
> > > > > > > > > > >>>>>>>> efficient way available.
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Thoughts?
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> * There is some overlap with the reshuffle/redistribute
> > > > > > > > > > >>>>>>>> discussion because of this historical situation, but I
> > > > > > > > > > >>>>>>>> would like to leave that broader discussion out of this
> > > > > > > > > > >>>>>>>> correctness issue.
> > > > > > > > > > >>>>>>>>
