This is a really nice idea. Would the user still need to specify the
timestamp of the output? I'm a bit ambivalent about calling finishBundle
multiple times if OutputReceiver alone is in the parameter list; this
might not be obvious and could be surprising behavior.

On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
>
> I would like to discuss a minor extension to the Beam model.
>
> Beam has very few restrictions on what can be in a bundle; in particular,
> a bundle might contain records for many different windows. This was an
> explicit decision, as bundling exists primarily for performance reasons,
> and we found that limiting bundling based on windows or timestamps often
> led to severe performance problems. However, it sometimes makes
> finishBundle hard to use.
>
> I've seen multiple cases where users maintain some state in their DoFn that
> needs finalizing in finishBundle (e.g. writing to an external service).
> Often users end up keeping a list of all windows seen in the bundle so that
> each window can be processed separately (or, not realizing that there can be
> multiple windows, they write incorrect code).
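
A minimal, self-contained sketch of the manual workaround described above, in plain Java with no Beam dependency: buffer values per window while processing, then flush each window's buffer separately when the bundle finishes. `Window`, `WindowedElement`, and `PerWindowBuffer` are hypothetical names standing in for Beam's window types and a user DoFn; this is an illustration under those assumptions, not Beam API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Beam window (e.g. IntervalWindow). Records give
// value-based equals/hashCode, so windows can be used as map keys.
record Window(long start, long end) {}

// Hypothetical stand-in for one element of a bundle together with its window.
record WindowedElement<T>(T value, Window window) {}

// Sketch of the workaround: keep one buffer per window seen in the bundle,
// then flush each window's buffer separately at the end of the bundle.
class PerWindowBuffer<T> {
  private final Map<Window, List<T>> buffers = new LinkedHashMap<>();

  // Analogous to @ProcessElement: append the value to its window's buffer.
  void process(WindowedElement<T> element) {
    buffers.computeIfAbsent(element.window(), w -> new ArrayList<>()).add(element.value());
  }

  // Analogous to @FinishBundle: hand back one batch per window (in first-seen
  // order) and reset the state for the next bundle.
  Map<Window, List<T>> finishBundle() {
    Map<Window, List<T>> flushed = new LinkedHashMap<>(buffers);
    buffers.clear();
    return flushed;
  }
}
```

The per-window bookkeeping here is exactly what user code has to get right by hand today; forgetting to key by window collapses records from different windows into one batch.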
>
> The lack of a window also means that we don't currently support injecting an
> OutputReceiver into finishBundle, as there's no good way of knowing which
> window the output should be put into.
>
> I would like to propose adding a way for finishBundle to inspect the window,
> either directly (via a BoundedWindow parameter) or indirectly (via an
> OutputReceiver parameter). If finishBundle declares either parameter, we will
> execute it once per window in the bundle. Otherwise, we will execute
> finishBundle once at the end of the bundle, as before. This change is
> backwards compatible, since these parameters were previously disallowed in
> finishBundle.
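
A rough model of the dispatch rule proposed above, in plain Java and not taken from the linked PR: if finishBundle observes the window, it runs once per distinct window seen in the bundle; otherwise it runs exactly once. `FinishBundleDispatch` and `finish` are hypothetical names, and the choice to make zero calls for an empty bundle in window-observing mode is an assumption the proposal does not spell out.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of the proposed rule, not Beam's actual runner code.
final class FinishBundleDispatch {
  private FinishBundleDispatch() {}

  /**
   * Invokes the user's finishBundle and returns how many times it ran.
   *
   * @param windowsInBundle windows of every element processed in the bundle (may repeat)
   * @param observesWindow  true if finishBundle declares a window or OutputReceiver parameter
   * @param finishBundle    user code; receives the window, or empty when it does not observe one
   */
  static <W> int finish(
      List<W> windowsInBundle, boolean observesWindow, Consumer<Optional<W>> finishBundle) {
    if (!observesWindow) {
      // Existing behavior: exactly one call at the end of the bundle.
      finishBundle.accept(Optional.empty());
      return 1;
    }
    // Proposed behavior: one call per distinct window, in first-seen order.
    // An empty bundle therefore produces no calls (an assumption of this sketch).
    Set<W> distinct = new LinkedHashSet<>(windowsInBundle);
    for (W window : distinct) {
      finishBundle.accept(Optional.of(window));
    }
    return distinct.size();
  }
}
```

Deduplicating windows before invoking user code mirrors how processElement is invoked once per window for an element that belongs to several windows.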
>
> Note that this is similar to something Beam already does in processElement.
> A single element can exist in multiple windows; however, if processElement
> "observes" the window, Beam calls processElement once per window.
>
> In Java, the user code could look like this:
>
> class MyDoFn extends DoFn<InputT, OutputT> {
>   ...
>   @FinishBundle
>   public void finishBundle(IntervalWindow window, OutputReceiver<OutputT> o) {
>     // This finishBundle will be called once per window in the bundle, since
>     // it has a parameter that observes the window.
>   }
> }
>
> This PR shows an implementation of this extension for the Java SDK.
>
> Thoughts?
>
> Reuven
