StartBundle pre-dated setUp, which makes it less useful than it used to be. With DoFn re-use, however, startBundle can be used to reset the DoFn to a clean state at the start of each bundle.
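Here is a minimal, Beam-free sketch of the re-use point. BufferingFn and ReusingRunner are hypothetical stand-ins for a DoFn and a runner; they are not Beam APIs. One instance is set up once and then re-used across bundles. The reset in startBundle() is what stops the second bundle from seeing the first bundle's buffered elements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a DoFn (plain Java, no Beam dependency).
class BufferingFn {
  private List<String> buffer;
  int setupCalls = 0;

  // Analogous to @Setup: one-time, per-instance initialization.
  void setup() {
    setupCalls++;
  }

  // Analogous to @StartBundle: reset per-bundle state so a re-used
  // instance starts every bundle clean.
  void startBundle() {
    buffer = new ArrayList<>();
  }

  void processElement(String element) {
    buffer.add(element);
  }

  // Analogous to @FinishBundle: emit what this bundle buffered.
  // Note that it does not clear the buffer; startBundle() is responsible for that.
  List<String> finishBundle() {
    return new ArrayList<>(buffer);
  }
}

// Hypothetical runner loop that re-uses one instance for several bundles.
class ReusingRunner {
  static List<List<String>> run(BufferingFn fn, List<List<String>> bundles) {
    fn.setup();
    List<List<String>> results = new ArrayList<>();
    for (List<String> bundle : bundles) {
      fn.startBundle();
      for (String element : bundle) {
        fn.processElement(element);
      }
      results.add(fn.finishBundle());
    }
    return results;
  }
}
```

Suppose the buffer were created only once, for example in setup(). Running the bundles [a, b] and [c] would then make the second flush return [a, b, c] rather than [c].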
On Thu, May 7, 2020 at 2:03 PM Reuven Lax wrote:
> I think startBundle is useful for convenience and performance, but not necessarily needed semantically (as Kenn said, you could write your pipeline without startBundle). finishBundle has a stronger semantic meaning when interpreted as a way of finalizing elements.
>
> On Thu, May 7, 2020 at 2:00 PM Luke Cwik wrote:
>
>> Start bundle is useful since the framework provides the necessary synchronization, while using lazy init requires you to write it yourself and also pay for it on each process element call.
>>
>> On Wed, May 6, 2020 at 8:46 AM Kenneth Knowles wrote:
>>
>>> This is a great idea. I thought that (long ago) we decided not to execute finishBundle per window for these reasons: (1) perf fears, (2) naming bikeshed, (3) backwards compatibility (even though we hadn't stabilized, it is pervasive). That was before the annotation-driven DoFn, I believe, so we didn't have the ability to do it this way. Now this seems like a clear win.
>>>
>>> Regarding @StartBundle: I always use and advise others to use lazy init instead of @StartBundle and to think of @FinishBundle as "flush". State-sensitive APIs like "start(); process(); finish()" are usually an anti-pattern since you can almost always write them in a less dangerous way (try-with-resources, Python context managers, etc.). Conveniently, this eliminates any consideration of symmetry. Can anyone refresh me on when/whether it is important to have @StartBundle instead of running the same code via lazy init?
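For comparison, here is a sketch of the lazy-init style Kenn describes. It is again plain Java with hypothetical names (BatchClient, LazyInitFn), not Beam APIs. The resource is created on the first element of a bundle, finishBundle acts purely as a flush, and try-with-resources makes sure close() runs. The null check in processElement is the per-element cost Luke refers to. This single-threaded sketch does not model the synchronization he mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-bundle resource, e.g. a batching client (not a Beam API).
class BatchClient implements AutoCloseable {
  static int opened = 0; // counts instances, for illustration only
  final List<String> sent = new ArrayList<>();
  private final List<String> pending = new ArrayList<>();

  BatchClient() {
    opened++;
  }

  void add(String element) {
    pending.add(element);
  }

  // "Flush": push everything pending to the (simulated) external service.
  void flush() {
    sent.addAll(pending);
    pending.clear();
  }

  @Override
  public void close() {
    flush();
  }
}

// Lazy-init variant: no startBundle hook; the resource is created on first use.
class LazyInitFn {
  private BatchClient client; // null until the first element of a bundle

  void processElement(String element) {
    // This check runs on every element. If an instance could be called from
    // several threads, it would also need synchronization written by hand.
    if (client == null) {
      client = new BatchClient();
    }
    client.add(element);
  }

  // finishBundle treated purely as "flush": finalize this bundle's work, then
  // drop the resource so the next bundle lazily creates a fresh one.
  List<String> finishBundle() {
    if (client == null) {
      return List.of(); // empty bundle: nothing was ever initialized
    }
    try (BatchClient c = client) {
      c.flush();
      return c.sent;
    } finally {
      client = null;
    }
  }
}
```

If a bundle is empty, nothing is created. That is one practical difference from eagerly creating the resource in startBundle.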
>>>
>>> On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw wrote:
>>>
>>>> On Tue, May 5, 2020 at 3:08 PM Reuven Lax wrote:
>>>> >
>>>> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw wrote:
>>>> >>
>>>> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax wrote:
>>>> >> >
>>>> >> > This should not affect the ability of the user to specify the output timestamp.
>>>> >>
>>>> >> My question was whether we would require it.
>>>> >
>>>> > My current PR does not; it defaults to end-of-window as the timestamp. However, we could also decide to require it.
>>>>
>>>> I'd be more comfortable requiring it for the time being.
>>>
>>> +1 for requiring it
>>>
>>> Kenn
>>>
>>>> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský wrote:
>>>> >> >
>>>> >> > There was a mention in some other thread that, in order to make the user experience as predictable as possible, we should try to make windows idempotent: once a window is assigned, it should never be changed (and the timestamp should not move outside the scope of the window), unless a different WindowFn is applied. Because all Beam window functions are actually time-based, and the output timestamp is known, what is the issue with applying the WindowFn to elements output from @FinishBundle and assigning the windows automatically?
>>>> >>
>>>> >> We used to do exactly this. (I don't recall why it was removed.) If the input element and/or window was queried by the WindowFn (admittedly rare), it would fail at runtime.
>>>> >
>>>> > When did we use to do this? We've had users writing WindowFns that queried the input element since long before Beam existed, e.g. a WindowFn that inspected a userId field and created differently sized windows based on the userId.
>>>>
>>>> This is how it started.
>>>> In particular, a WindowFn.AssignContext would be created that threw an exception on accessing the unavailable fields (which would make finishBundle unsuitable for such WindowFns).
>>>
>>> Yeah, this was not great. This also broke the equivalence between WithKeys and AssignWindows. It was really a workaround for the lack of the feature Reuven is proposing.
>>>
>>>> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax wrote:
>>>> >> >
>>>> >> > It's a good question about startBundle; it's something I thought about. The problem is that a runner doesn't always know at startBundle what windows are in the bundle, and even if it does know, it might require the runner to make two passes over the bundle to figure this out. Alternatively, the runner could call startBundle the first time it sees each new window in the bundle, but I think that makes things even weirder. It's also worth noting that startBundle is already more limited today: we do not support calling output from startBundle, but we do allow calling output from finishBundle.
>>>> >> >
>>>> >> > Reuven
>>>> >> >
>>>> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský wrote:
>>>> >> >>
>>>> >> >> Ah, interesting. That makes the WindowFn non-idempotent by definition, because its first application (e.g. global window -> interval window) _might_ yield a different result than a second application with the interval window already assigned. On the other hand, let's suppose for a moment we can make WindowFns idempotent: would that solve the issue of window assignment for elements output from finishBundle? I understand that window assignment is not the only motivation for adding an optional window parameter to @FinishBundle, but users might be confused about why OutputReceiver works only when there is a window parameter. It would be nice to have this somewhat more "consistent".
>>>> >> >> One last note: adding the parameter to @FinishBundle seems a little imbalanced. Could this be made possible for @StartBundle as well? Should we enforce that both @StartBundle and @FinishBundle have the same signature, or should we accept all combinations?
>>>> >> >>
>>>> >> >> Jan
>>>> >> >>
>>>> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
>>>> >> >>
>>>> >> >> I assume you are referring to elements output from finishBundle.
>>>> >> >>
>>>> >> >> The problem is that the current window is an input to WindowFn.assignWindows. The new window can depend on the timestamp, the element itself, and the original window. I'm not sure how many users rely on this; however, it has been part of our public windowing API for a long time, so I would guess that some users do use this in their WindowFns.
>>>> >> >>
>>>> >> >> Reuven
>>>> >> >>
>>>> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský wrote:
>>>> >> >>>
>>>> >> >>> There was a mention in some other thread that, in order to make the user experience as predictable as possible, we should try to make windows idempotent: once a window is assigned, it should never be changed (and the timestamp should not move outside the scope of the window), unless a different WindowFn is applied. Because all Beam window functions are actually time-based, and the output timestamp is known, what is the issue with applying the WindowFn to elements output from @FinishBundle and assigning the windows automatically?
>>>> >> >>>
>>>> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>>>> >> >>>
>>>> >> >>> This should not affect the ability of the user to specify the output timestamp. Today FinishBundleContext.output forces you to specify the window as well as the timestamp, which is a bit awkward. (I believe that it also lets you create brand new windows in finishBundle, which is interesting, but I'm not quite sure of the use case.)
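The following plain-Java sketch shows why automatically assigning windows to finishBundle output is fragile. Window, Element, AssignContext and WindowFn here are hypothetical stand-ins, not Beam's classes, and records require Java 16+. A purely time-based WindowFn needs only the timestamp. A WindowFn that picks window sizes per userId, like the example Reuven gives above, also needs the element. A context that knows only the timestamp, modeled loosely on the old workaround Robert describes, makes the second kind fail at runtime.

```java
import java.util.Map;

// Minimal stand-ins; these are NOT Beam's classes.
record Window(long start, long end) {}

record Element(String userId, long timestamp) {}

// Hypothetical assignment context: what a WindowFn may query.
interface AssignContext {
  long timestamp();
  Element element();
  Window window();
}

interface WindowFn {
  Window assign(AssignContext c);
}

final class WindowFns {
  private WindowFns() {}

  // Purely time-based: fixed windows of the given size.
  static WindowFn fixed(long size) {
    return c -> {
      long start = Math.floorDiv(c.timestamp(), size) * size;
      return new Window(start, start + size);
    };
  }

  // Element-dependent: window size chosen per user (hypothetical sizes).
  static WindowFn perUser(Map<String, Long> sizes, long defaultSize) {
    return c -> {
      long size = sizes.getOrDefault(c.element().userId(), defaultSize);
      long start = Math.floorDiv(c.timestamp(), size) * size;
      return new Window(start, start + size);
    };
  }

  // Context available while processing an element: everything is known.
  static AssignContext forElement(Element e, Window original) {
    return new AssignContext() {
      public long timestamp() { return e.timestamp(); }
      public Element element() { return e; }
      public Window window() { return original; }
    };
  }

  // Context for output produced outside processElement: timestamp only,
  // so querying the element or window throws.
  static AssignContext timestampOnly(long ts) {
    return new AssignContext() {
      public long timestamp() { return ts; }
      public Element element() {
        throw new UnsupportedOperationException("element() not available here");
      }
      public Window window() {
        throw new UnsupportedOperationException("window() not available here");
      }
    };
  }
}
```

With a timestamp-only context, fixed(10) still assigns timestamp 12 to [10, 20). Calling perUser(...) on the same context throws UnsupportedOperationException, because it has no element to inspect.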
>>>> >> >>>
>>>> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw wrote:
>>>> >> >>>>
>>>> >> >>>> This is a really nice idea. Would the user still need to specify the timestamp of the output? I'm a bit ambivalent about calling it multiple times if OutputReceiver alone is in the parameter list; this might not be obvious and could be surprising behavior.
>>>> >> >>>>
>>>> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax wrote:
>>>> >> >>>> >
>>>> >> >>>> > I would like to discuss a minor extension to the Beam model.
>>>> >> >>>> >
>>>> >> >>>> > Beam bundles have very few restrictions on what can be in a bundle; in particular, a bundle might contain records for many different windows. This was an explicit decision: bundling primarily exists for performance reasons, and we found that limiting bundling based on windows or timestamps often led to severe performance problems. However, it sometimes makes finishBundle hard to use.
>>>> >> >>>> >
>>>> >> >>>> > I've seen multiple cases where users maintain some state in their DoFn that needs finalizing in finishBundle (e.g. writing to an external service). Often users end up keeping lists of all windows seen in the bundle so they can be processed separately (or sometimes don't realize that there can be multiple windows and write incorrect code).
>>>> >> >>>> >
>>>> >> >>>> > The lack of a window also means that we don't currently support injecting an OutputReceiver into finishBundle, as there's no good way of knowing which window the output should be put into.
>>>> >> >>>> >
>>>> >> >>>> > I would like to propose adding a way for finishBundle to inspect the window, either directly (via a BoundedWindow parameter) or indirectly (via an OutputReceiver parameter). In this case, we will execute finishBundle once per window in the bundle.
>>>> >> >>>> > Otherwise, we will execute finishBundle once at the end of the bundle, as before. This behavior is backwards compatible, as these parameters were previously disallowed in finishBundle.
>>>> >> >>>> >
>>>> >> >>>> > Note that this is similar to something Beam already does in processElement. A single element can exist in multiple windows; however, if processElement "observes" the window, Beam will call processElement once per window.
>>>> >> >>>> >
>>>> >> >>>> > In Java, the user code could look like this:
>>>> >> >>>> >
>>>> >> >>>> > DoFn<> {
>>>> >> >>>> >   ...
>>>> >> >>>> >   @FinishBundle
>>>> >> >>>> >   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>>>> >> >>>> >     // This finishBundle will be called once per window in the bundle since it has
>>>> >> >>>> >     // a parameter that observes the window.
>>>> >> >>>> >   }
>>>> >> >>>> > }
>>>> >> >>>> >
>>>> >> >>>> > This PR shows an implementation of this extension for the Java SDK.
>>>> >> >>>> >
>>>> >> >>>> > Thoughts?
>>>> >> >>>> >
>>>> >> >>>> > Reuven
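The following is a Beam-free model of the proposed semantics. All names in it are hypothetical (FinishBundleModel, PerWindowFinish, Timestamped). When finishBundle observes the window, the model groups the bundle's elements by window in first-seen order and invokes the user callback once per window. This is the bookkeeping users otherwise do by hand with lists of windows. Outputs get the window's end minus one as their timestamp. That is an assumption standing in for the end-of-window default Reuven mentions, and it mirrors the convention that an interval window's max timestamp is its end minus one millisecond. The thread leans toward requiring an explicit timestamp instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "finishBundle runs once per window when it observes
// the window". Not Beam code.
final class FinishBundleModel {
  private FinishBundleModel() {}

  record Window(long start, long end) {
    // Assumed default output timestamp: the last instant inside the window.
    long maxTimestamp() {
      return end - 1;
    }
  }

  record Timestamped<T>(T value, long timestamp, Window window) {}

  // The user's window-observing finishBundle: gets one window and that
  // window's elements, and returns the values to output into that window.
  interface PerWindowFinish<T> {
    List<T> finish(Window window, List<T> elements);
  }

  static <T> List<Timestamped<T>> finishPerWindow(
      List<Map.Entry<T, Window>> bundle, PerWindowFinish<T> fn) {
    // Group by window, keeping first-seen order.
    Map<Window, List<T>> byWindow = new LinkedHashMap<>();
    for (Map.Entry<T, Window> entry : bundle) {
      byWindow.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
    }
    // Invoke the user callback once per distinct window; every output lands
    // in that window with the default timestamp.
    List<Timestamped<T>> out = new ArrayList<>();
    for (Map.Entry<Window, List<T>> group : byWindow.entrySet()) {
      Window w = group.getKey();
      for (T value : fn.finish(w, group.getValue())) {
        out.add(new Timestamped<>(value, w.maxTimestamp(), w));
      }
    }
    return out;
  }
}
```

Take a bundle of a in [0, 10), b in [10, 20) and c in [0, 10). The callback runs twice here, once with [a, c] and once with [b]. A finishBundle that does not observe the window would instead run once for the whole bundle, as today.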
