On Mon, May 4, 2020 at 11:08 AM Reuven Lax wrote:
>
> This should not affect the ability of the user to specify the output
> timestamp.

My question was whether we would require it.

On Mon, May 4, 2020 at 11:48 AM Jan Lukavský wrote:
>
> There was a mention in some other thread that, in order to make the user
> experience as predictable as possible, we should try to make windows
> idempotent: once a window is assigned, it should never be changed (nor
> should the timestamp move outside the scope of the window, unless a
> different windowfn is applied). Because all Beam window functions are
> actually time based, and the output timestamp is known, what is the issue
> with applying the windowfn to elements output from @FinishBundle and
> assigning the windows automatically?

We used to do exactly this. (I don't recall why it was removed.) If the input element and/or window was queried by the WindowFn (admittedly rare), it would fail at runtime.

On Tue, May 5, 2020 at 2:51 PM Reuven Lax wrote:
>
> It's a good question about startBundle; it's something I thought about. The
> problem is that a runner doesn't always know at startBundle what windows are
> in the bundle, and even if it does know, it might have to make two passes
> over the bundle to figure this out. Alternatively, the runner could call
> startBundle again each time it sees a new window in the bundle, but I think
> that makes things even weirder. It's also worth noting that startBundle is
> already more limited today: we do not support calling output from
> startBundle, but we do allow calling output from finishBundle.
>
> Reuven
>
> On Mon, May 4, 2020 at 11:59 PM Jan Lukavský wrote:
>>
>> Ah, interesting. That makes windowFn non-idempotent by definition, because
>> its first application (e.g. global window -> interval window) _might_ yield
>> a different result than a second application with the interval window
>> already assigned.
>> On the other hand, let's suppose for a moment we can make windowFn
>> idempotent; would that solve the issue of window assignment for elements
>> output from finishBundle? I understand that window assignment is not the
>> only motivation for adding an optional window parameter to @FinishBundle,
>> but users might be confused about why OutputReceiver works only when there
>> is a Window parameter. It would be nice to have this somewhat more
>> "consistent". And a last note: adding the parameter to @FinishBundle seems
>> a little imbalanced. Could this be made possible for @StartBundle as well?
>> Should we enforce that both @StartBundle and @FinishBundle have the same
>> signature, or should we accept all combinations?
>>
>> Jan
>>
>> On 5/4/20 11:02 PM, Reuven Lax wrote:
>>
>> I assume you are referring to elements output from finishBundle.
>>
>> The problem is that the current window is an input to
>> WindowFn.assignWindows. The new window can depend on the timestamp, the
>> element itself, and the original window. I'm not sure how many users rely on
>> this; however, it has been part of our public windowing API for a long time,
>> so I would guess that some users do use this in their WindowFns.
>>
>> Reuven
>>
>> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský wrote:
>>>
>>> There was a mention in some other thread that, in order to make the user
>>> experience as predictable as possible, we should try to make windows
>>> idempotent: once a window is assigned, it should never be changed (nor
>>> should the timestamp move outside the scope of the window, unless a
>>> different windowfn is applied). Because all Beam window functions are
>>> actually time based, and the output timestamp is known, what is the issue
>>> with applying the windowfn to elements output from @FinishBundle and
>>> assigning the windows automatically?
>>>
>>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>>>
>>> This should not affect the ability of the user to specify the output
>>> timestamp.
>>> Today FinishBundleContext.output forces you to specify the
>>> window as well as the timestamp, which is a bit awkward. (I believe that it
>>> also lets you create brand-new windows in finishBundle, which is
>>> interesting, but I'm not quite sure of the use case.)
>>>
>>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw wrote:
>>>>
>>>> This is a really nice idea. Would the user still need to specify the
>>>> timestamp of the output? I'm a bit ambivalent about calling it
>>>> multiple times if OutputReceiver alone is in the parameter list; this
>>>> might not be obvious and could be surprising behavior.
>>>>
>>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax wrote:
>>>> >
>>>> > I would like to discuss a minor extension to the Beam model.
>>>> >
>>>> > Beam bundles have very few restrictions on what can be in a bundle; in
>>>> > particular, a bundle might contain records for many different windows.
>>>> > This was an explicit decision, as bundling primarily exists for
>>>> > performance reasons and we found that limiting bundling based on windows
>>>> > or timestamps often led to severe performance problems. However, it
>>>> > sometimes makes finishBundle hard to use.
>>>> >
>>>> > I've seen multiple cases where users maintain some state in their DoFn
>>>> > that needs finalizing (e.g. writing to an external service) in
>>>> > finishBundle. Often users end up keeping lists of all windows seen in
>>>> > the bundle so they can be processed separately (or sometimes, not
>>>> > realizing that there can be multiple windows, they write incorrect code).
>>>> >
>>>> > The lack of a window also means that we don't currently support
>>>> > injecting an OutputReceiver into finishBundle, as there's no good way of
>>>> > knowing which window output should be put into.
>>>> >
>>>> > I would like to propose adding a way for finishBundle to inspect the
>>>> > window, either directly (via a BoundedWindow parameter) or indirectly
>>>> > (via an OutputReceiver parameter). If either is present, we will execute
>>>> > finishBundle once per window in the bundle. Otherwise, we will execute
>>>> > finishBundle once at the end of the bundle as before. This behavior is
>>>> > backwards compatible, as previously these parameters were disallowed in
>>>> > finishBundle.
>>>> >
>>>> > Note that this is similar to something Beam already does in
>>>> > processElement. A single element can exist in multiple windows; however,
>>>> > if processElement "observes" the window, then Beam will call
>>>> > processElement once per window.
>>>> >
>>>> > In Java, the user code could look like this:
>>>> >
>>>> > DoFn<> {
>>>> >   ...
>>>> >   @FinishBundle
>>>> >   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>>>> >     // This finishBundle will be called once per window in the bundle,
>>>> >     // since it has a parameter that observes the window.
>>>> >   }
>>>> > }
>>>> >
>>>> > This PR shows an implementation of this extension for the Java SDK.
>>>> >
>>>> > Thoughts?
>>>> >
>>>> > Reuven
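For readers without Beam at hand, the workaround Reuven describes (a DoFn that keeps track of every window seen in the bundle and finalizes each one separately in finishBundle) can be sketched in plain Java. This is a minimal sketch, not Beam code: `Window`, `PerWindowBuffer`, and their methods are hypothetical stand-ins for Beam's `IntervalWindow` and a real `DoFn`, and the returned map stands in for per-window writes to an external service.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Beam's IntervalWindow (not the Beam class).
record Window(long start, long end) {}

// Hypothetical DoFn-like class illustrating today's workaround: because one
// bundle may hold elements from many windows, state is kept per window.
class PerWindowBuffer {
  private final Map<Window, List<String>> buffers = new LinkedHashMap<>();

  // Called once per element; the window is known here, as in @ProcessElement.
  void processElement(String element, Window window) {
    buffers.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
  }

  // Called once at the end of the bundle with no window parameter, so it
  // must walk every window it has seen and finalize each one separately.
  // The returned map stands in for per-window writes to an external service.
  Map<Window, List<String>> finishBundle() {
    Map<Window, List<String>> flushed = new LinkedHashMap<>(buffers);
    buffers.clear();
    return flushed;
  }
}
```

Keying the buffer by window is what keeps each window's finalization separate; a version that kept a single list would silently mix windows, which is the kind of incorrect code the thread mentions.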
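The proposed rule (run finishBundle once per window when its signature observes the window, otherwise once per bundle, mirroring how processElement is called once per window) can be modeled as a small runner-side loop. This is a hypothetical plain-Java model of the semantics described in the proposal, not the implementation in the referenced PR; `BundleTracker`, the `observesWindow` flag, and the callback shapes are illustrative assumptions.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for a Beam window (not the Beam class).
record Window(long start, long end) {}

// Hypothetical model of the proposed runner behavior, not Beam's code.
class BundleTracker {
  // Distinct windows of the elements processed in the current bundle,
  // kept in first-seen order.
  private final Set<Window> windowsInBundle = new LinkedHashSet<>();

  void onElement(Window window) {
    windowsInBundle.add(window);
  }

  // observesWindow models "the user's @FinishBundle method declares a window
  // or OutputReceiver parameter". Returns how many times finishBundle ran.
  int finishBundle(boolean observesWindow,
                   Consumer<Window> perWindowFinish,
                   Runnable plainFinish) {
    int invocations = 0;
    if (observesWindow) {
      // Once per distinct window seen in the bundle.
      for (Window w : windowsInBundle) {
        perWindowFinish.accept(w);
        invocations++;
      }
    } else {
      // Backwards-compatible path: exactly once per bundle.
      plainFinish.run();
      invocations = 1;
    }
    windowsInBundle.clear();
    return invocations;
  }
}
```

Note that the single `observesWindow` flag treats an OutputReceiver-only signature the same as one with an explicit window parameter, as the proposal does; that is exactly the case Robert flags as potentially surprising.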
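Reuven's objection rests on WindowFn.assignWindows being allowed to read the element's existing window, and Jan's reply notes that this makes window assignment non-idempotent. A plain-Java sketch makes both points concrete. The `Win`, `GlobalWin`, `IntervalWin`, and `CoarseningWindowFn` types are hypothetical, not Beam's API; the window-doubling rule is an arbitrary example of a WindowFn that depends on the original window, and `null` models an element emitted from finishBundle with no input window (the case Robert says used to fail at runtime).

```java
// Hypothetical window types (not Beam's classes).
interface Win {}
record GlobalWin() implements Win {}
record IntervalWin(long start, long end) implements Win {}

// Hypothetical WindowFn whose result depends on the element's current
// window, which the thread says Beam's WindowFn contract permits.
class CoarseningWindowFn {
  static final long SIZE = 60;

  static Win assign(long timestamp, Win current) {
    if (current == null) {
      // Models an element output from finishBundle: there is no input
      // window to query, so this WindowFn cannot run.
      throw new IllegalStateException("no input window to inspect");
    }
    if (current instanceof IntervalWin iv) {
      // Already windowed: double the existing window's length.
      return new IntervalWin(iv.start(), iv.start() + 2 * (iv.end() - iv.start()));
    }
    // Global window: assign a fixed window from the timestamp alone.
    long start = timestamp - Math.floorMod(timestamp, SIZE);
    return new IntervalWin(start, start + SIZE);
  }
}
```

Applied to timestamp 90 in the global window, this yields [60, 120); applied again to that result, it yields [60, 180). The two applications disagree, which is the non-idempotence Jan describes. A purely time-based WindowFn (one that ignores `current`) would avoid both problems.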
