@StartBundle is useful because the framework provides the necessary synchronization, whereas lazy init requires you to write that synchronization yourself and also pay for it on every processElement call.
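To make the trade-off concrete, here is a minimal plain-Java sketch with no Beam dependency. Every name in it (LifecycleSketch, Client, LazyInitFn, StartBundleFn) is a hypothetical stand-in rather than a Beam API, and main just mimics a runner that calls startBundle once and processElement once per element. It assumes the lazy-init author has decided locking is needed, which is the case described above.

```java
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are Beam classes.
class LifecycleSketch {
    /** Pretend resource that is expensive to create (e.g. a batch writer). */
    static final class Client {
        int sent = 0;
        void send(String element) { sent++; }
    }

    /** Lazy init: the author owns the locking and pays a check per element. */
    static final class LazyInitFn {
        private Client client;
        int initChecks = 0;

        synchronized void processElement(String element) {
            initChecks++;              // this check runs on every element
            if (client == null) {
                client = new Client();
            }
            client.send(element);
        }

        synchronized int finishBundle() {
            int sent = (client == null) ? 0 : client.sent;
            client = null;             // reset so the next bundle re-initializes
            return sent;
        }
    }

    /** Start-bundle style: the caller guarantees startBundle ran first. */
    static final class StartBundleFn {
        private Client client;
        int initChecks = 0;

        void startBundle() { initChecks++; client = new Client(); }
        void processElement(String element) { client.send(element); }
        int finishBundle() { int sent = client.sent; client = null; return sent; }
    }

    public static void main(String[] args) {
        List<String> bundle = List.of("a", "b", "c");

        LazyInitFn lazy = new LazyInitFn();
        for (String e : bundle) lazy.processElement(e);

        StartBundleFn eager = new StartBundleFn();
        eager.startBundle();
        for (String e : bundle) eager.processElement(e);

        System.out.println("lazy checks=" + lazy.initChecks
            + ", start-bundle checks=" + eager.initChecks);
    }
}
```

In this toy model the lazy variant does its init check once per element while the start-bundle variant initializes once per bundle. Whether that difference matters in a real pipeline depends on the runner and the cost of the check, which this sketch does not measure.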
On Wed, May 6, 2020 at 8:46 AM Kenneth Knowles <[email protected]> wrote:
> This is a great idea. I thought that (long ago) we decided not to execute
> finishBundle per window for these reasons: (1) perf fears (2) naming
> bikeshed (3) backwards compatibility (even though we hadn't stabilized, it
> is pervasive). That was before the annotation-driven DoFn I believe, so we
> didn't have the ability to do it this way. Now this seems like a clear win.
>
> Regarding @StartBundle: I always use and advise others to use lazy init
> instead of @StartBundle and to think of @FinishBundle as "flush".
> State-sensitive APIs like "start(); process(); finish()" are usually an
> anti-pattern since you can almost always write them in a less dangerous way
> (try-with-resources, Python context managers, etc). Conveniently, this
> eliminates any consideration of symmetry. Can anyone refresh me on
> when/whether it is important to have @StartBundle instead of running the
> same code via lazy init?
>
> On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw <[email protected]> wrote:
>
>> On Tue, May 5, 2020 at 3:08 PM Reuven Lax <[email protected]> wrote:
>> >
>> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <[email protected]> wrote:
>> >>
>> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <[email protected]> wrote:
>> >> >
>> >> > This should not affect the ability of the user to specify the
>> >> > output timestamp.
>> >>
>> >> My question was whether we would require it.
>> >
>> > My current PR does not - it defaults to end-of-window as the
>> > timestamp. However we could also decide to require it.
>>
>> I'd be more comfortable requiring it for the time being.
>
> +1 for requiring it
>
> Kenn
>
>> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>> >> >
>> >> > There was a mention in some other thread, that in order to make
>> >> > user experience as predictable as possible, we should try to make
>> >> > windows idempotent, and once window is assigned, it should be never
>> >> > changed (and timestamp move outside of the scope of window, unless
>> >> > a different windowfn is applied). Because all Beam window functions
>> >> > are actually time based, and output timestamp is known, what is the
>> >> > issue of applying windowfn to elements output from @FinishBundle
>> >> > and assign the windows automatically?
>> >>
>> >> We used to do exactly this. (I don't recall why it was removed.) If
>> >> the input element and/or window was queried by the WindowFn
>> >> (admittedly rare), it would fail at runtime.
>> >
>> > When did we used to do this? We've had users writing WindowFns that
>> > queried the input element since long before Beam existed, e.g. a
>> > window fn that inspected a userId field, and created different sized
>> > windows based on the userId.
>>
>> This is how it started. In particular WindowFn.AssignContext would be
>> created that threw an exception on accessing the unavailable fields
>> (which would make finalize bundle unsuitable for such WindowFns).
>
> Yea this was not great. This also broke the equivalence between WithKeys
> and AssignWindows. It was really a workaround for the lack of the feature
> Reuven is proposing.
>
>> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <[email protected]> wrote:
>> >> >
>> >> > It's a good question about startBundle - it's something I thought
>> >> > about. The problem is that a runner doesn't always know at
>> >> > startBundle what windows are in the bundle, and even if it does
>> >> > know it might require the runner to run two passes over the bundle
>> >> > to figure this out.
>> >> > Alternatively the runner could keep calling startBundle the first
>> >> > time it's seen a new window in the bundle, but I think that makes
>> >> > things even weirder. It's also worth noting that startBundle is
>> >> > already more limited today - we do not support calling output from
>> >> > startBundle, but we do allow calling output from finishBundle.
>> >> >
>> >> > Reuven
>> >> >
>> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <[email protected]> wrote:
>> >> >>
>> >> >> Ah, interesting. That makes windowFn non-idempotent by definition,
>> >> >> because its first application (e.g. global window -> interval
>> >> >> window) _might_ yield different result than second application
>> >> >> with interval window already assigned. On the other hand, let's
>> >> >> suppose for a moment we can make windowFn idempotent, would that
>> >> >> solve the issue of window assignment for elements output from
>> >> >> finishBundle? I understand that window assignment is not only
>> >> >> motivation for adding optional window parameter to @FinishBundle,
>> >> >> but users might be confused why OutputReceiver is working only
>> >> >> when there is Window parameter. It would be nice to have this
>> >> >> somewhat more "consistent". And last note - adding the parameter
>> >> >> to @FinishBundle seems a little imbalanced - could this be made
>> >> >> possible for @StartBundle as well? Should we enforce that both
>> >> >> @StartBundle and @FinishBundle have the same signature, or should
>> >> >> we accept all combinations?
>> >> >>
>> >> >> Jan
>> >> >>
>> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
>> >> >>
>> >> >> I assume you are referring to elements output from finishBundle.
>> >> >>
>> >> >> The problem is that the current window is an input to
>> >> >> WindowFn.assignWindows. The new window can depend on the
>> >> >> timestamp, the element itself, and the original window. I'm not
>> >> >> sure how many users rely on this, however it has been part of our
>> >> >> public windowing API for a long time, so I would guess that some
>> >> >> users do use this in their WindowFns.
>> >> >>
>> >> >> Reuven
>> >> >>
>> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>> >> >>>
>> >> >>> There was a mention in some other thread, that in order to make
>> >> >>> user experience as predictable as possible, we should try to make
>> >> >>> windows idempotent, and once window is assigned, it should be
>> >> >>> never changed (and timestamp move outside of the scope of window,
>> >> >>> unless a different windowfn is applied). Because all Beam window
>> >> >>> functions are actually time based, and output timestamp is known,
>> >> >>> what is the issue of applying windowfn to elements output from
>> >> >>> @FinishBundle and assign the windows automatically?
>> >> >>>
>> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>> >> >>>
>> >> >>> This should not affect the ability of the user to specify the
>> >> >>> output timestamp. Today FinishBundleContext.output forces you to
>> >> >>> specify the window as well as the timestamp, which is a bit
>> >> >>> awkward. (I believe that it also lets you create brand new
>> >> >>> windows in finishBundle, which is interesting, but I'm not quite
>> >> >>> sure of the use case).
>> >> >>>
>> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <[email protected]> wrote:
>> >> >>>>
>> >> >>>> This is a really nice idea. Would the user still need to specify
>> >> >>>> the timestamp of the output? I'm a bit ambivalent about calling
>> >> >>>> it multiple times if OutputReceiver alone is in the parameter
>> >> >>>> list; this might not be obvious and could be surprising behavior.
>> >> >>>>
>> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
>> >> >>>> >
>> >> >>>> > I would like to discuss a minor extension to the Beam model.
>> >> >>>> >
>> >> >>>> > Beam bundles have very few restrictions on what can be in a
>> >> >>>> > bundle, in particular a bundle might contain records for many
>> >> >>>> > different windows.
>> >> >>>> > This was an explicit decision as bundling primarily exists for
>> >> >>>> > performance reasons and we found that limiting bundling based on
>> >> >>>> > windows or timestamps often led to severe performance problems.
>> >> >>>> > However it sometimes makes finishBundle hard to use.
>> >> >>>> >
>> >> >>>> > I've seen multiple cases where users maintain some state in their
>> >> >>>> > DoFn that needs finalizing (e.g. writing to an external service)
>> >> >>>> > in finishBundle. Often users end up keeping lists of all windows
>> >> >>>> > seen in the bundle so they can be processed separately (or
>> >> >>>> > sometimes not realizing that there can be multiple windows and
>> >> >>>> > writing incorrect code).
>> >> >>>> >
>> >> >>>> > The lack of a window also means that we don't currently support
>> >> >>>> > injecting an OutputReceiver into finishBundle, as there's no good
>> >> >>>> > way of knowing which window output should be put into.
>> >> >>>> >
>> >> >>>> > I would like to propose adding a way for finishBundle to inspect
>> >> >>>> > the window, either directly (via a BoundedWindow parameter) or
>> >> >>>> > indirectly (via an OutputReceiver parameter). In this case, we
>> >> >>>> > will execute finishBundle once per window in the bundle.
>> >> >>>> > Otherwise, we will execute finishBundle once at the end of the
>> >> >>>> > bundle as before. This behavior is backwards compatible, as
>> >> >>>> > previously these parameters were disallowed in finishBundle.
>> >> >>>> >
>> >> >>>> > Note that this is similar to something Beam already does in
>> >> >>>> > processElement. A single element can exist in multiple windows,
>> >> >>>> > however if the processElement "observes" the window then Beam
>> >> >>>> > will call processElement once per window.
>> >> >>>> >
>> >> >>>> > In Java, the user code could look like this:
>> >> >>>> >
>> >> >>>> > DoFn<> {
>> >> >>>> >   ...
>> >> >>>> >   @FinishBundle
>> >> >>>> >   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>> >> >>>> >     // This finishBundle will be called once per window in the
>> >> >>>> >     // bundle since it has a parameter that observes the window.
>> >> >>>> >   }
>> >> >>>> > }
>> >> >>>> >
>> >> >>>> > This PR shows an implementation of this extension for the Java SDK.
>> >> >>>> >
>> >> >>>> > Thoughts?
>> >> >>>> >
>> >> >>>> > Reuven
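For anyone skimming the thread, the per-window behavior in the original proposal can be sketched as a toy model in plain Java. This is not the PR's implementation and uses no Beam types: windows are plain strings, and PerWindowFinishSketch, WindowObservingFn and runBundle are hypothetical names invented for illustration. The only idea taken from the proposal is that a finishBundle that observes the window gets called once per window present in the bundle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: windows are plain strings and none of these are Beam classes.
class PerWindowFinishSketch {
    /** A DoFn-like class whose finishBundle "observes" the window. */
    static final class WindowObservingFn {
        private final Map<String, Integer> pending = new HashMap<>();

        void processElement(String window, String element) {
            pending.merge(window, 1, Integer::sum); // count buffered elements per window
        }

        /** Flushes exactly one window and reports how many elements it held. */
        String finishBundle(String window) {
            return window + ":" + pending.remove(window);
        }
    }

    /** Toy runner: remembers each window it saw, then finishes once per window. */
    static List<String> runBundle(WindowObservingFn fn, List<String[]> bundle) {
        Set<String> seen = new LinkedHashSet<>(); // keeps first-seen order
        for (String[] record : bundle) {
            seen.add(record[0]);
            fn.processElement(record[0], record[1]);
        }
        List<String> flushed = new ArrayList<>();
        for (String window : seen) {
            flushed.add(fn.finishBundle(window));
        }
        return flushed;
    }

    public static void main(String[] args) {
        // One bundle holding records from two different windows.
        List<String[]> bundle = List.of(
            new String[] {"w1", "a"},
            new String[] {"w2", "b"},
            new String[] {"w1", "c"});
        System.out.println(runBundle(new WindowObservingFn(), bundle));
    }
}
```

Here the bookkeeping of which windows were seen lives in the toy runner rather than in user code, which is the shift the proposal describes. The sketch also shows why the runner only learns the full set of windows after walking the bundle, the difficulty raised for startBundle earlier in the thread.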
