As I said, a minor concern. We should be explicit in our documentation that it is only the input _elements_ that are deterministic/stable/replayable/etc., and not the operational concerns surrounding them (such as bundling). I'd rather not make the annotation name itself more verbose (for example, @RequiresStableInputContents).
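To make the elements-versus-bundling distinction concrete, here is a minimal toy sketch. None of these types are Beam APIs: `KeyedSink`, `BatchingWriter`, and `run` are hypothetical stand-ins for an external store, a DoFn that batches remote calls in its finish-bundle hook, and a runner that picks bundle boundaries arbitrarily. It assumes the sink accepts keyed upserts, which is one way of being idempotent per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model only: none of these types are Beam APIs. */
class BundlingSketch {

    /** Hypothetical external sink. Writes are keyed upserts, so repeating one is harmless. */
    static final class KeyedSink {
        final Map<String, String> rows = new TreeMap<>();
        int remoteCalls = 0;

        void writeBatch(List<String[]> batch) {
            remoteCalls++;
            for (String[] kv : batch) {
                rows.put(kv[0], kv[1]); // same key and same value give the same final state
            }
        }
    }

    /** Stand-in for a DoFn that buffers elements and flushes them when the bundle finishes. */
    static final class BatchingWriter {
        private final KeyedSink sink;
        private final List<String[]> buffer = new ArrayList<>();

        BatchingWriter(KeyedSink sink) {
            this.sink = sink;
        }

        void processElement(String key, String value) {
            buffer.add(new String[] {key, value});
        }

        void finishBundle() {
            if (!buffer.isEmpty()) {
                sink.writeBatch(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
    }

    /** Feeds the elements through a fresh writer per bundle, using bundles of the given size. */
    static void run(List<String[]> elements, int bundleSize, KeyedSink sink) {
        for (int start = 0; start < elements.size(); start += bundleSize) {
            BatchingWriter writer = new BatchingWriter(sink); // new instance for every bundle
            int end = Math.min(start + bundleSize, elements.size());
            for (String[] kv : elements.subList(start, end)) {
                writer.processElement(kv[0], kv[1]);
            }
            writer.finishBundle();
        }
    }

    public static void main(String[] args) {
        List<String[]> elements = List.of(
            new String[] {"a", "1"}, new String[] {"b", "2"}, new String[] {"c", "3"});
        KeyedSink sink = new KeyedSink();
        run(elements, 3, sink); // first attempt: one bundle
        run(elements, 1, sink); // replay: same elements, different bundling
        System.out.println(sink.rows + " after " + sink.remoteCalls + " remote calls");
    }
}
```

The replay changes how many remote calls are made and what each batch contains, but the sink ends in the same state because every element carries the same value both times. That is the only property the documentation would need to promise.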
On Wed, Aug 9, 2017 at 1:49 PM, Kenneth Knowles <k...@google.com.invalid> wrote:

> On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid> wrote:
>
> > I have a minor concern that this may not work as expected for users that
> > try to batch remote calls in `FinishBundle` - we should make sure we
> > document that it is explicitly the input elements that will be replayed,
> > and bundles and other operational are still arbitrary.
>
> I think it is safe since writing has to be idempotent per-element anyhow,
> and a DoFn must be discarded on failure and a new version used for any
> retries.
>
> Was your concern something else? Were you concerned that the phrasing may
> imply a deterministic ordering of calls to the DoFn's methods? Perhaps
> @RequiresStableInputContents to make clear it has nothing to do with
> replaying method calls?
>
> Kenn
>
> > On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid> wrote:
> >
> > > I think deterministic here means deterministically replayable. i.e. no
> > > matter how many times the element is retried, it will always be the
> > > same.
> > >
> > > I think we should also allow specifying this on processTimer. This
> > > would mean that any keyed state written in a previous processElement
> > > must be guaranteed durable before processTimer is called.
> > >
> > > On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:
> > >
> > > > I strongly agree with this proposal. I think moving away from "just
> > > > insert a GroupByKey for one of the 3 different reasons you may want
> > > > it" towards APIs that allow code to express the requirements they
> > > > have and the runner to choose the best way to meet this is a major
> > > > step forwards in terms of portability.
> > > >
> > > > I think "deterministic" may be misleading. The actual contents of
> > > > the collection aren't deterministic if upstream computations aren't.
> > > > The property we really need is that once an input may have been
> > > > observed by the side-effecting code it will never be observed with a
> > > > different value.
> > > >
> > > > I would propose something RequiresStableInput, to indicate that the
> > > > input must be stable as observed by the function. I could also see
> > > > something hinting at the fact we don't recompute the input, such as
> > > > RequiresMemoizedInput or RequiresNoRecomputation.
> > > >
> > > > -- Ben
> > > >
> > > > P.S For anyone interested other uses of GroupByKey that we may want
> > > > to discuss APIs for would be preventing retry across steps (eg.,
> > > > preventing fusion) and redistributing inputs across workers.
> > > >
> > > > On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid> wrote:
> > > >
> > > > > This came up again, so I wanted to push it along by proposing a
> > > > > specific API for Java that could have a derived API in Python. I
> > > > > am writing this quickly to get something out there, so I welcome
> > > > > suggestions for revision.
> > > > >
> > > > > Today a DoFn has a @ProcessElement annotated method with various
> > > > > automated parameters, but most fundamentally this:
> > > > >
> > > > >     @ProcessElement
> > > > >     public void process(ProcessContext ctx) {
> > > > >       ctx.element() // to access the current input element
> > > > >       ctx.output(something) // to write to default output collection
> > > > >       ctx.output(tag, something) // to write to other output collections
> > > > >     }
> > > > >
> > > > > For some time, we have hoped to unpack the context - it is a
> > > > > backwards-compatibility pattern made obsolete by the new DoFn
> > > > > design. So instead it would look like this:
> > > > >
> > > > >     @ProcessElement
> > > > >     public void process(Element element, MainOutput mainOutput, ...) {
> > > > >       element.get() // to access the current input element
> > > > >       mainOutput.output(something) // to write to the default output collection
> > > > >       other.output(something) // to write to other output collection
> > > > >     }
> > > > >
> > > > > I've deliberately left out the undecided syntax for side outputs.
> > > > > But it would be nice for the tag to be built in to the parameter
> > > > > so it doesn't have to be used when calling output().
> > > > >
> > > > > One way to enhance this to deterministic input would just be this:
> > > > >
> > > > >     @ProcessElement
> > > > >     @RequiresDeterministicInput
> > > > >     public void process(Element element, MainOutput mainOutput, ...) {
> > > > >       element.get() // to access the current input element
> > > > >       mainOutput.output(something) // to write to the default output collection
> > > > >       other.output(something) // to write to other output collection
> > > > >     }
> > > > >
> > > > > There are really a lot of places where we could put an annotation
> > > > > or change a type to indicate that the input PCollection should be
> > > > > well-defined/deterministically-replayable. I don't have a really
> > > > > strong opinion.
> > > > >
> > > > > Kenn
> > > > >
> > > > > On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> > > > >
> > > > > > Allowing an annotation on DoFn's that produce deterministic
> > > > > > output could be added in the future, but doesn't seem like a
> > > > > > great option.
> > > > > >
> > > > > > 1. It is a correctness issue to assume a DoFn is deterministic
> > > > > > and be wrong, so we would need to assume all transform outputs
> > > > > > are non-deterministic unless annotated. Getting this correct is
> > > > > > difficult (for example, GBK is surprisingly non-deterministic
> > > > > > except in specific cases).
> > > > > >
> > > > > > 2. It is unlikely to be a major performance improvement, given
> > > > > > that any non-deterministic transform prior to a sink (which are
> > > > > > most likely to require deterministic input) will cause
> > > > > > additional work to be needed.
> > > > > >
> > > > > > Based on this, it seems like the risk of allowing an annotation
> > > > > > is high while the potential for performance improvements is low.
> > > > > > The current proposal (not allowing an annotation) makes sense
> > > > > > for now, until we can demonstrate that the impact on performance
> > > > > > is high in cases that could be avoided with an annotation (in
> > > > > > real-world use).
> > > > > >
> > > > > > -- Ben
> > > > > >
> > > > > > On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > > > > >
> > > > > > +1 for the general idea of runners handling it over hard-coded
> > > > > > implementation strategy.
> > > > > >
> > > > > > For the Write transform I believe you are talking about
> > > > > > ApplyShardingKey
> > > > > > <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > > > > which introduces non deterministic behavior when retried?
> > > > > >
> > > > > > *Let a DoFn declare (mechanism not important right now) that it
> > > > > > "requires deterministic input"*
> > > > > >
> > > > > > *Each runner will need a way to induce deterministic input - the
> > > > > > obvious choice being a materialization.*
> > > > > >
> > > > > > Does this mean that a runner will always materialize (or
> > > > > > whatever the strategy is) an input PCollection to this DoFn even
> > > > > > though the PCollection might have been produced by deterministic
> > > > > > transforms? Would it make sense to also let DoFns declare if
> > > > > > they produce non-deterministic output?
> > > > > >
> > > > > > -Vikas
> > > > > >
> > > > > > On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > > > > >
> > > > > > > Hey Kenn-
> > > > > > >
> > > > > > > this seems important, but I don't have all the context on what
> > > > > > > the problem is.
> > > > > > >
> > > > > > > Can you explain this sentence "Specifically, there is
> > > > > > > pseudorandom data generated and once it has been observed and
> > > > > > > used to produce a side effect, it cannot be regenerated
> > > > > > > without erroneous results." ?
> > > > > > >
> > > > > > > Where is the pseudorandom data coming from? Perhaps a concrete
> > > > > > > example would help?
> > > > > > >
> > > > > > > S
> > > > > > >
> > > > > > > On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid> wrote:
> > > > > > >
> > > > > > > > Problem:
> > > > > > > >
> > > > > > > > I will drop all nuance and say that the `Write` transform as
> > > > > > > > it exists in the SDK is incorrect until we add some
> > > > > > > > specification and APIs. We can't keep shipping an SDK with
> > > > > > > > an unsafe transform in it, and IMO this certainly blocks a
> > > > > > > > stable release.
> > > > > > > >
> > > > > > > > Specifically, there is pseudorandom data generated and once
> > > > > > > > it has been observed and used to produce a side effect, it
> > > > > > > > cannot be regenerated without erroneous results.
> > > > > > > >
> > > > > > > > This generalizes: For some side-effecting user-defined
> > > > > > > > functions, it is vital that even across retries/replays they
> > > > > > > > have a consistent view of the contents of their input
> > > > > > > > PCollection, because their effect on the outside world
> > > > > > > > cannot be retracted if/when they fail and are retried. Once
> > > > > > > > the runner ensures a consistent view of the input, it is
> > > > > > > > then their own responsibility to be idempotent.
> > > > > > > >
> > > > > > > > Ideally we should specify this requirement for the
> > > > > > > > user-defined function without imposing any particular
> > > > > > > > implementation strategy on Beam runners.
> > > > > > > >
> > > > > > > > Proposal:
> > > > > > > >
> > > > > > > > 1. Let a DoFn declare (mechanism not important right now)
> > > > > > > > that it "requires deterministic input".
> > > > > > > >
> > > > > > > > 2. Each runner will need a way to induce deterministic input
> > > > > > > > - the obvious choice being a materialization.
> > > > > > > >
> > > > > > > > I want to keep the discussion focused, so I'm leaving out
> > > > > > > > any possibilities of taking this further.
> > > > > > > >
> > > > > > > > Regarding performance: Today places that require this tend
> > > > > > > > to be already paying the cost via GroupByKey / Reshuffle
> > > > > > > > operations, since that was a simple way to induce
> > > > > > > > determinism in batch Dataflow* (doesn't work for most other
> > > > > > > > runners nor for streaming Dataflow). This change will
> > > > > > > > replace a hard-coded implementation strategy with a
> > > > > > > > requirement that may be fulfilled in the most efficient way
> > > > > > > > available.
> > > > > > > >
> > > > > > > > Thoughts?
> > > > > > > >
> > > > > > > > Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > > > > >
> > > > > > > > * There is some overlap with the reshuffle/redistribute
> > > > > > > > discussion because of this historical situation, but I would
> > > > > > > > like to leave that broader discussion out of this
> > > > > > > > correctness issue.
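For reference, the two-step proposal at the bottom of the thread (the function declares the requirement; the runner induces it, for instance by materializing) can be sketched in plain Java. This is an illustration under stated assumptions, not Beam code: the `RequiresStableInput` annotation is defined locally and only borrows the name suggested in the thread, while `Fn`, `SideEffectingFn`, and `runWithRetries` are hypothetical stand-ins for a DoFn and a runner's retry loop.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

/** Toy model only: the annotation and the "runner" below are hypothetical, not Beam APIs. */
class StableInputSketch {

    /** Locally defined marker that borrows the name proposed in the thread. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RequiresStableInput {}

    /** Minimal stand-in for a user-defined function with one processing method. */
    interface Fn {
        void process(String element, List<String> sideEffects);
    }

    /** A function whose effect on the outside world cannot be retracted. */
    static final class SideEffectingFn implements Fn {
        @Override
        @RequiresStableInput
        public void process(String element, List<String> sideEffects) {
            sideEffects.add(element);
        }
    }

    /** Step 1: the runner discovers the declared requirement via reflection. */
    static boolean requiresStableInput(Fn fn) {
        try {
            Method m = fn.getClass().getMethod("process", String.class, List.class);
            return m.isAnnotationPresent(RequiresStableInput.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Step 2: the runner induces stable input by materializing it once. Every attempt
     * is treated as if the previous one failed after its side effects became visible.
     */
    static List<String> runWithRetries(Fn fn, Supplier<List<String>> upstream, int attempts) {
        List<String> sideEffects = new ArrayList<>();
        List<String> materialized = requiresStableInput(fn) ? List.copyOf(upstream.get()) : null;
        for (int attempt = 0; attempt < attempts; attempt++) {
            // Without the requirement, the upstream computation is simply re-run.
            List<String> input = materialized != null ? materialized : upstream.get();
            for (String element : input) {
                fn.process(element, sideEffects);
            }
        }
        return sideEffects;
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Non-deterministic upstream: a fresh pseudorandom shard on every recomputation.
        Supplier<List<String>> upstream =
            () -> List.of("record-1/shard-" + random.nextInt(1000));
        // Both attempts observe the same shard because the input was materialized once.
        System.out.println(runWithRetries(new SideEffectingFn(), upstream, 2));
    }
}
```

Materialization is just the obvious strategy named in the proposal. A runner that can prove its upstream is already replayable could return true from the same check and skip the copy, which is the reason for declaring a requirement instead of hard-coding a GroupByKey.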