On Thu, Jan 26, 2017 at 4:15 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:
> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
> >
> > you can't wrap DoFn's, period
>
> As a simple example, given a DoFn<T, O> it's perfectly natural to want
> to "wrap" this as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs,
> windows, etc. would just be passed through.
>
> The fact that this is complicated, with reflection and flexible
> signatures and byte generation, is a property of the SDK (to provide a
> flexible DoFn API). I agree that it's nice to hide this complexity
> from the user, and it discourages this kind of composability.
>

<tangent>

The difficulty of this sort of composability is a particularly bothersome issue for DoFn. It is solvable, but the solutions may seem esoteric.

 - Supporting wrapped _invocation_ is actually as easy as before if we choose to embrace it: ArgumentProvider is roughly the same thing as ye olde ProcessContext. We can easily provide it via our parameter mechanism, and DoFnInvokers can be used, or we can also provide a DoFnInvoker for some requested DoFn.

 - Supporting wrapped analysis is a bit uglier, but I don't think there are technical blockers. It was actually quite bad prior to the new DoFn: you couldn't know statically whether the wrapper class had to "implements RequiresWindowAccess", so you would just have to do it "just in case". With the new DoFn I could imagine a `@Wraps public DoFn getWrapped()`, though we'd still have to be able to merge this with whatever the wrapper requires. For example, if they both use state, nothing will call out the fact that the wrapper needs to create a disjoint namespace. We could even make this required if there is an ArgumentProvider parameter and/or automatically provide a DoFnInvoker for this DoFn (so DoFnInvoker would be in the core SDK and DoFnInvokers would be only in the SDK Fn harness).

So I think the analysis problems are fundamental, not part of the particulars of the new DoFn API or any particular SDK.
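To make the "wrap" example concrete, here is a minimal plain-Java sketch of the invocation half only. It deliberately does not use the Beam SDK. `SimpleFn`, `KV`, `keyed` and `WrapSketch` are hypothetical stand-ins, and the sketch leaves out state, side inputs, windows and the annotation-based analysis, which is exactly where the real difficulty lies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class WrapSketch {
  // Hypothetical stand-in for a DoFn (not Beam's API): one input, zero or more outputs.
  public interface SimpleFn<I, O> {
    void process(I input, Consumer<O> out);
  }

  // Minimal key/value pair standing in for Beam's KV<K, V>.
  public static final class KV<K, V> {
    public final K key;
    public final V value;

    public KV(K key, V value) {
      this.key = key;
      this.value = value;
    }

    @Override
    public String toString() {
      return key + "=" + value;
    }
  }

  // Wraps a SimpleFn<T, O> as a SimpleFn<KV<K, T>, KV<K, O>>: the wrapped function sees
  // only the value, and the key is re-attached to every output it emits.
  public static <K, T, O> SimpleFn<KV<K, T>, KV<K, O>> keyed(SimpleFn<T, O> inner) {
    return (kv, out) -> inner.process(kv.value, o -> out.accept(new KV<>(kv.key, o)));
  }

  public static void main(String[] args) {
    SimpleFn<String, Integer> length = (s, out) -> out.accept(s.length());
    SimpleFn<KV<String, String>, KV<String, Integer>> keyedLength = keyed(length);
    List<KV<String, Integer>> results = new ArrayList<>();
    keyedLength.process(new KV<>("a", "hello"), results::add);
    System.out.println(results); // prints [a=5]
  }
}
```

In this toy model the wrapping is a one-liner because nothing about the inner function's requirements has to be discovered or merged. A real DoFn wrapper would also have to surface and combine whatever the wrapped DoFn declares.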
</tangent>

Coming back to the ticket... I'm going to echo Ben's early point, and now Eugene's and Robert's, that we should enumerate further use cases explicitly and preferably add them to the JIRA.

Both SO questions are answered with what is essentially an inlined PTransform<T, Iterable<T>> with a maximum iterable size, for batched RPCs downstream. You can easily build timestamped and reified-windows versions without that transform being aware of it. It is simple to understand, a unified-model version of it is as easy to implement via state & timers as those SO answers, and it doesn't require DoFn nesting. I would love to learn more about use cases that either debunk this or refine it. Also, one transform does not need to serve all uses; we just need it to serve its intended use properly and try not to tempt misuse.

Focusing briefly on the windowing issues: the outside world is globally windowed. So in those communications the data is in the global window whether you want it to be or not. IMO, with rare exceptions, silently rewindowing to the global window (such as by making an RPC that ignores the window) is data loss (or maybe just well-documented data discarding :-). So window-aware write transforms (with the special case of the global window being "already ready") are a good idea from the start.

It probably makes sense to have consistently windowed output from a window-aware write, so the above transform should either operate per-window, or else reify windows, batch in the global window, then restore windows (requires BEAM-1287 or a WindowFn).

Kenn
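To illustrate the "operate per-window" option for the batching transform discussed above, here is a minimal in-memory plain-Java sketch of batching with a maximum batch size. It is not Beam code and not a state & timers implementation. `Windowed`, `batchPerWindow` and `BatchSketch` are hypothetical, and windows are modeled as plain string labels. A streaming version would also need to flush partial batches when a window expires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchSketch {
  // Hypothetical element type: a value tagged with the window it belongs to.
  public static final class Windowed<T> {
    public final String window;
    public final T value;

    public Windowed(String window, T value) {
      this.window = window;
      this.value = value;
    }
  }

  // Groups elements by window (in first-seen order), then splits each window's elements
  // into batches of at most maxSize, so no batch ever mixes windows.
  public static <T> Map<String, List<List<T>>> batchPerWindow(List<Windowed<T>> input, int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    Map<String, List<List<T>>> batches = new LinkedHashMap<>();
    for (Windowed<T> e : input) {
      List<List<T>> forWindow = batches.computeIfAbsent(e.window, w -> new ArrayList<>());
      // Start a new batch if this window has none yet or its current batch is full.
      if (forWindow.isEmpty() || forWindow.get(forWindow.size() - 1).size() == maxSize) {
        forWindow.add(new ArrayList<>());
      }
      forWindow.get(forWindow.size() - 1).add(e.value);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Windowed<Integer>> input = Arrays.asList(
        new Windowed<>("w1", 1), new Windowed<>("w2", 2), new Windowed<>("w1", 3),
        new Windowed<>("w1", 4), new Windowed<>("w2", 5));
    System.out.println(batchPerWindow(input, 2)); // prints {w1=[[1, 3], [4]], w2=[[2, 5]]}
  }
}
```

Because every batch stays inside a single window, each downstream RPC can be associated with exactly one window, which keeps the write's output consistently windowed. The reify/batch-in-global-window/restore alternative would instead allow batches that mix windows and would rely on restoring windows afterwards.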