This is a great idea. I thought that (long ago) we decided not to execute finishBundle per window for these reasons: (1) perf fears, (2) naming bikeshed, (3) backwards compatibility (even though we hadn't stabilized, it was already pervasive). That was before the annotation-driven DoFn, I believe, so we didn't have the ability to do it this way. Now this seems like a clear win.
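Without a per-window finishBundle, the workaround described in Reuven's original proposal (quoted at the bottom of this thread) is to track every window seen in the bundle and flush each one separately. The following is a minimal plain-Java sketch of that bookkeeping, assuming no Beam dependency: windows are modeled as String labels rather than BoundedWindow, and `WindowTrackingFn` with its `processElement`/`finishBundle` methods is a hypothetical stand-in for a DoFn's @ProcessElement and @FinishBundle methods.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model (no Beam dependency) of today's workaround:
// buffer elements per window in processElement, then flush every window
// separately from a single finishBundle call. Windows are String labels here;
// in a real DoFn they would be BoundedWindow instances.
class WindowTrackingFn {
  // LinkedHashMap keeps windows in first-seen order, so output is deterministic.
  private final Map<String, List<String>> buffered = new LinkedHashMap<>();

  void processElement(String element, String window) {
    buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
  }

  /** Flushes each window seen in the bundle; returns one line per flushed window. */
  List<String> finishBundle() {
    List<String> flushed = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : buffered.entrySet()) {
      // Stand-in for "write this window's batch to an external service".
      flushed.add(entry.getKey() + " -> " + entry.getValue());
    }
    buffered.clear(); // the next bundle starts with no tracked windows
    return flushed;
  }

  public static void main(String[] args) {
    WindowTrackingFn fn = new WindowTrackingFn();
    fn.processElement("a", "[0,60)");
    fn.processElement("b", "[60,120)");
    fn.processElement("c", "[0,60)");
    System.out.println(fn.finishBundle()); // [[0,60) -> [a, c], [60,120) -> [b]]
  }
}
```

Under the proposal, this map-of-windows bookkeeping would move into the runner: a finishBundle that observes the window would simply be invoked once per window.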
Regarding @StartBundle: I always use, and advise others to use, lazy init instead of @StartBundle, and to think of @FinishBundle as "flush". State-sensitive APIs like "start(); process(); finish()" are usually an anti-pattern, since you can almost always write them in a less dangerous way (try-with-resources, Python context managers, etc.). Conveniently, this eliminates any consideration of symmetry. Can anyone refresh me on when/whether it is important to have @StartBundle instead of running the same code via lazy init?

On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw wrote:

> On Tue, May 5, 2020 at 3:08 PM Reuven Lax wrote:
> >
> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw wrote:
> >>
> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax wrote:
> >> >
> >> > This should not affect the ability of the user to specify the output timestamp.
> >>
> >> My question was whether we would require it.
> >
> >
> > My current PR does not - it defaults to end-of-window as the timestamp. However, we could also decide to require it.
>
> I'd be more comfortable requiring it for the time being.
>

+1 for requiring it

Kenn

> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský wrote:
> >> >
> >> > There was a mention in some other thread that, in order to make the user experience as predictable as possible, we should try to make windows idempotent: once a window is assigned, it should never be changed (nor should the timestamp move outside the scope of the window, unless a different WindowFn is applied). Because all Beam window functions are actually time-based, and the output timestamp is known, what is the issue with applying the WindowFn to elements output from @FinishBundle and assigning the windows automatically?
> >>
> >> We used to do exactly this. (I don't recall why it was removed.) If the input element and/or window was queried by the WindowFn (admittedly rare), it would fail at runtime.
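The lazy-init-plus-"flush" style recommended at the top of this reply can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Beam code: `LazyFn` is a hypothetical stand-in for a DoFn (its `processElement` and `finishBundle` play the roles of @ProcessElement and @FinishBundle), and `BatchWriter` is a hypothetical buffered client. The second half shows try-with-resources as the safer alternative to a start()/process()/finish() protocol.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch (no Beam dependency) of two alternatives to a
// start()/process()/finish() protocol: lazy initialization plus a "flush" step,
// and try-with-resources scoping. BatchWriter and LazyFn are illustrative names.
class LazyInitSketch {

  /** Buffers writes and pushes them to a sink on flush() or close(). */
  static final class BatchWriter implements AutoCloseable {
    private final List<String> sink;
    private final List<String> buffer = new ArrayList<>();

    BatchWriter(List<String> sink) {
      this.sink = sink;
    }

    void write(String value) {
      buffer.add(value);
    }

    void flush() {
      if (!buffer.isEmpty()) {
        sink.add(String.join(",", buffer));
        buffer.clear();
      }
    }

    @Override
    public void close() {
      flush();
    }
  }

  /** DoFn-shaped class with no start hook: the writer is created on first use. */
  static final class LazyFn {
    final List<String> sink = new ArrayList<>();
    private BatchWriter writer; // null until the first element arrives

    void processElement(String element) {
      if (writer == null) {
        writer = new BatchWriter(sink); // lazy init in place of @StartBundle
      }
      writer.write(element);
    }

    /** Plays the role of @FinishBundle: just flush whatever is buffered. */
    void finishBundle() {
      if (writer != null) {
        writer.flush();
      }
    }
  }

  public static void main(String[] args) {
    LazyFn fn = new LazyFn();
    fn.processElement("a");
    fn.processElement("b");
    fn.finishBundle();
    fn.processElement("c");
    fn.finishBundle();
    System.out.println(fn.sink); // [a,b, c]

    // try-with-resources: close(), and therefore flush(), runs even if the body throws.
    List<String> sink = new ArrayList<>();
    try (BatchWriter w = new BatchWriter(sink)) {
      w.write("x");
      w.write("y");
    }
    System.out.println(sink); // [x,y]
  }
}
```

Because nothing depends on a start hook having run, there is no start/finish pairing to keep symmetric, and an empty bundle costs nothing.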
> >
> > When did we use to do this? We've had users writing WindowFns that queried the input element since long before Beam existed, e.g. a WindowFn that inspected a userId field and created different-sized windows based on the userId.
>
> This is how it started. In particular, a WindowFn.AssignContext would be created that threw an exception on accessing the unavailable fields (which would make finishBundle unsuitable for such WindowFns).
>

Yeah, this was not great. This also broke the equivalence between WithKeys and AssignWindows. It was really a workaround for the lack of the feature Reuven is proposing.

> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax wrote:
> >> >
> >> > It's a good question about startBundle - it's something I thought about. The problem is that a runner doesn't always know at startBundle what windows are in the bundle, and even if it does know, it might have to run two passes over the bundle to figure this out. Alternatively, the runner could call startBundle each time it first sees a new window in the bundle, but I think that makes things even weirder. It's also worth noting that startBundle is already more limited today - we do not support calling output from startBundle, but we do allow calling output from finishBundle.
> >> >
> >> > Reuven
> >> >
> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský wrote:
> >> >>
> >> >> Ah, interesting. That makes the WindowFn non-idempotent by definition, because its first application (e.g. global window -> interval window) _might_ yield a different result than a second application with the interval window already assigned. On the other hand, let's suppose for a moment that we could make the WindowFn idempotent: would that solve the issue of window assignment for elements output from finishBundle?
> >> >> I understand that window assignment is not the only motivation for adding an optional window parameter to @FinishBundle, but users might be confused about why OutputReceiver works only when there is a window parameter. It would be nice to have this somewhat more "consistent". And a last note: adding the parameter to @FinishBundle seems a little imbalanced. Could this be made possible for @StartBundle as well? Should we enforce that both @StartBundle and @FinishBundle have the same signature, or should we accept all combinations?
> >> >>
> >> >> Jan
> >> >>
> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
> >> >>
> >> >> I assume you are referring to elements output from finishBundle.
> >> >>
> >> >> The problem is that the current window is an input to WindowFn.assignWindows. The new window can depend on the timestamp, the element itself, and the original window. I'm not sure how many users rely on this; however, it has been part of our public windowing API for a long time, so I would guess that some users do use this in their WindowFns.
> >> >>
> >> >> Reuven
> >> >>
> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský wrote:
> >> >>>
> >> >>> There was a mention in some other thread that, in order to make the user experience as predictable as possible, we should try to make windows idempotent: once a window is assigned, it should never be changed (nor should the timestamp move outside the scope of the window, unless a different WindowFn is applied). Because all Beam window functions are actually time-based, and the output timestamp is known, what is the issue with applying the WindowFn to elements output from @FinishBundle and assigning the windows automatically?
> >> >>>
> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
> >> >>>
> >> >>> This should not affect the ability of the user to specify the output timestamp. Today FinishBundleContext.output forces you to specify the window as well as the timestamp, which is a bit awkward.
> >> >>> (I believe that it also lets you create brand-new windows in finishBundle, which is interesting, but I'm not quite sure of the use case.)
> >> >>>
> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw wrote:
> >> >>>>
> >> >>>> This is a really nice idea. Would the user still need to specify the timestamp of the output? I'm a bit ambivalent about calling it multiple times if OutputReceiver alone is in the parameter list; this might not be obvious and could be surprising behavior.
> >> >>>>
> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax wrote:
> >> >>>> >
> >> >>>> > I would like to discuss a minor extension to the Beam model.
> >> >>>> >
> >> >>>> > Beam bundles have very few restrictions on what can be in a bundle; in particular, a bundle might contain records for many different windows. This was an explicit decision, as bundling primarily exists for performance reasons and we found that limiting bundling based on windows or timestamps often led to severe performance problems. However, it sometimes makes finishBundle hard to use.
> >> >>>> >
> >> >>>> > I've seen multiple cases where users maintain some state in their DoFn that needs finalizing (e.g. writing to an external service) in finishBundle. Often users end up keeping lists of all windows seen in the bundle so they can be processed separately (or sometimes don't realize that there can be multiple windows and write incorrect code).
> >> >>>> >
> >> >>>> > The lack of a window also means that we don't currently support injecting an OutputReceiver into finishBundle, as there's no good way of knowing which window the output should be put into.
> >> >>>> >
> >> >>>> > I would like to propose adding a way for finishBundle to inspect the window, either directly (via a BoundedWindow parameter) or indirectly (via an OutputReceiver parameter).
> >> >>>> > If finishBundle declares either parameter, we will execute it once per window in the bundle. Otherwise, we will execute finishBundle once at the end of the bundle, as before. This behavior is backwards compatible, as previously these parameters were disallowed in finishBundle.
> >> >>>> >
> >> >>>> > Note that this is similar to something Beam already does in processElement. A single element can exist in multiple windows; however, if processElement "observes" the window, then Beam will call processElement once per window.
> >> >>>> >
> >> >>>> > In Java, the user code could look like this:
> >> >>>> >
> >> >>>> > DoFn<> {
> >> >>>> >   ...
> >> >>>> >   @FinishBundle
> >> >>>> >   public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
> >> >>>> >     // This finishBundle will be called once per window in the bundle since it has
> >> >>>> >     // a parameter that observes the window.
> >> >>>> >   }
> >> >>>> > }
> >> >>>> >
> >> >>>> > This PR shows an implementation of this extension for the Java SDK.
> >> >>>> >
> >> >>>> > Thoughts?
> >> >>>> >
> >> >>>> > Reuven
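The invocation rule in the proposal above can be modeled in a few lines. This is a hypothetical sketch, not Beam runner code: `PerWindowDispatch`, `Element`, and `finishBundleCalls` are illustrative names, windows are String labels, each element is assumed to carry exactly one window, and the `observesWindow` flag stands for "finishBundle declares a window or OutputReceiver parameter".

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the proposed invocation rule (not Beam runner code):
// if finishBundle observes the window (a window or OutputReceiver parameter),
// call it once per distinct window in the bundle; otherwise call it once per
// bundle. Windows are modeled as String labels, one window per element.
class PerWindowDispatch {

  /** One element of a bundle, tagged with the window it belongs to. */
  static final class Element {
    final String value;
    final String window;

    Element(String value, String window) {
      this.value = value;
      this.window = window;
    }
  }

  /** Describes the finishBundle calls a runner following the rule would make. */
  static List<String> finishBundleCalls(List<Element> bundle, boolean observesWindow) {
    List<String> calls = new ArrayList<>();
    if (!observesWindow) {
      calls.add("finishBundle()"); // existing behavior: once at the end of the bundle
      return calls;
    }
    // Collect distinct windows in first-seen order.
    Set<String> windows = new LinkedHashSet<>();
    for (Element e : bundle) {
      windows.add(e.window);
    }
    for (String w : windows) {
      calls.add("finishBundle(" + w + ")");
    }
    return calls;
  }

  public static void main(String[] args) {
    List<Element> bundle = List.of(
        new Element("a", "[0,60)"),
        new Element("b", "[60,120)"),
        new Element("c", "[0,60)"));
    System.out.println(finishBundleCalls(bundle, false));
    System.out.println(finishBundleCalls(bundle, true));
  }
}
```

One consequence visible in this model: a window-observing finishBundle is not called at all for an empty bundle, whereas the window-free form still runs once.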
