I wonder how often we even implement this optimization today. If the processElement has an OutputReceiver parameter then we mark it as observesWindow, and that's a pretty common parameter.
Arguably this is a bug in our implementation of OutputReceiver though - it should be able to copy all the windows into the output element. Reuven On Mon, May 4, 2020 at 9:37 AM Kenneth Knowles <[email protected]> wrote: > Is the optimization complex in the Fn API context? In non-Fn API it is > basically "if (observesWindow) { explode } else { don't }" [1]. The DoFn > signature tells you everything you need. This might be a good first commit > for someone looking to contribute to the Java SDK harness? > > Kenn > > [1] > https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L133 > > On Mon, May 4, 2020 at 9:33 AM Robert Bradshaw <[email protected]> > wrote: > >> In Python we only explode windows if the Window is being inspected. >> (There is no separate "DoFnRunner" for FnApi vs. Legacy execution.) >> >> On Mon, May 4, 2020 at 9:21 AM Luke Cwik <[email protected]> wrote: >> > >> > Reuven you are correct that the optimization has yet to be implemented. >> > Robert the FnApiDoFnRunner is the name of a Java class that executes >> Java DoFns in the Java SDK harness. The poor name choice is my fault. >> > >> > On Fri, May 1, 2020 at 9:14 PM Reuven Lax <[email protected]> wrote: >> >> >> >> FnApiDoFnRunner does run Java DoFns. >> >> >> >> On Fri, May 1, 2020 at 9:10 PM Robert Burke <[email protected]> >> wrote: >> >>> >> >>> In the Go SDK this optimization is handled on the SDK side, inthe >> pardo execution node not one the runner side of the FnAPI >> >>> >> >>> But i think I'm about to learn that FnApiDoFnRunner is something that >> runs on the Java SDK side rather than on the runner side, despite the name. >> >>> >> >>> On Fri, May 1, 2020, 9:02 PM Reuven Lax <[email protected]> wrote: >> >>>> >> >>>> Ah - so we don't implement the optimization of not expanding the >> windows if not necessary? >> >>>> >> >>>> On Fri, May 1, 2020 at 8:56 PM Luke Cwik <[email protected]> wrote: >> >>>>> >> >>>>> In all the processElementYYY methods the currentWindow is assigned >> as can be seen here as we loop over the set of windows: >> >>>>> >> https://github.com/apache/beam/blob/9bb2990c0f6c08dd33d9c6fa1fd91842c644a8e3/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L738 >> >>>>> >> >>>>> On Fri, May 1, 2020 at 8:51 PM Reuven Lax <[email protected]> wrote: >> >>>>>> >> >>>>>> In Beam a WindowedValue can can contain multiple windows, because >> an element can be in multiple windows at once (for example, sliding >> windows). Usually we keep these elements unexpanded, but if the user's doFn >> observes the window then we have to "explode" the element out, and we run >> the process function once per window. e.g. if the process function looks >> like this >> >>>>>> >> >>>>>> @ProcessElement >> >>>>>> public void process(@Element T e, IntervalWindow w) >> >>>>>> >> >>>>>> In SimpleDoFnRunner we do this inside processElement. However I >> can't find the equivalent code in FnApiDoFnRunner. How does window >> explosion work in the portable runner? >> >>>>>> >> >>>>>> Reuven >> >
