On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
> >
> > you can't wrap DoFn's, period
>
> As a simple example, given a DoFn<T, O> it's perfectly natural to want
> to "wrap" this as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs,
> windows, etc. would just be passed through.


> The fact that this is complicated, with reflection and flexible
> signatures and byte generation, is a property of the SDK (to provide a
> flexible DoFn API). I agree that it's nice to hide this complexity
> from the user, and it discourages this kind of composability.
>

<tangent>
The difficulty of this sort of composability is a particularly bothersome
issue for DoFn. It is solvable but the solutions may seem esoteric.

 - Supporting wrapped _invocation_ is actually as easy as before if we
choose to embrace it: ArgumentProvider is roughly the same thing as ye olde
ProcessContext. We can easily provide it via our parameter mechanism, and
DoFnInvokers can be used or we can also provide a DoFnInvoker for some
requested DoFn.

 - Supporting wrapped analysis is a bit uglier but I don't think there are
technical blockers. It was actually quite bad prior to the new DoFn - you
couldn't know statically whether the wrapper class had to "implements
RequiresWindowAccess" so you would just have to do it "just in case". With
the new DoFn I could imagine a `@Wraps public DoFn getWrapped()`, though
we'd still have to be able to merge this with whatever the wrapper requires.
For example, if they both use state, there will be nothing calling out the
fact that the wrapper needs to create a disjoint namespace. We could even
make this required if there is an ArgumentProvider parameter and/or
automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be
in the core SDK and DoFnInvokers would be only in the SDK Fn harness).

So I think the analysis problems are fundamental, not part of the
particulars of the new DoFn API or any particular SDK.
</tangent>
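
For concreteness, the simplest form of the wrapping Robert describes might
look like the plain-Java sketch below. It deliberately ignores state, side
inputs, and windows, which is exactly where the analysis difficulty lies. It
uses java.util.function.Function as a stand-in for a DoFn; `KeyedWrapSketch`
and `wrapKeyed` are made-up names for illustration, not Beam API.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only: Function stands in for DoFn<T, O>,
// and Map.Entry stands in for KV. None of this is Beam API.
class KeyedWrapSketch {
    // Lifts a per-value function to one that passes the key through untouched.
    static <K, T, O> Function<Map.Entry<K, T>, Map.Entry<K, O>> wrapKeyed(
            Function<T, O> inner) {
        return kv -> new SimpleImmutableEntry<>(kv.getKey(), inner.apply(kv.getValue()));
    }

    public static void main(String[] args) {
        Function<Map.Entry<String, String>, Map.Entry<String, Integer>> wrapped =
                KeyedWrapSketch.<String, String, Integer>wrapKeyed(String::length);
        Map.Entry<String, Integer> out =
                wrapped.apply(new SimpleImmutableEntry<>("k", "abc"));
        System.out.println(out.getKey() + " -> " + out.getValue());
    }
}
```

The invocation side really is this small; what the sketch cannot show is how
a runner would learn what the inner function needs (state, window access)
from looking only at the wrapper.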

Coming back to the ticket... I'm going to echo Ben's early point, and now
Eugene's and Robert's, that we should enumerate further use cases
explicitly and preferably add them to the JIRA.

Both SO questions are answered with essentially an inlined PTransform<T,
Iterable<T>> with a maximum iterable size, for batched RPCs downstream. You
can easily build timestamped and reified-windows versions without that
transform being aware of it. It is simple to understand, a unified-model
version via state & timers is as easy to implement as those SO answers, and
it doesn't require DoFn nesting. I would love to learn more about use cases
that either debunk this or refine it. Also, one transform does not need to
serve all uses; we just need it to serve its intended use properly and try
not to tempt misuse.
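
To make the shape of that transform concrete, here is a minimal plain-Java
sketch of the core logic: group a sequence of elements into batches of at
most a maximum size. It has no Beam dependency, and `BatchSketch` and `batch`
are hypothetical names. A unified-model version would keep the buffer in
state and flush it from a timer, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: not Beam API.
class BatchSketch {
    // Groups elements into consecutive batches of at most maxSize elements.
    static <T> List<List<T>> batch(Iterable<T> input, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T element : input) {
            current.add(element);
            if (current.size() == maxSize) {
                // Buffer is full: emit it and start a fresh one.
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            // Emit the final, possibly short, batch.
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Each inner list is what a downstream step would send as one RPC.
        System.out.println(batch(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```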

Focusing briefly on the windowing issues, the outside world is globally
windowed. So in those communications the data is in the global window
whether you want it to be or not. IMO, with rare exceptions, rewindowing to
the global window silently (such as by making an RPC ignoring the window)
is data loss (or maybe just well-documented data discarding :-). So
window-aware write transforms (with the special case of the global window
being "already ready") are a good idea from the start. It probably makes sense to
have consistently windowed output from a window-aware write, so the above
transform should operate per-window or else reify windows, batch in the
global window, then restore windows (requires BEAM-1287 or a WindowFn).
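
A rough plain-Java sketch of the "operate per-window" option, under the
assumption that each element carries its window reified as a plain value:
batches are kept separate per window, so no batch ever mixes windows. The
names `PerWindowBatchSketch`, `Windowed`, and `batchPerWindow` are
hypothetical, and a String stands in for a real window; none of this is Beam
API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not Beam API.
class PerWindowBatchSketch {
    // Stand-in for an element whose window has been reified as a value.
    static final class Windowed<T> {
        final String window;
        final T value;

        Windowed(String window, T value) {
            this.window = window;
            this.value = value;
        }
    }

    // Batches elements so that every batch contains a single window only.
    static <T> Map<String, List<List<T>>> batchPerWindow(
            Iterable<Windowed<T>> input, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        Map<String, List<List<T>>> result = new LinkedHashMap<>();
        Map<String, List<T>> open = new LinkedHashMap<>(); // one buffer per window
        for (Windowed<T> e : input) {
            List<T> buffer = open.computeIfAbsent(e.window, w -> new ArrayList<>());
            buffer.add(e.value);
            if (buffer.size() == maxSize) {
                // Full buffer: emit it under its own window and drop it.
                result.computeIfAbsent(e.window, w -> new ArrayList<>()).add(buffer);
                open.remove(e.window);
            }
        }
        // Flush whatever is left, still keyed by window.
        for (Map.Entry<String, List<T>> rest : open.entrySet()) {
            result.computeIfAbsent(rest.getKey(), w -> new ArrayList<>()).add(rest.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Windowed<String>> input = Arrays.asList(
                new Windowed<String>("w1", "a"),
                new Windowed<String>("w2", "x"),
                new Windowed<String>("w1", "b"),
                new Windowed<String>("w1", "c"));
        System.out.println(batchPerWindow(input, 2));
    }
}
```

Because the output stays keyed by window, a downstream step could restore
the original windowing rather than silently collapsing everything into the
global window.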

Kenn
