This is a great idea. I thought that (long ago) we decided not to execute
finishBundle per window for these reasons: (1) perf fears (2) naming
bikeshed (3) backwards compatibility (even though we hadn't stabilized, it
is pervasive). That was before the annotation-driven DoFn I believe, so we
didn't have the ability to do it this way. Now this seems like a clear win.

Regarding @StartBundle: I always use and advise others to use lazy init
instead of @StartBundle and to think of @FinishBundle as "flush".
State-sensitive APIs like "start(); process(); finish()" are usually an
anti-pattern since you can almost always write them in a less dangerous way
(try-with-resources, Python context managers, etc). Conveniently, this
eliminates any consideration of symmetry. Can anyone refresh me on
when/whether it is important to have @StartBundle instead of running the
same code via lazy init?


On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw <[email protected]> wrote:

> On Tue, May 5, 2020 at 3:08 PM Reuven Lax <[email protected]> wrote:
> >
> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <[email protected]>
> wrote:
> >>
> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <[email protected]> wrote:
> >> >
> >> > This should not affect the ability of the user to specify the output
> timestamp.
> >>
> >> My question was whether we would require it.
> >
> >
> > My current PR does not - it defaults to end-of-window as the timestamp.
> However we could also decide to require it.
>
> I'd be more comfortable requiring it for the time being.
>

+1 for requiring it

Kenn

>> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
> >> >
> >> > There was a mention in some other thread, that in order to make user
> experience as predictable as possible, we should try to make windows
> idempotent, and once window is assigned, it should be never changed (and
> timestamp move outside of the scope of window, unless a different windowfn
> is applied). Because all Beam window functions are actually time based, and
> output timestamp is known, what is the issue of applying windowfn to
> elements output from @FinishBundle and assign the windows automatically?
> >>
> >> We used to do exactly this. (I don't recall why it was removed.) If
> >> the input element and/or window was queried by the WindowFn
> >> (admittedly rare), it would fail at runtime.
> >
> > When did we used to do this? We've had users writing WindowFns that
> queried the input element since long before Beam existed.  e.g a window fn
> that inspected a userId field, and created different sized windows based on
> the userId.
>
> This is how it started. In particular WindowFn.AssignContext would be
> created that through an exception on accessing the unavailable fields
> (which would make finalize bundle unsuitable for such WindowFns).
>

Yea this was not great. This also broke the equivalence between WithKeys
and AssignWindows. It was really a workaround for the lack of the feature
Reuven is proposing.



> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <[email protected]> wrote:
> >> >
> >> > It's a good question about startBundle - it's something I thought
> about. The problem is that a runner doesn't always know at startBundle what
> windows are in the bundle, and even if it does know it might require the
> runner to run two passes over the bundle to figure this out. Alternatively
> the runner could keep calling startBundle the first time it's seen a new
> window in the bundle, but I think that makes things even weirder. It's also
> worth noting that startBundle is already more limited today - we do not
> support calling output from startBundle, but we do allow calling output
> from finishBundle.
> >> >
> >> > Reuven
> >> >
> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <[email protected]> wrote:
> >> >>
> >> >> Ah, interesting. That makes windowFn non-idempotent by definition,
> because its first application (e.g. global window -> interval window)
> _might_ yield different result than second application with interval window
> already assigned. On the other hand, let's suppose for a moment we can make
> windowFn idempotent, would that solve the issue of window assignment for
> elements output from finishBundle? I understand that window assignment is
> not only motivation for adding optional window parameter to @FinishBundle,
> but users might be confused why OutputReceiver is working only when there
> is Window parameter. It would be nice to have this somewhat more
> "consistent". And last note - adding the parameter to @FinishBundle seems a
> little imbalanced - could this be made possible for @StartBundle as well?
> Should we enforce that both @StartBundle and @FinishBundle have the same
> signature, or should we accept all combinations?
> >> >>
> >> >> Jan
> >> >>
> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
> >> >>
> >> >> I assume you are referring to elements output from finishBundle.
> >> >>
> >> >> The problem is that the current window is an input to
> WindowFn.assignWindows. The new window can depend on the timestamp, the
> element itself, and the original window. I'm not sure how many users rely
> on this, however it has been part of our public windowing API for a long
> time, so I would guess that some users do use this in their WindowFns.
> >> >>
> >> >> Reuven
> >> >>
> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]>
> wrote:
> >> >>>
> >> >>> There was a mention in some other thread, that in order to make
> user experience as predictable as possible, we should try to make windows
> idempotent, and once window is assigned, it should be never changed (and
> timestamp move outside of the scope of window, unless a different windowfn
> is applied). Because all Beam window functions are actually time based, and
> output timestamp is known, what is the issue of applying windowfn to
> elements output from @FinishBundle and assign the windows automatically?
> >> >>>
> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
> >> >>>
> >> >>> This should not affect the ability of the user to specify the
> output timestamp. Today FinishBundleContext.output forces you to specify
> the window as well as the timestamp, which is a bit awkward. (I believe
> that it also lets you create brand new windows in finishBundle, which is
> interesting, but I'm not quite sure of the use case).
> >> >>>
> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <
> [email protected]> wrote:
> >> >>>>
> >> >>>> This is a really nice idea. Would the user still need to specify
> the
> >> >>>> timestamp of the output? I'm a bit ambivalent about calling it
> >> >>>> multiple times if OuptutReceiver alone is in the parameter list;
> this
> >> >>>> might not be obvious and could be surprising behavior.
> >> >>>>
> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]>
> wrote:
> >> >>>> >
> >> >>>> > I would like to discuss a minor extension to the Beam model.
> >> >>>> >
> >> >>>> > Beam bundles have very few restrictions on what can be in a
> bundle, in particular s bundle might contain records for many different
> windows. This was an explicit decision as bundling primarily exists for
> performance reasons and we found that limiting bundling based on windows or
> timestamps often led to severe performance problems. However it sometimes
> makes finishBundle hard to use.
> >> >>>> >
> >> >>>> > I've seen multiple cases where users maintain some state in
> their DoFn that needs finalizing (e.g. writing to an external service) in
> finishBundle. Often users end up keeping lists of all windows seen in the
> bundle so they can be processed separately (or sometimes not realizing that
> their can be multiple windows and writing incorrect code).
> >> >>>> >
> >> >>>> > The lack of a window also means that we don't currently support
> injecting an OuptutReceiver into finishBundle, as there's no good way of
> knowing which window output should be put into.
> >> >>>> >
> >> >>>> > I would like to propose adding a way for finishBundle to inspect
> the window, either directly (via a BoundedWindow parameter) or indirectly
> (via an OutputReceiver parameter). In this case, we will execute
> finishBundle once per window in the bundle. Otherwise, we will execute
> finishBundle once at the end of the bundle as before. This behavior is
> backwards compatible, as previously these parameters were disallowed in
> finishBundle.
> >> >>>> >
> >> >>>> > Note that this is similar to something Beam already does in
> processElement. A single element can exist in multiple windows, however if
> the processElement "observes" the window then Beam will call processElement
> once per window.
> >> >>>> >
> >> >>>> > In Java, the user code could look like this:
> >> >>>> >
> >> >>>> > DoFn<> {
> >> >>>> >      ...
> >> >>>> >    @FinishBundle
> >> >>>> >    public void finishBundle(IntervalWindow window,
> OutputReceiver<T> o) {
> >> >>>> >        // This finishBundle will be called once per window in
> the bundle since it has
> >> >>>> >       // a parameter that observes the window.
> >> >>>> >    }
> >> >>>> > }
> >> >>>> >
> >> >>>> > This PR shows an implementation of this extension for the Java
> SDK.
> >> >>>> >
> >> >>>> > Thoughts?
> >> >>>> >
> >> >>>> > Reuven
>

Reply via email to