Another thread mentioned that, to make the user experience as predictable as possible, we should try to make windows idempotent: once a window is assigned, it should never change (and the timestamp should not move outside the scope of that window) unless a different WindowFn is applied. Because all Beam window functions are time based and the output timestamp is known, what is the issue with applying the WindowFn to elements output from @FinishBundle and assigning the windows automatically?
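To make the question concrete, here is a minimal sketch of what "assign the window from the timestamp" could look like for the simple fixed-size case. It is plain Java and does not use any Beam classes; `FixedWindowSketch` and `windowStart` are hypothetical names made up for illustration, and merging window functions such as sessions are not covered.

```java
// Hypothetical helper, not a Beam class: models fixed-size window assignment
// as a pure function of the element timestamp.
final class FixedWindowSketch {

  /** Returns the inclusive start of the fixed window containing the timestamp. */
  static long windowStart(long timestampMillis, long sizeMillis) {
    // floorMod keeps the result correct for negative timestamps as well.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  public static void main(String[] args) {
    long size = 60_000L; // one-minute windows
    long ts = 125_000L;  // an output timestamp produced in finishBundle
    long start = windowStart(ts, size);
    // Prints "[120000, 180000)".
    System.out.println("[" + start + ", " + (start + size) + ")");
  }
}
```

In this model, re-applying the assignment to a timestamp that is already inside a window returns the same window, which is the idempotence property mentioned above.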

On 5/4/20 8:07 PM, Reuven Lax wrote:
This should not affect the ability of the user to specify the output timestamp. Today FinishBundleContext.output forces you to specify the window as well as the timestamp, which is a bit awkward. (I believe that it also lets you create brand new windows in finishBundle, which is interesting, but I'm not quite sure of the use case).

On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw wrote:

    This is a really nice idea. Would the user still need to specify the
    timestamp of the output? I'm a bit ambivalent about calling it
    multiple times if OutputReceiver alone is in the parameter list; this
    might not be obvious and could be surprising behavior.

    On Mon, May 4, 2020 at 10:13 AM Reuven Lax wrote:
    >
    > I would like to discuss a minor extension to the Beam model.
    >
    > Beam bundles have very few restrictions on what can be in a
    bundle; in particular, a bundle might contain records for many
    different windows. This was an explicit decision as bundling
    primarily exists for performance reasons and we found that
    limiting bundling based on windows or timestamps often led to
    severe performance problems. However it sometimes makes
    finishBundle hard to use.
    >
    > I've seen multiple cases where users maintain some state in
    their DoFn that needs finalizing (e.g. writing to an external
    service) in finishBundle. Often users end up keeping lists of all
    windows seen in the bundle so they can be processed separately (or
    sometimes not realizing that there can be multiple windows and
    writing incorrect code).
    >
    > The lack of a window also means that we don't currently support
    injecting an OutputReceiver into finishBundle, as there's no good
    way of knowing which window output should be put into.
    >
    > I would like to propose adding a way for finishBundle to inspect
    the window, either directly (via a BoundedWindow parameter) or
    indirectly (via an OutputReceiver parameter). In this case, we
    will execute finishBundle once per window in the bundle.
    Otherwise, we will execute finishBundle once at the end of the
    bundle as before. This behavior is backwards compatible, as
    previously these parameters were disallowed in finishBundle.
    >
    > Note that this is similar to something Beam already does in
    processElement. A single element can exist in multiple windows,
    however if the processElement "observes" the window then Beam will
    call processElement once per window.
    >
    > In Java, the user code could look like this:
    >
    > DoFn<> {
    >      ...
    >    @FinishBundle
    >    public void finishBundle(IntervalWindow window,
    >        OutputReceiver<T> o) {
    >        // This finishBundle will be called once per window in the
    >        // bundle since it has a parameter that observes the window.
    >    }
    > }
    >
    > This PR shows an implementation of this extension for the Java SDK.
    >
    > Thoughts?
    >
    > Reuven
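
The once-per-window dispatch described in the proposal can be modeled in a few lines of plain Java. This is only a sketch of the semantics, not the code from the PR and not Beam's runner: `PerWindowFinishSketch` and `finishPerWindow` are hypothetical names, a window is represented by its start time in milliseconds, and the bundle is reduced to the list of windows its elements fell into.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of the proposed behavior, not Beam code.
final class PerWindowFinishSketch {

  /**
   * Invokes the finishBundle callback once for each distinct window seen in
   * the bundle, in first-seen order, and returns the number of invocations.
   */
  static int finishPerWindow(List<Long> elementWindows, Consumer<Long> finishBundle) {
    // The runner, rather than the user, tracks which windows appeared.
    Set<Long> seen = new LinkedHashSet<>(elementWindows);
    for (Long window : seen) {
      finishBundle.accept(window);
    }
    return seen.size();
  }

  public static void main(String[] args) {
    // Three elements in the bundle, two of them in the same window.
    List<Long> bundle = List.of(0L, 60_000L, 0L);
    // Prints "finishBundle for window 0" then "finishBundle for window 60000".
    finishPerWindow(bundle, w -> System.out.println("finishBundle for window " + w));
  }
}
```

The point of the model is that the bookkeeping users currently do by hand (keeping a list of windows seen in the bundle) moves into the caller of finishBundle.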
