+1 to the annotation approach. I outlined how implementing this would work in
the Flink runner in the thread about the exactly-once Kafka Sink.
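
To make the annotation idea concrete, here is a minimal, self-contained sketch
in plain Java. The annotation types and class names below are hypothetical
stand-ins defined locally for illustration, not the Beam SDK API, and this is
not the Flink runner implementation. It only shows one way a runner could
detect the declared requirement via reflection before deciding how to meet it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-ins for the annotations discussed in this thread.
// They are defined here for illustration and are NOT the Beam SDK types.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresStableInput {}

// A side-effecting function that declares its requirement on the method.
class SinkFn {
  @ProcessElement
  @RequiresStableInput
  public void process(String element) {
    // An external, non-retractable write would happen here.
  }
}

// A function with no such requirement.
class PlainFn {
  @ProcessElement
  public void process(String element) {}
}

class StableInputSketch {
  // True if a @ProcessElement method of fnClass also carries @RequiresStableInput.
  static boolean requiresStableInput(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)
          && m.isAnnotationPresent(RequiresStableInput.class)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(requiresStableInput(SinkFn.class));  // prints "true"
    System.out.println(requiresStableInput(PlainFn.class)); // prints "false"
  }
}
```

A runner that sees `true` is then free to pick its own strategy
(materialization, checkpointing, or something else); the function itself
says nothing about how the requirement is met.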

> On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> 
> Yes - I don't think we should try and make any deterministic guarantees
> about what is in a bundle. Stability guarantees are per element only.
> 
> On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid>
> wrote:
> 
>> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
>> minimum implementation requirement of a DoFn, and should be where the
>> processing logic which depends on characteristics of the inputs lies. It's a
>> good way of signalling the requirements of the Fn, and letting the runner
>> decide.
>> 
>> I have a minor concern that this may not work as expected for users who
>> try to batch remote calls in `FinishBundle` - we should make sure we
>> document that it is explicitly the input elements that will be replayed,
>> and that bundles and other operational details are still arbitrary.
>> 
>> 
>> 
>> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> 
>>> I think deterministic here means deterministically replayable, i.e. no
>>> matter how many times the element is retried, it will always be the same.
>>> 
>>> I think we should also allow specifying this on processTimer. This would
>>> mean that any keyed state written in a previous processElement must be
>>> guaranteed durable before processTimer is called.
>>> 
>>> 
>>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org>
>>> wrote:
>>> 
>>>> I strongly agree with this proposal. I think moving away from "just insert
>>>> a GroupByKey for one of the 3 different reasons you may want it" towards
>>>> APIs that allow code to express the requirements they have and the runner
>>>> to choose the best way to meet this is a major step forwards in terms of
>>>> portability.
>>>> 
>>>> I think "deterministic" may be misleading. The actual contents of the
>>>> collection aren't deterministic if upstream computations aren't. The
>>>> property we really need is that once an input may have been observed by the
>>>> side-effecting code it will never be observed with a different value.
>>>>
>>>> I would propose something like RequiresStableInput, to indicate that the
>>>> input must be stable as observed by the function. I could also see something
>>>> hinting at the fact we don't recompute the input, such as
>>>> RequiresMemoizedInput or RequiresNoRecomputation.
>>>> 
>>>> -- Ben
>>>> 
>>>> P.S. For anyone interested, other uses of GroupByKey that we may want to
>>>> discuss APIs for would be preventing retry across steps (e.g., preventing
>>>> fusion) and redistributing inputs across workers.
>>>> 
>>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid>
>>>> wrote:
>>>> 
>>>>> This came up again, so I wanted to push it along by proposing a specific
>>>>> API for Java that could have a derived API in Python. I am writing this
>>>>> quickly to get something out there, so I welcome suggestions for revision.
>>>>>
>>>>> Today a DoFn has a @ProcessElement annotated method with various automated
>>>>> parameters, but most fundamentally this:
>>>>> 
>>>>> @ProcessElement
>>>>> public void process(ProcessContext ctx) {
>>>>>  ctx.element() // to access the current input element
>>>>>  ctx.output(something) // to write to default output collection
>>>>>  ctx.output(tag, something) // to write to other output collections
>>>>> }
>>>>> 
>>>>> For some time, we have hoped to unpack the context - it is a
>>>>> backwards-compatibility pattern made obsolete by the new DoFn design. So
>>>>> instead it would look like this:
>>>>> 
>>>>> @ProcessElement
>>>>> public void process(Element element, MainOutput mainOutput, ...) {
>>>>>  element.get() // to access the current input element
>>>>>  mainOutput.output(something) // to write to the default output collection
>>>>>  other.output(something) // to write to other output collection
>>>>> }
>>>>> 
>>>>> I've deliberately left out the undecided syntax for side outputs. But it
>>>>> would be nice for the tag to be built in to the parameter so it doesn't
>>>>> have to be used when calling output().
>>>>> 
>>>>> One way to enhance this to require deterministic input would just be this:
>>>>> 
>>>>> @ProcessElement
>>>>> @RequiresDeterministicInput
>>>>> public void process(Element element, MainOutput mainOutput, ...) {
>>>>>  element.get() // to access the current input element
>>>>>  mainOutput.output(something) // to write to the default output collection
>>>>>  other.output(something) // to write to other output collection
>>>>> }
>>>>> 
>>>>> There are really a lot of places where we could put an annotation or change
>>>>> a type to indicate that the input PCollection should be
>>>>> well-defined/deterministically-replayable. I don't have a really strong
>>>>> opinion.
>>>>> 
>>>>> Kenn
>>>>> 
>>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid>
>>>>> wrote:
>>>>> 
>>>>>> Allowing an annotation on DoFns that produce deterministic output could be
>>>>>> added in the future, but doesn't seem like a great option.
>>>>>>
>>>>>> 1. It is a correctness issue to assume a DoFn is deterministic and be
>>>>>> wrong, so we would need to assume all transform outputs are
>>>>>> non-deterministic unless annotated. Getting this correct is difficult (for
>>>>>> example, GBK is surprisingly non-deterministic except in specific cases).
>>>>>>
>>>>>> 2. It is unlikely to be a major performance improvement, given that any
>>>>>> non-deterministic transform prior to a sink (which are most likely to
>>>>>> require deterministic input) will cause additional work to be needed.
>>>>>>
>>>>>> Based on this, it seems like the risk of allowing an annotation is high
>>>>>> while the potential for performance improvements is low. The current
>>>>>> proposal (not allowing an annotation) makes sense for now, until we can
>>>>>> demonstrate that the impact on performance is high in cases that could be
>>>>>> avoided with an annotation (in real-world use).
>>>>>> 
>>>>>> -- Ben
>>>>>> 
>>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
>>>>>> 
>>>>>> +1 for the general idea of runners handling it over hard-coded
>>>>>> implementation strategy.
>>>>>> 
>>>>>> For the Write transform I believe you are talking about ApplyShardingKey
>>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
>>>>>> which introduces non-deterministic behavior when retried?
>>>>>> 
>>>>>> 
>>>>>> *Let a DoFn declare (mechanism not important right now) that it
>>>>>> "requires deterministic input"*
>>>>>>
>>>>>> *Each runner will need a way to induce deterministic input - the
>>>>>> obvious choice being a materialization.*
>>>>>> 
>>>>>> Does this mean that a runner will always materialize (or whatever the
>>>>>> strategy is) an input PCollection to this DoFn even though the PCollection
>>>>>> might have been produced by deterministic transforms? Would it make sense
>>>>>> to also let DoFns declare if they produce non-deterministic output?
>>>>>> 
>>>>>> -Vikas
>>>>>> 
>>>>>> 
>>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
>>>>>> 
>>>>>>> Hey Kenn-
>>>>>>> 
>>>>>>> this seems important, but I don't have all the context on what the problem
>>>>>>> is.
>>>>>>>
>>>>>>> Can you explain this sentence "Specifically, there is pseudorandom data
>>>>>>> generated and once it has been observed and used to produce a side effect,
>>>>>>> it cannot be regenerated without erroneous results." ?
>>>>>>>
>>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete example
>>>>>>> would help?
>>>>>>> 
>>>>>>> S
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Problem:
>>>>>>>> 
>>>>>>>> I will drop all nuance and say that the `Write` transform as it exists in
>>>>>>>> the SDK is incorrect until we add some specification and APIs. We can't
>>>>>>>> keep shipping an SDK with an unsafe transform in it, and IMO this certainly
>>>>>>>> blocks a stable release.
>>>>>>>>
>>>>>>>> Specifically, there is pseudorandom data generated and once it has been
>>>>>>>> observed and used to produce a side effect, it cannot be regenerated
>>>>>>>> without erroneous results.
>>>>>>>>
>>>>>>>> This generalizes: For some side-effecting user-defined functions, it is
>>>>>>>> vital that even across retries/replays they have a consistent view of the
>>>>>>>> contents of their input PCollection, because their effect on the outside
>>>>>>>> world cannot be retracted if/when they fail and are retried. Once the
>>>>>>>> runner ensures a consistent view of the input, it is then their own
>>>>>>>> responsibility to be idempotent.
>>>>>>>>
>>>>>>>> Ideally we should specify this requirement for the user-defined function
>>>>>>>> without imposing any particular implementation strategy on Beam runners.
>>>>>>>> 
>>>>>>>> Proposal:
>>>>>>>> 
>>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that it "requires
>>>>>>>> deterministic input".
>>>>>>>>
>>>>>>>> 2. Each runner will need a way to induce deterministic input - the obvious
>>>>>>>> choice being a materialization.
>>>>>>>>
>>>>>>>> I want to keep the discussion focused, so I'm leaving out any possibilities
>>>>>>>> of taking this further.
>>>>>>>> 
>>>>>>>> Regarding performance: Today places that require this tend to be already
>>>>>>>> paying the cost via GroupByKey / Reshuffle operations, since that was a
>>>>>>>> simple way to induce determinism in batch Dataflow* (doesn't work for most
>>>>>>>> other runners nor for streaming Dataflow). This change will replace a
>>>>>>>> hard-coded implementation strategy with a requirement that may be fulfilled
>>>>>>>> in the most efficient way available.
>>>>>>>> 
>>>>>>>> Thoughts?
>>>>>>>> 
>>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
>>>>>>>> 
>>>>>>>> * There is some overlap with the reshuffle/redistribute discussion because
>>>>>>>> of this historical situation, but I would like to leave that broader
>>>>>>>> discussion out of this correctness issue.
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
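
For readers who, like Stephen, want a concrete example of the problem quoted
at the bottom of this thread: the following is a rough, self-contained sketch
in plain Java, not Beam code. All names (`ReplaySketch`, `assignShard`,
`materialize`, `observeAcrossAttempts`) are hypothetical. It assumes an
upstream step that attaches a pseudorandom shard to each element, loosely
modelled on the sharding-key step mentioned for `Write`, and compares what a
side-effecting step would observe across retries when the upstream step is
recomputed on every attempt versus computed once and materialized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

class ReplaySketch {
  // Hypothetical upstream step: tags an element with a pseudorandom shard.
  static String assignShard(String element, Random random) {
    return element + "@shard-" + random.nextInt(1_000_000);
  }

  // Records the input value a side-effecting step would see on each attempt.
  static <T> List<T> observeAcrossAttempts(Supplier<T> input, int attempts) {
    List<T> observed = new ArrayList<>();
    for (int i = 0; i < attempts; i++) {
      observed.add(input.get());
    }
    return observed;
  }

  // Materialization: run the upstream computation once and hand the same
  // stored value to every later attempt.
  static <T> Supplier<T> materialize(Supplier<T> upstream) {
    T value = upstream.get();
    return () -> value;
  }

  public static void main(String[] args) {
    Random random = new Random();
    Supplier<String> recomputed = () -> assignShard("record", random);

    // Each retry re-runs the upstream step, so the shard will usually differ.
    System.out.println("recomputed per attempt: "
        + observeAcrossAttempts(recomputed, 3));

    // Each retry sees the single stored value.
    System.out.println("materialized once:      "
        + observeAcrossAttempts(materialize(recomputed), 3));
  }
}
```

Materialization is only the obvious strategy named in the proposal; the point
of declaring the requirement is that a runner may satisfy it some other way.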
