Just to close this thread: this is now addressed via https://issues.apache.org/jira/browse/BEAM-10303 . Thanks, Luke, for taking care of it.
On Mon, May 4, 2020 at 9:31 PM Luke Cwik <lc...@google.com> wrote:
>
> Kenn, the optimization is not complex, just never done.
>
> The FnApiDoFnRunner was rewritten to be designed with portability first and
> to move away from the assumptions that were baked into the existing DoFn
> "runner" implementations and the constructs used in the non-portable
> implementation. There are many DoFn "runner" implementations that exist in
> Java that are layered on top of each other to handle several special cases
> which are also used by "system" DoFns as well.
>
> On Mon, May 4, 2020 at 10:38 AM Robert Burke <rob...@frantil.com> wrote:
>>
>> Ack ok. Thank you for clarifying!
>> Confirming that Kenn is right, the optimization is pretty much that simple.
>> [1] is where it's done in the Go SDK
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L136
>>
>> On Mon, May 4, 2020, 10:18 AM Reuven Lax <re...@google.com> wrote:
>>>
>>> I wonder how often we even implement this optimization today. If the
>>> processElement has an OutputReceiver parameter then we mark it as
>>> observesWindow, and that's a pretty common parameter.
>>>
>>> Arguably this is a bug in our implementation of OutputReceiver though - it
>>> should be able to copy all the windows into the output element.
>>>
>>> Reuven
>>>
>>> On Mon, May 4, 2020 at 9:37 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>> Is the optimization complex in the Fn API context? In non-Fn API it is
>>>> basically "if (observesWindow) { explode } else { don't }" [1]. The DoFn
>>>> signature tells you everything you need. This might be a good first commit
>>>> for someone looking to contribute to the Java SDK harness?
>>>>
>>>> Kenn
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L133
>>>>
>>>> On Mon, May 4, 2020 at 9:33 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>> In Python we only explode windows if the Window is being inspected.
>>>>> (There is no separate "DoFnRunner" for FnApi vs. Legacy execution.)
>>>>>
>>>>> On Mon, May 4, 2020 at 9:21 AM Luke Cwik <lc...@google.com> wrote:
>>>>> >
>>>>> > Reuven, you are correct that the optimization has yet to be implemented.
>>>>> > Robert, the FnApiDoFnRunner is the name of a Java class that executes
>>>>> > Java DoFns in the Java SDK harness. The poor name choice is my fault.
>>>>> >
>>>>> > On Fri, May 1, 2020 at 9:14 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>
>>>>> >> FnApiDoFnRunner does run Java DoFns.
>>>>> >>
>>>>> >> On Fri, May 1, 2020 at 9:10 PM Robert Burke <rob...@frantil.com> wrote:
>>>>> >>>
>>>>> >>> In the Go SDK this optimization is handled on the SDK side, in the
>>>>> >>> pardo execution node, not on the runner side of the FnAPI.
>>>>> >>>
>>>>> >>> But I think I'm about to learn that FnApiDoFnRunner is something that
>>>>> >>> runs on the Java SDK side rather than on the runner side, despite the
>>>>> >>> name.
>>>>> >>>
>>>>> >>> On Fri, May 1, 2020, 9:02 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>>>
>>>>> >>>> Ah - so we don't implement the optimization of not expanding the
>>>>> >>>> windows if not necessary?
>>>>> >>>>
>>>>> >>>> On Fri, May 1, 2020 at 8:56 PM Luke Cwik <lc...@google.com> wrote:
>>>>> >>>>>
>>>>> >>>>> In all the processElementYYY methods the currentWindow is assigned
>>>>> >>>>> as can be seen here as we loop over the set of windows:
>>>>> >>>>> https://github.com/apache/beam/blob/9bb2990c0f6c08dd33d9c6fa1fd91842c644a8e3/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L738
>>>>> >>>>>
>>>>> >>>>> On Fri, May 1, 2020 at 8:51 PM Reuven Lax <re...@google.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> In Beam a WindowedValue can contain multiple windows, because
>>>>> >>>>>> an element can be in multiple windows at once (for example,
>>>>> >>>>>> sliding windows). Usually we keep these elements unexpanded, but
>>>>> >>>>>> if the user's DoFn observes the window then we have to "explode"
>>>>> >>>>>> the element out, and we run the process function once per window.
>>>>> >>>>>> e.g. if the process function looks like this
>>>>> >>>>>>
>>>>> >>>>>> @ProcessElement
>>>>> >>>>>> public void process(@Element T e, IntervalWindow w)
>>>>> >>>>>>
>>>>> >>>>>> In SimpleDoFnRunner we do this inside processElement. However I
>>>>> >>>>>> can't find the equivalent code in FnApiDoFnRunner. How does window
>>>>> >>>>>> explosion work in the portable runner?
>>>>> >>>>>>
>>>>> >>>>>> Reuven
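
For anyone reading the archive later, here is a rough sketch of the "if (observesWindow) { explode } else { don't }" idea discussed in this thread. It is not the code in SimpleDoFnRunner or FnApiDoFnRunner. SketchWindowedValue, WindowExplosionSketch, and the use of plain strings for windows are all hypothetical stand-ins chosen so the example compiles without Beam on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for Beam's WindowedValue: one value tagged with
// every window it belongs to. This is NOT the real Beam class.
final class SketchWindowedValue<T> {
  final T value;
  final List<String> windows;

  SketchWindowedValue(T value, List<String> windows) {
    this.value = value;
    this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
  }
}

// Hypothetical runner fragment illustrating the optimization only.
final class WindowExplosionSketch {
  // If the DoFn observes the window, call `process` once per window with a
  // single-window list ("explode"). Otherwise call it once with the element
  // left unexpanded. Returns how many times `process` was invoked.
  static <T> int processElement(
      SketchWindowedValue<T> element,
      boolean observesWindow,
      BiConsumer<T, List<String>> process) {
    if (!observesWindow) {
      process.accept(element.value, element.windows);
      return 1;
    }
    int calls = 0;
    for (String window : element.windows) {
      process.accept(element.value, Collections.singletonList(window));
      calls++;
    }
    return calls;
  }
}
```

With an element in three sliding windows, a window-observing DoFn would be invoked three times here, while a DoFn that never looks at the window would be invoked once and its output could carry all three windows along. In the real SDK the observesWindow decision comes from the DoFn signature, as Kenn notes above; in this sketch it is just a boolean passed by the caller.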