This is a really nice idea. Would the user still need to specify the timestamp of the output? I'm a bit ambivalent about calling it multiple times if OutputReceiver alone is in the parameter list; this might not be obvious and could be surprising behavior.
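To make the concern concrete, here is a minimal plain-Java sketch of the dispatch rule as I read it from the proposal quoted below. It does not use any Beam classes: `FinishBundleDispatchSketch`, `plannedCalls`, and the `observesWindow` flag are hypothetical names for illustration only, and windows are represented as plain strings. The flag stands for "finishBundle declares a BoundedWindow or OutputReceiver parameter".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;

public class FinishBundleDispatchSketch {

    /**
     * Hypothetical helper, not part of Beam. Returns one entry per
     * finishBundle call a runner would make under the proposed rule:
     * a single window-less call if the method does not observe the window,
     * otherwise one call per distinct window seen in the bundle.
     */
    public static <W> List<Optional<W>> plannedCalls(
            List<W> windowsSeenInBundle, boolean observesWindow) {
        List<Optional<W>> calls = new ArrayList<>();
        if (!observesWindow) {
            // Existing behavior: one call at the end of the bundle, no window.
            calls.add(Optional.empty());
            return calls;
        }
        // Proposed behavior: de-duplicate windows, keeping first-seen order.
        for (W window : new LinkedHashSet<>(windowsSeenInBundle)) {
            calls.add(Optional.of(window));
        }
        return calls;
    }

    public static void main(String[] args) {
        // Three elements in the bundle, spread over two distinct windows.
        List<String> seen = Arrays.asList("[0,10)", "[10,20)", "[0,10)");
        System.out.println(plannedCalls(seen, true).size());  // prints 2
        System.out.println(plannedCalls(seen, false).size()); // prints 1
    }
}
```

Under this reading, merely adding an OutputReceiver parameter flips `observesWindow` and changes the number of calls from one to one per window, which is the part that might surprise users.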
On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
>
> I would like to discuss a minor extension to the Beam model.
>
> Beam bundles have very few restrictions on what can be in a bundle; in
> particular, a bundle might contain records for many different windows. This
> was an explicit decision, as bundling primarily exists for performance reasons
> and we found that limiting bundling based on windows or timestamps often led
> to severe performance problems. However, it sometimes makes finishBundle hard
> to use.
>
> I've seen multiple cases where users maintain some state in their DoFn that
> needs finalizing (e.g. writing to an external service) in finishBundle. Often
> users end up keeping lists of all windows seen in the bundle so they can be
> processed separately (or sometimes not realizing that there can be multiple
> windows and writing incorrect code).
>
> The lack of a window also means that we don't currently support injecting an
> OutputReceiver into finishBundle, as there's no good way of knowing which
> window output should be put into.
>
> I would like to propose adding a way for finishBundle to inspect the window,
> either directly (via a BoundedWindow parameter) or indirectly (via an
> OutputReceiver parameter). In this case, we will execute finishBundle once
> per window in the bundle. Otherwise, we will execute finishBundle once at the
> end of the bundle as before. This behavior is backwards compatible, as
> previously these parameters were disallowed in finishBundle.
>
> Note that this is similar to something Beam already does in processElement. A
> single element can exist in multiple windows; however, if processElement
> "observes" the window, then Beam will call processElement once per window.
>
> In Java, the user code could look like this:
>
> DoFn<> {
>   ...
>   @FinishBundle
>   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>     // This finishBundle will be called once per window in the bundle since it has
>     // a parameter that observes the window.
>   }
> }
>
> This PR shows an implementation of this extension for the Java SDK.
>
> Thoughts?
>
> Reuven
