On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <[email protected]> wrote:

> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <[email protected]> wrote:
> >
> > This should not affect the ability of the user to specify the output
> timestamp.
>
> My question was whether we would require it.
>

My current PR does not - it defaults to end-of-window as the timestamp.
However we could also decide to require it.
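
To make the default concrete, here is a minimal sketch in plain Java. It is not the code from the PR: the class and method names are hypothetical, and it assumes "end-of-window" means the last millisecond inside a window whose end bound is exclusive.

```java
import java.util.OptionalLong;

class OutputTimestampDefault {
    /** Last timestamp inside a window with exclusive end endMillis (assumed convention). */
    static long endOfWindow(long endMillis) {
        return endMillis - 1;
    }

    /** Uses the user's timestamp when one is given, otherwise the end of the window. */
    static long resolve(long windowEndMillis, OptionalLong userTimestampMillis) {
        return userTimestampMillis.orElse(endOfWindow(windowEndMillis));
    }

    public static void main(String[] args) {
        System.out.println(resolve(180_000L, OptionalLong.empty()));      // prints 179999
        System.out.println(resolve(180_000L, OptionalLong.of(150_000L))); // prints 150000
    }
}
```

Requiring the timestamp instead would amount to dropping the empty case.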


>
>
> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
> >
> > There was a mention in some other thread, that in order to make user
> experience as predictable as possible, we should try to make windows
> idempotent, and once a window is assigned, it should never be changed (and
> the timestamp should not move outside the scope of the window, unless a
> different windowfn is applied). Because all Beam window functions are
> actually time based, and the output timestamp is known, what is the issue
> with applying the windowfn to elements output from @FinishBundle and
> assigning the windows automatically?
>
> We used to do exactly this. (I don't recall why it was removed.) If
> the input element and/or window was queried by the WindowFn
> (admittedly rare), it would fail at runtime.
>

When did we do this? We've had users writing WindowFns that queried
the input element since long before Beam existed, e.g. a window fn that
inspected a userId field and created different sized windows based on the
userId.
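
A rough sketch of that kind of window fn, written with plain Java stand-ins rather than the actual Beam WindowFn API. The Event and Window types, the user names, and the sizes are all made up for illustration. It shows two elements with the same timestamp landing in different windows, so the timestamp alone does not determine the window.

```java
import java.util.Map;

class UserSizedWindows {
    /** Hypothetical element type; userId is the field the window fn inspects. */
    static final class Event {
        final String userId;
        final long timestampMillis;

        Event(String userId, long timestampMillis) {
            this.userId = userId;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Hypothetical stand-in for an interval window [startMillis, endMillis). */
    static final class Window {
        final long startMillis;
        final long endMillis;

        Window(long startMillis, long endMillis) {
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }
    }

    // Assumed per-user window sizes; any other user gets one minute.
    static final Map<String, Long> SIZE_MILLIS_BY_USER = Map.of("bob", 300_000L);
    static final long DEFAULT_SIZE_MILLIS = 60_000L;

    /** Aligns the element's timestamp to a window whose size depends on its userId. */
    static Window assignWindow(Event e) {
        long size = SIZE_MILLIS_BY_USER.getOrDefault(e.userId, DEFAULT_SIZE_MILLIS);
        long start = e.timestampMillis - Math.floorMod(e.timestampMillis, size);
        return new Window(start, start + size);
    }

    public static void main(String[] args) {
        Window a = assignWindow(new Event("alice", 125_000L));
        Window b = assignWindow(new Event("bob", 125_000L));
        System.out.println(a.startMillis + ".." + a.endMillis); // prints 120000..180000
        System.out.println(b.startMillis + ".." + b.endMillis); // prints 0..300000
    }
}
```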


> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <[email protected]> wrote:
> >
> > It's a good question about startBundle - it's something I thought about.
> The problem is that a runner doesn't always know at startBundle what
> windows are in the bundle, and even if it does know it might require the
> runner to run two passes over the bundle to figure this out. Alternatively
> the runner could keep calling startBundle the first time it's seen a new
> window in the bundle, but I think that makes things even weirder. It's also
> worth noting that startBundle is already more limited today - we do not
> support calling output from startBundle, but we do allow calling output
> from finishBundle.
> >
> > Reuven
> >
> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <[email protected]> wrote:
> >>
> >> Ah, interesting. That makes windowFn non-idempotent by definition,
> because its first application (e.g. global window -> interval window)
> _might_ yield a different result than a second application with the interval
> window already assigned. On the other hand, let's suppose for a moment we can
> make windowFn idempotent, would that solve the issue of window assignment for
> elements output from finishBundle? I understand that window assignment is
> not the only motivation for adding an optional window parameter to
> @FinishBundle, but users might be confused why OutputReceiver is working only
> when there is a Window parameter. It would be nice to have this somewhat more
> "consistent". And a last note - adding the parameter to @FinishBundle seems a
> little imbalanced - could this be made possible for @StartBundle as well?
> Should we enforce that both @StartBundle and @FinishBundle have the same
> signature, or should we accept all combinations?
> >>
> >> Jan
> >>
> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
> >>
> >> I assume you are referring to elements output from finishBundle.
> >>
> >> The problem is that the current window is an input to
> WindowFn.assignWindows. The new window can depend on the timestamp, the
> element itself, and the original window. I'm not sure how many users rely
> on this, however it has been part of our public windowing API for a long
> time, so I would guess that some users do use this in their WindowFns.
> >>
> >> Reuven
> >>
> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
> >>>
> >>> There was a mention in some other thread, that in order to make user
> experience as predictable as possible, we should try to make windows
> idempotent, and once a window is assigned, it should never be changed (and
> the timestamp should not move outside the scope of the window, unless a
> different windowfn is applied). Because all Beam window functions are
> actually time based, and the output timestamp is known, what is the issue
> with applying the windowfn to elements output from @FinishBundle and
> assigning the windows automatically?
> >>>
> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
> >>>
> >>> This should not affect the ability of the user to specify the output
> timestamp. Today FinishBundleContext.output forces you to specify the
> window as well as the timestamp, which is a bit awkward. (I believe that it
> also lets you create brand new windows in finishBundle, which is
> interesting, but I'm not quite sure of the use case).
> >>>
> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <[email protected]>
> wrote:
> >>>>
> >>>> This is a really nice idea. Would the user still need to specify the
> >>>> timestamp of the output? I'm a bit ambivalent about calling it
> >>>> multiple times if OutputReceiver alone is in the parameter list; this
> >>>> might not be obvious and could be surprising behavior.
> >>>>
> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
> >>>> >
> >>>> > I would like to discuss a minor extension to the Beam model.
> >>>> >
> >>>> > Beam bundles have very few restrictions on what can be in a bundle,
> in particular a bundle might contain records for many different windows.
> This was an explicit decision as bundling primarily exists for performance
> reasons and we found that limiting bundling based on windows or timestamps
> often led to severe performance problems. However it sometimes makes
> finishBundle hard to use.
> >>>> >
> >>>> > I've seen multiple cases where users maintain some state in their
> DoFn that needs finalizing (e.g. writing to an external service) in
> finishBundle. Often users end up keeping lists of all windows seen in the
> bundle so they can be processed separately (or sometimes not realizing that
> there can be multiple windows and writing incorrect code).
> >>>> >
> >>>> > The lack of a window also means that we don't currently support
> injecting an OutputReceiver into finishBundle, as there's no good way of
> knowing which window output should be put into.
> >>>> >
> >>>> > I would like to propose adding a way for finishBundle to inspect
> the window, either directly (via a BoundedWindow parameter) or indirectly
> (via an OutputReceiver parameter). In this case, we will execute
> finishBundle once per window in the bundle. Otherwise, we will execute
> finishBundle once at the end of the bundle as before. This behavior is
> backwards compatible, as previously these parameters were disallowed in
> finishBundle.
> >>>> >
> >>>> > Note that this is similar to something Beam already does in
> processElement. A single element can exist in multiple windows, however if
> the processElement "observes" the window then Beam will call processElement
> once per window.
> >>>> >
> >>>> > In Java, the user code could look like this:
> >>>> >
> >>>> > DoFn<> {
> >>>> >    ...
> >>>> >    @FinishBundle
> >>>> >    public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
> >>>> >       // This finishBundle will be called once per window in the
> >>>> >       // bundle since it has a parameter that observes the window.
> >>>> >    }
> >>>> > }
> >>>> >
> >>>> > This PR shows an implementation of this extension for the Java SDK.
> >>>> >
> >>>> > Thoughts?
> >>>> >
> >>>> > Reuven
>
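
For reference, the once-per-window dispatch from the original proposal can be sketched roughly as below. This is plain Java with hypothetical names, not the runner code from the PR, and it identifies each window by its start time only to keep the example short. It also illustrates the startBundle point: the set of windows is only known after the elements have been seen.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class PerWindowFinishBundle {
    /** Hypothetical callback standing in for a window-observing @FinishBundle method. */
    interface WindowedFinish {
        void finishBundle(long windowStartMillis);
    }

    /**
     * Runs one bundle: records each distinct window seen while processing,
     * then invokes the finish callback once per window, in first-seen order.
     */
    static void runBundle(long[] elementWindowStarts, WindowedFinish finish) {
        Set<Long> windowsSeen = new LinkedHashSet<>();
        for (long windowStart : elementWindowStarts) {
            windowsSeen.add(windowStart); // processElement would run here
        }
        for (long windowStart : windowsSeen) {
            finish.finishBundle(windowStart);
        }
    }

    public static void main(String[] args) {
        List<Long> calls = new ArrayList<>();
        // Four elements spread over three windows; window 0 appears twice.
        runBundle(new long[] {0L, 60_000L, 0L, 120_000L}, w -> calls.add(w));
        System.out.println(calls); // prints [0, 60000, 120000]
    }
}
```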
