On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <[email protected]> wrote:
> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <[email protected]> wrote:
> >
> > This should not affect the ability of the user to specify the output
> > timestamp.
>
> My question was whether we would require it.

My current PR does not - it defaults to end-of-window as the timestamp.
However, we could also decide to require it.

> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
> >
> > There was a mention in some other thread, that in order to make user
> > experience as predictable as possible, we should try to make windows
> > idempotent, and once window is assigned, it should be never changed (and
> > timestamp move outside of the scope of window, unless a different
> > windowfn is applied). Because all Beam window functions are actually
> > time based, and output timestamp is known, what is the issue of applying
> > windowfn to elements output from @FinishBundle and assign the windows
> > automatically?
>
> We used to do exactly this. (I don't recall why it was removed.) If
> the input element and/or window was queried by the WindowFn
> (admittedly rare), it would fail at runtime.

When did we use to do this? We've had users writing WindowFns that queried
the input element since long before Beam existed, e.g. a window fn that
inspected a userId field and created different-sized windows based on the
userId.

> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <[email protected]> wrote:
> >
> > It's a good question about startBundle - it's something I thought about.
> > The problem is that a runner doesn't always know at startBundle what
> > windows are in the bundle, and even if it does know it might require the
> > runner to run two passes over the bundle to figure this out.
> > Alternatively the runner could keep calling startBundle the first time
> > it's seen a new window in the bundle, but I think that makes things even
> > weirder.
> > It's also worth noting that startBundle is already more limited today -
> > we do not support calling output from startBundle, but we do allow
> > calling output from finishBundle.
> >
> > Reuven
> >
> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <[email protected]> wrote:
> >>
> >> Ah, interesting. That makes windowFn non-idempotent by definition,
> >> because its first application (e.g. global window -> interval window)
> >> _might_ yield different result than second application with interval
> >> window already assigned. On the other hand, let's suppose for a moment
> >> we can make windowFn idempotent, would that solve the issue of window
> >> assignment for elements output from finishBundle? I understand that
> >> window assignment is not only motivation for adding optional window
> >> parameter to @FinishBundle, but users might be confused why
> >> OutputReceiver is working only when there is Window parameter. It would
> >> be nice to have this somewhat more "consistent". And last note - adding
> >> the parameter to @FinishBundle seems a little imbalanced - could this be
> >> made possible for @StartBundle as well? Should we enforce that both
> >> @StartBundle and @FinishBundle have the same signature, or should we
> >> accept all combinations?
> >>
> >> Jan
> >>
> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
> >>
> >> I assume you are referring to elements output from finishBundle.
> >>
> >> The problem is that the current window is an input to
> >> WindowFn.assignWindows. The new window can depend on the timestamp, the
> >> element itself, and the original window. I'm not sure how many users
> >> rely on this, however it has been part of our public windowing API for
> >> a long time, so I would guess that some users do use this in their
> >> WindowFns.
> >>
> >> Reuven
> >>
> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
> >>>
> >>> There was a mention in some other thread, that in order to make user
> >>> experience as predictable as possible, we should try to make windows
> >>> idempotent, and once window is assigned, it should be never changed
> >>> (and timestamp move outside of the scope of window, unless a different
> >>> windowfn is applied). Because all Beam window functions are actually
> >>> time based, and output timestamp is known, what is the issue of
> >>> applying windowfn to elements output from @FinishBundle and assign the
> >>> windows automatically?
> >>>
> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
> >>>
> >>> This should not affect the ability of the user to specify the output
> >>> timestamp. Today FinishBundleContext.output forces you to specify the
> >>> window as well as the timestamp, which is a bit awkward. (I believe
> >>> that it also lets you create brand new windows in finishBundle, which
> >>> is interesting, but I'm not quite sure of the use case).
> >>>
> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <[email protected]> wrote:
> >>>>
> >>>> This is a really nice idea. Would the user still need to specify the
> >>>> timestamp of the output? I'm a bit ambivalent about calling it
> >>>> multiple times if OutputReceiver alone is in the parameter list; this
> >>>> might not be obvious and could be surprising behavior.
> >>>>
> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
> >>>> >
> >>>> > I would like to discuss a minor extension to the Beam model.
> >>>> >
> >>>> > Beam bundles have very few restrictions on what can be in a bundle,
> >>>> > in particular a bundle might contain records for many different
> >>>> > windows. This was an explicit decision as bundling primarily exists
> >>>> > for performance reasons and we found that limiting bundling based on
> >>>> > windows or timestamps often led to severe performance problems.
> >>>> > However it sometimes makes finishBundle hard to use.
> >>>> >
> >>>> > I've seen multiple cases where users maintain some state in their
> >>>> > DoFn that needs finalizing (e.g. writing to an external service) in
> >>>> > finishBundle. Often users end up keeping lists of all windows seen
> >>>> > in the bundle so they can be processed separately (or sometimes not
> >>>> > realizing that there can be multiple windows and writing incorrect
> >>>> > code).
> >>>> >
> >>>> > The lack of a window also means that we don't currently support
> >>>> > injecting an OutputReceiver into finishBundle, as there's no good
> >>>> > way of knowing which window output should be put into.
> >>>> >
> >>>> > I would like to propose adding a way for finishBundle to inspect
> >>>> > the window, either directly (via a BoundedWindow parameter) or
> >>>> > indirectly (via an OutputReceiver parameter). In this case, we will
> >>>> > execute finishBundle once per window in the bundle. Otherwise, we
> >>>> > will execute finishBundle once at the end of the bundle as before.
> >>>> > This behavior is backwards compatible, as previously these
> >>>> > parameters were disallowed in finishBundle.
> >>>> >
> >>>> > Note that this is similar to something Beam already does in
> >>>> > processElement. A single element can exist in multiple windows,
> >>>> > however if the processElement "observes" the window then Beam will
> >>>> > call processElement once per window.
> >>>> >
> >>>> > In Java, the user code could look like this:
> >>>> >
> >>>> > DoFn<> {
> >>>> >   ...
> >>>> >   @FinishBundle
> >>>> >   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
> >>>> >     // This finishBundle will be called once per window in the
> >>>> >     // bundle since it has a parameter that observes the window.
> >>>> >   }
> >>>> > }
> >>>> >
> >>>> > This PR shows an implementation of this extension for the Java SDK.
> >>>> >
> >>>> > Thoughts?
> >>>> >
> >>>> > Reuven
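
For anyone reading the thread later, the "execute finishBundle once per window in the bundle" behavior from the original proposal can be sketched in plain Java with no Beam dependency. This is only an illustration under stated assumptions, not the implementation in the PR and not Beam API: `PerWindowFinishSketch`, `Window`, `Element`, `WindowObservingFn`, and `runBundle` are all hypothetical names, each element is assumed to sit in exactly one window, and the loop merely stands in for whatever a real runner would do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a runner's bundle loop; none of this is Beam API.
final class PerWindowFinishSketch {

  // Minimal interval window [start, end); equality is by bounds.
  static final class Window {
    final long start;
    final long end;

    Window(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Window && ((Window) o).start == start && ((Window) o).end == end;
    }

    @Override
    public int hashCode() {
      return Objects.hash(start, end);
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  // An element together with the single window it was assigned to.
  static final class Element {
    final String value;
    final Window window;

    Element(String value, Window window) {
      this.value = value;
      this.window = window;
    }
  }

  // Hypothetical user function whose finishBundle observes the window.
  interface WindowObservingFn {
    void processElement(Element e);

    void finishBundle(Window window, List<String> output);
  }

  // Processes every element, then calls finishBundle once per distinct window
  // seen in the bundle, in first-seen order.
  static List<String> runBundle(List<Element> bundle, WindowObservingFn fn) {
    Set<Window> seen = new LinkedHashSet<>();
    for (Element e : bundle) {
      seen.add(e.window);
      fn.processElement(e);
    }
    List<String> output = new ArrayList<>();
    for (Window w : seen) {
      fn.finishBundle(w, output);
    }
    return output;
  }

  // One bundle containing elements from two different windows.
  static List<String> demo() {
    Window w1 = new Window(0, 60);
    Window w2 = new Window(60, 120);
    List<Element> bundle =
        List.of(new Element("a", w1), new Element("b", w2), new Element("c", w1));

    Map<Window, Integer> counts = new HashMap<>();
    WindowObservingFn fn =
        new WindowObservingFn() {
          @Override
          public void processElement(Element e) {
            counts.merge(e.window, 1, Integer::sum);
          }

          @Override
          public void finishBundle(Window window, List<String> output) {
            // Emit only the state that belongs to the window being finished.
            output.add(window + "=" + counts.get(window));
          }
        };
    return runBundle(bundle, fn);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With the three-element bundle in `demo()`, finishBundle runs twice, once for `[0, 60)` and once for `[60, 120)`, so the user code never has to keep its own list of windows seen in the bundle. The sketch records windows while processing, which sidesteps the two-pass concern raised for startBundle but says nothing about how a runner would handle it.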
