Ah, interesting. That makes windowFn non-idempotent by definition,
because its first application (e.g. global window -> interval window)
_might_ yield a different result than a second application with the
interval window already assigned. On the other hand, let's suppose for
a moment that we can make windowFn idempotent: would that solve the
issue of window assignment for elements output from finishBundle? I
understand that window assignment is not the only motivation for adding
an optional window parameter to @FinishBundle, but users might be
confused about why OutputReceiver works only when there is a window
parameter. It would be nice to have this somewhat more "consistent".
And one last note: adding the parameter only to @FinishBundle seems a
little imbalanced. Could this be made possible for @StartBundle as
well? Should we enforce that both @StartBundle and @FinishBundle have
the same signature, or should we accept all combinations?
Jan
On 5/4/20 11:02 PM, Reuven Lax wrote:
I assume you are referring to elements output from finishBundle.
The problem is that the current window is an input to
WindowFn.assignWindows. The new window can depend on the timestamp,
the element itself, and the original window. I'm not sure how many
users rely on this; however, it has been part of our public windowing
API for a long time, so I would guess that some users do use this in
their WindowFns.
Reuven
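
To make the point about WindowFn.assignWindows concrete, here is a
minimal sketch in plain Java. It is a toy model and not Beam's API: the
Window class and both assignment functions are hypothetical stand-ins
invented for illustration. It contrasts an assignment that reads only
the timestamp (applying it twice gives the same window) with one that
also reads the current window (applying it twice can give a different
window).

```java
import java.util.Objects;

// Toy model only: these types are NOT Beam's BoundedWindow or WindowFn.
class WindowFnIdempotenceSketch {

    /** Half-open interval [start, end) in millis; stands in for an interval window. */
    static final class Window {
        static final Window GLOBAL = new Window(Long.MIN_VALUE, Long.MAX_VALUE);
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Window
                && ((Window) o).start == start
                && ((Window) o).end == end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Reads only the timestamp, so re-applying it cannot change the result. */
    static Window fixedWindows(long timestamp, Window current, long size) {
        long start = Math.floorDiv(timestamp, size) * size;
        return new Window(start, start + size);
    }

    /** Hypothetical assignment that also reads the current window. */
    static Window dependsOnCurrentWindow(long timestamp, Window current, long size) {
        if (current.equals(Window.GLOBAL)) {
            // First application: behave like fixed windows.
            return fixedWindows(timestamp, current, size);
        }
        // Already windowed: move to the window that follows the current one.
        return new Window(current.end, current.end + size);
    }

    public static void main(String[] args) {
        long ts = 1_500L;
        long size = 1_000L;

        Window a1 = fixedWindows(ts, Window.GLOBAL, size);
        Window a2 = fixedWindows(ts, a1, size);
        System.out.println("timestamp-only:   " + a1 + " then " + a2);

        Window b1 = dependsOnCurrentWindow(ts, Window.GLOBAL, size);
        Window b2 = dependsOnCurrentWindow(ts, b1, size);
        System.out.println("window-dependent: " + b1 + " then " + b2);
    }
}
```

In this sketch the second function is the kind of case that would break
automatic re-assignment of windows for finishBundle output; whether real
user WindowFns behave this way is exactly the open question above.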
On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
There was a mention in some other thread that, in order to make
the user experience as predictable as possible, we should try to
make windows idempotent: once a window is assigned, it should
never be changed (and the timestamp should never move outside the
scope of the window, unless a different windowfn is applied).
Because all Beam window functions are actually time-based, and the
output timestamp is known, what is the issue with applying the
windowfn to elements output from @FinishBundle and assigning the
windows automatically?
On 5/4/20 8:07 PM, Reuven Lax wrote:
This should not affect the ability of the user to specify the
output timestamp. Today FinishBundleContext.output forces you to
specify the window as well as the timestamp, which is a bit
awkward. (I believe that it also lets you create brand new
windows in finishBundle, which is interesting, but I'm not quite
sure of the use case).
On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <[email protected]> wrote:
This is a really nice idea. Would the user still need to specify
the timestamp of the output? I'm a bit ambivalent about calling it
multiple times if OutputReceiver alone is in the parameter list;
this might not be obvious and could be surprising behavior.
On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
>
> I would like to discuss a minor extension to the Beam model.
>
> Beam bundles have very few restrictions on what can be in a
bundle; in particular, a bundle might contain records for many
different windows. This was an explicit decision, as bundling
primarily exists for performance reasons and we found that
limiting bundling based on windows or timestamps often led to
severe performance problems. However, it sometimes makes
finishBundle hard to use.
>
> I've seen multiple cases where users maintain some state in
their DoFn that needs finalizing (e.g. writing to an external
service) in finishBundle. Often users end up keeping lists of
all windows seen in the bundle so they can be processed
separately (or sometimes not realizing that there can be
multiple windows and writing incorrect code).
>
> The lack of a window also means that we don't currently
support injecting an OutputReceiver into finishBundle, as
there's no good way of knowing which window the output should be
put into.
>
> I would like to propose adding a way for finishBundle to
inspect the window, either directly (via a BoundedWindow
parameter) or indirectly (via an OutputReceiver parameter).
In this case, we will execute finishBundle once per window in
the bundle. Otherwise, we will execute finishBundle once at
the end of the bundle as before. This behavior is backwards
compatible, as previously these parameters were disallowed in
finishBundle.
>
> Note that this is similar to something Beam already does in
processElement. A single element can exist in multiple
windows; however, if processElement "observes" the window,
then Beam will call processElement once per window.
>
> In Java, the user code could look like this:
>
> DoFn<> {
>   ...
>   @FinishBundle
>   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>     // This finishBundle will be called once per window in the bundle,
>     // since it has a parameter that observes the window.
>   }
> }
>
> This PR shows an implementation of this extension for the
Java SDK.
>
> Thoughts?
>
> Reuven
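
The proposed once-per-window behavior can be sketched in plain Java. It
is a toy model under stated assumptions, not the implementation in the
PR: CountingFn, runBundle, and the use of strings as windows are all
hypothetical. The "runner" side remembers which windows the bundle
touched and then calls finishBundle once for each of them, handing over
an output list that belongs to that window, so the user code no longer
has to keep its own list of windows seen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Beam classes.
class PerWindowFinishBundleSketch {

    /** Hypothetical user function: counts elements per window, emits at bundle end. */
    static final class CountingFn {
        private final Map<String, Integer> counts = new HashMap<>();

        void processElement(String element, String window) {
            counts.merge(window, 1, Integer::sum);
        }

        /** Called once per window; 'out' stands in for a window-scoped OutputReceiver. */
        void finishBundle(String window, List<String> out) {
            out.add(window + " count=" + counts.remove(window));
        }
    }

    /** Stand-in for the runner: tracks windows seen, then finishes each one. */
    static Map<String, List<String>> runBundle(CountingFn fn, String[][] elementsWithWindows) {
        Map<String, List<String>> outputsByWindow = new LinkedHashMap<>();
        for (String[] ew : elementsWithWindows) {
            fn.processElement(ew[0], ew[1]);
            // Remember every window this bundle has touched, in first-seen order.
            outputsByWindow.computeIfAbsent(ew[1], w -> new ArrayList<>());
        }
        for (Map.Entry<String, List<String>> e : outputsByWindow.entrySet()) {
            // One finishBundle call per window observed in the bundle.
            fn.finishBundle(e.getKey(), e.getValue());
        }
        return outputsByWindow;
    }

    public static void main(String[] args) {
        String[][] bundle = {{"a", "w1"}, {"b", "w2"}, {"c", "w1"}};
        System.out.println(runBundle(new CountingFn(), bundle));
    }
}
```

A finishBundle without a window or output parameter would, under the
proposal, still be called once at the end of the bundle as before; the
sketch models only the window-observing case.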