On Thu, Jan 26, 2017 at 6:58 PM, Kenneth Knowles <k...@google.com.invalid> 
wrote:
> On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
>> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
>> <kirpic...@google.com.invalid> wrote:
>> >
>> > you can't wrap DoFn's, period
>>
>> As a simple example, given a DoFn<T, O> it's perfectly natural to want
>> to "wrap" this as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs,
>> windows, etc. would just be passed through.
>
>
>> The fact that this is complicated, with reflection and flexible
>> signatures and bytecode generation, is a property of the SDK (to provide a
>> flexible DoFn API). I agree that it's nice to hide this complexity
>> from the user, but it does discourage this kind of composability.
>>
>
> <tangent>
> The difficulty of this sort of composability is a particularly bothersome
> issue for DoFn. It is solvable but the solutions may seem esoteric.
>
>  - Supporting wrapped _invocation_ is actually as easy as before if we
> chose to embrace it: ArgumentProvider is roughly the same thing as ye olde
> ProcessContext. We can easily provide it via our parameter mechanism, and
> DoFnInvokers can be used or we can also provide a DoFnInvoker for some
> requested DoFn.
>
>  - Supporting wrapped analysis is a bit uglier but I don't think there are
> technical blockers. It was actually quite bad prior to the new DoFn - you
> couldn't know statically whether the wrapper class had to "implements
> RequiresWindowAccess" so you would just have to do it "just in case". With
> the new DoFn I could imagine a `@Wraps public DoFn getWrapped()` though
> we'd still have to be able to merge this with whatever the wrapper requires
> - for example if they both use state there will be nothing calling out the
> fact that the wrapper needs to create a disjoint namespace. We could even
> make this required if there is an ArgumentProvider parameter and/or
> automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be
> in the core SDK and DoFnInvokers would be only in the SDK Fn harness).
>
> So I think the analysis problems are fundamental, not part of the
> particulars of the new DoFn API or any particular SDK.
> </tangent>

<tangent>
My point is that the DoFn as a concept in the Beam model is
fundamentally, though not perfectly, composable. Both invocation and
analysis are functions of the SDK, and solvable, though perhaps not
easily (and/or requiring what would normally be considered
implementation details).
</tangent>
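
For the invocation half of this, a minimal sketch of the KV wrapping Robert mentions (DoFn<T, O> lifted to DoFn<KV<K, T>, KV<K, O>>) fits in plain Java. It is not Beam code: ElementFn and KeyedFn are hypothetical stand-ins for DoFn, Map.Entry stands in for KV, and state, side inputs and windows are ignored, so it says nothing about the analysis half.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for DoFn<I, O>: one input, zero or more outputs.
interface ElementFn<I, O> {
  void process(I input, Consumer<O> out);
}

// Hypothetical wrapper lifting ElementFn<T, O> to a keyed version,
// with Map.Entry standing in for Beam's KV.
final class KeyedFn<K, T, O> implements ElementFn<Map.Entry<K, T>, Map.Entry<K, O>> {
  private final ElementFn<T, O> inner;

  KeyedFn(ElementFn<T, O> inner) {
    this.inner = inner;
  }

  @Override
  public void process(Map.Entry<K, T> input, Consumer<Map.Entry<K, O>> out) {
    K key = input.getKey();
    // Delegate to the wrapped function and re-attach the key to every output.
    inner.process(input.getValue(), o -> out.accept(new SimpleImmutableEntry<>(key, o)));
  }
}
```

For example, wrapping `(s, out) -> out.accept(s.length())` and feeding it ("k", "hello") emits ("k", 5).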

> Coming back to the ticket... I'm going to echo Ben's early point, and now
> Eugene's and Robert's, that we should enumerate further use cases explicitly
> and preferably add them to the JIRA.
>
> Both SO questions are answered with essentially an inlined PTransform<T,
> Iterable<T>> with a maximum iterable size, for batched RPCs downstream. You
> can easily build timestamped and reified-windows versions without that
> transform being aware of it. It is simple to understand, as easy to
> implement a unified-model version via state & timers as those SO answers,
> and doesn't require DoFn nesting.
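
The batching Kenn describes can be sketched as a plain size-bounded buffer. SizeBoundedBatcher is a hypothetical helper, not a Beam transform. Note that it carries no timestamps or windows at all, which is the gap Robert raises next.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: groups elements into lists of at most maxSize,
// e.g. so that each list can become one batched RPC downstream.
final class SizeBoundedBatcher<T> {
  private final int maxSize;
  private final List<List<T>> emitted = new ArrayList<>();
  private List<T> current = new ArrayList<>();

  SizeBoundedBatcher(int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    this.maxSize = maxSize;
  }

  // Buffer one element; emit the batch as soon as it reaches maxSize.
  void add(T element) {
    current.add(element);
    if (current.size() >= maxSize) {
      flush();
    }
  }

  // Emit any partial batch, e.g. at the end of a bundle.
  void flush() {
    if (!current.isEmpty()) {
      emitted.add(Collections.unmodifiableList(current));
      current = new ArrayList<>();
    }
  }

  List<List<T>> batches() {
    return Collections.unmodifiableList(emitted);
  }
}
```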

I think Eugene surfaced the key point, which is it depends on what one
does with the output.

Both of these emit "result" in the timestamp and window of whatever
the last element of the batch was, regardless of the other elements.
Of course they'll both break at runtime (with high probability) for
inputs windowed by anything but timestamp-invariant WindowFns like
GlobalWindows, as the emit in finalize won't have an ambient window or
timestamp to assign to its output.

This is fine for writes or other terminal nodes without output (though
even writes may have output).

> I would love to learn more about use
> cases that either debunk this or refine it.

Any time one wants to consume the output in a non-globally-windowed,
non-uniform-timestamp way, the SO answers above break; in particular,
in streaming. (Adapting these to use state would restrict batches to
be per-window and per-key, and would still not respect timestamps.)

The use cases I gave are ones where one follows the batch computation
with an "unbatch." Put another way, the batching allows amortization of
processing cost for logically independent elements. One would have to
do this by reifying windows and timestamps (holding back the timestamp
to the earliest member of the batch), storing elements in an
accumulator (per-bundle locally, or cross-bundle using
(per-key-window(?)) state), processing the batch, and then restoring
windows and timestamps. However, the user of the API shouldn't have to
manually deal with the window-reified elements, i.e. they should
provide a List<T> -> List<O> DoFn rather than a List<WindowedValue<T>>
-> List<WindowedValue<O>> one, which is why I think nesting, or at
least wrapping, is required.
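
A minimal sketch of that wrapping, assuming a simplified model rather than Beam's WindowedValue: Reified and ReifyingBatcher are hypothetical names, timestamps are epoch milliseconds, windows are plain labels, and the user function is a List<T> -> List<O> that must be 1:1. The hold is only computed here; a real implementation would have to apply it to the output watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical window-reified element: value plus its timestamp and window.
final class Reified<T> {
  final T value;
  final long timestampMillis;
  final String window; // window identity, simplified to a label

  Reified(T value, long timestampMillis, String window) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.window = window;
  }
}

// Hypothetical wrapper: the user supplies only List<T> -> List<O>; the wrapper
// keeps the reified windows/timestamps and restores them on the outputs.
final class ReifyingBatcher<T, O> {
  private final Function<List<T>, List<O>> batchFn;
  private final List<Reified<T>> buffer = new ArrayList<>();

  ReifyingBatcher(Function<List<T>, List<O>> batchFn) {
    this.batchFn = batchFn;
  }

  void add(Reified<T> element) {
    buffer.add(element);
  }

  // Earliest buffered timestamp, i.e. where output would have to be held back.
  // Returns Long.MAX_VALUE when nothing is buffered.
  long holdTimestampMillis() {
    long hold = Long.MAX_VALUE;
    for (Reified<T> e : buffer) {
      hold = Math.min(hold, e.timestampMillis);
    }
    return hold;
  }

  // Run the user's function on the bare values, then re-attach each
  // input's window and timestamp to the corresponding output.
  List<Reified<O>> flush() {
    List<T> values = new ArrayList<>();
    for (Reified<T> e : buffer) {
      values.add(e.value);
    }
    List<O> outputs = batchFn.apply(values);
    if (outputs.size() != buffer.size()) {
      throw new IllegalStateException("batch function must be 1:1");
    }
    List<Reified<O>> restored = new ArrayList<>();
    for (int i = 0; i < outputs.size(); i++) {
      Reified<T> in = buffer.get(i);
      restored.add(new Reified<>(outputs.get(i), in.timestampMillis, in.window));
    }
    buffer.clear();
    return restored;
  }
}
```

The 1:1 check is what keeps the restore step well defined; the next paragraph covers what breaks without it.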

Any outputs other than 1:0 or 1:1 require more manual thought as to
what the window and timestamp of the output should be (or restriction
of batches to single windows/timestamps, or rounding up timestamps to
the end of the common window, or ???). This likely includes writes
(though the resulting PCollection would be nondeterministically
influenced by the batching, unlike the 1:0 or 1:1 cases above.
Sometimes this is OK, e.g. one wants the set of files written, or
follows up with a global (but windowed?) aggregation.)
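
Two of the options above, restricting each batch to a single window and rounding the output timestamp up to the end of that window, can be sketched together for an N:1 function. IntervalWin, InWindow and PerWindowCombiner are hypothetical names, and the window's last instant is assumed to be end minus one millisecond (end exclusive). Which elements share a result still depends on how the batch was formed, which is the nondeterminism described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical interval window covering [startMillis, endMillis).
final class IntervalWin {
  final long startMillis;
  final long endMillis;

  IntervalWin(long startMillis, long endMillis) {
    this.startMillis = startMillis;
    this.endMillis = endMillis;
  }

  // Last instant inside the window, since the end bound is exclusive.
  long maxTimestampMillis() {
    return endMillis - 1;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof IntervalWin)) {
      return false;
    }
    IntervalWin other = (IntervalWin) o;
    return startMillis == other.startMillis && endMillis == other.endMillis;
  }

  @Override
  public int hashCode() {
    return 31 * Long.hashCode(startMillis) + Long.hashCode(endMillis);
  }
}

// Hypothetical element with its timestamp (epoch millis) and window.
final class InWindow<T> {
  final T value;
  final long timestampMillis;
  final IntervalWin window;

  InWindow(T value, long timestampMillis, IntervalWin window) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.window = window;
  }
}

final class PerWindowCombiner {
  // Split a batch by window, apply the N:1 function to each window's values,
  // and timestamp each result at the end of its window.
  static <T, O> List<InWindow<O>> processPerWindow(
      List<InWindow<T>> batch, Function<List<T>, O> fn) {
    Map<IntervalWin, List<T>> byWindow = new LinkedHashMap<>();
    for (InWindow<T> e : batch) {
      byWindow.computeIfAbsent(e.window, w -> new ArrayList<>()).add(e.value);
    }
    List<InWindow<O>> results = new ArrayList<>();
    for (Map.Entry<IntervalWin, List<T>> entry : byWindow.entrySet()) {
      IntervalWin w = entry.getKey();
      results.add(new InWindow<>(fn.apply(entry.getValue()), w.maxTimestampMillis(), w));
    }
    return results;
  }
}
```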

> Also one transform does not
> need to serve all uses; we just need it to serve its intended use properly
> and try not to tempt misuse.
>
> Focusing briefly on the windowing issues, the outside world is globally
> windowed. So in those communications the data is in the global window
> whether you want it to be or not. IMO with rare exceptions rewindowing to
> the global window silently (such as by making an RPC ignoring the window)
> is data loss (or maybe just well-documented data discarding :-). So
> window-aware write transforms (with special case of the global window being
> "already ready") are a good idea from the start. It probably makes sense to
> have consistently windowed output from a window-aware write, so the above
> transform should operate per-window or else reify windows, batch in the
> global window, then restore windows (requires BEAM-1287 or a WindowFn).
>
> Kenn
