StartBundle pre-dated setUp, which makes it less useful than before. With
DoFn re-use, however, startBundle can be used to ensure the DoFn is
instantiated to a clean state.

On Thu, May 7, 2020 at 2:03 PM Reuven Lax <[email protected]> wrote:

> I think startBundle is useful for convenience and performance, but not
> necessarily needed semantically (as Kenn said, you could write your
> pipeline without startBundle). finishBundle has a stronger semantic
> meaning when interpreted as a way of finalizing elements.
>
> On Thu, May 7, 2020 at 2:00 PM Luke Cwik <[email protected]> wrote:
>
>> Start bundle is useful since the framework provides the necessary
>> synchronization while using lazy init requires you to write it yourself and
>> also pay for it on each process element call.
>>
>> On Wed, May 6, 2020 at 8:46 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> This is a great idea. I thought that (long ago) we decided not to
>>> execute finishBundle per window for these reasons: (1) perf fears (2)
>>> naming bikeshed (3) backwards compatibility (even though we hadn't
>>> stabilized, it is pervasive). That was before the annotation-driven DoFn I
>>> believe, so we didn't have the ability to do it this way. Now this seems
>>> like a clear win.
>>>
>>> Regarding @StartBundle: I always use and advise others to use lazy init
>>> instead of @StartBundle and to think of @FinishBundle as "flush".
>>> State-sensitive APIs like "start(); process(); finish()" are usually an
>>> anti-pattern since you can almost always write them in a less dangerous way
>>> (try-with-resources, Python context managers, etc). Conveniently, this
>>> eliminates any consideration of symmetry. Can anyone refresh me on
>>> when/whether it is important to have @StartBundle instead of running the
>>> same code via lazy init?
>>>
>>>
>>> On Tue, May 5, 2020 at 3:16 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On Tue, May 5, 2020 at 3:08 PM Reuven Lax <[email protected]> wrote:
>>>> >
>>>> > On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <[email protected]>
>>>> wrote:
>>>> >>
>>>> >> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <[email protected]> wrote:
>>>> >> >
>>>> >> > This should not affect the ability of the user to specify the
>>>> output timestamp.
>>>> >>
>>>> >> My question was whether we would require it.
>>>> >
>>>> >
>>>> > My current PR does not - it defaults to end-of-window as the
>>>> timestamp. However we could also decide to require it.
>>>>
>>>> I'd be more comfortable requiring it for the time being.
>>>>
>>>
>>> +1 for requiring it
>>>
>>> Kenn
>>>
>>> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>>> >> >
>>>> >> > There was a mention in some other thread, that in order to make
>>>> user experience as predictable as possible, we should try to make windows
>>>> idempotent, and once window is assigned, it should be never changed (and
>>>> timestamp move outside of the scope of window, unless a different windowfn
>>>> is applied). Because all Beam window functions are actually time based, and
>>>> output timestamp is known, what is the issue of applying windowfn to
>>>> elements output from @FinishBundle and assign the windows automatically?
>>>> >>
>>>> >> We used to do exactly this. (I don't recall why it was removed.) If
>>>> >> the input element and/or window was queried by the WindowFn
>>>> >> (admittedly rare), it would fail at runtime.
>>>> >
>>>> > When did we used to do this? We've had users writing WindowFns that
>>>> queried the input element since long before Beam existed.  e.g a window fn
>>>> that inspected a userId field, and created different sized windows based on
>>>> the userId.
>>>>
>>>> This is how it started. In particular WindowFn.AssignContext would be
>>>> created that through an exception on accessing the unavailable fields
>>>> (which would make finalize bundle unsuitable for such WindowFns).
>>>>
>>>
>>> Yea this was not great. This also broke the equivalence between WithKeys
>>> and AssignWindows. It was really a workaround for the lack of the feature
>>> Reuven is proposing.
>>>
>>>
>>>
>>>> >> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <[email protected]> wrote:
>>>> >> >
>>>> >> > It's a good question about startBundle - it's something I thought
>>>> about. The problem is that a runner doesn't always know at startBundle what
>>>> windows are in the bundle, and even if it does know it might require the
>>>> runner to run two passes over the bundle to figure this out. Alternatively
>>>> the runner could keep calling startBundle the first time it's seen a new
>>>> window in the bundle, but I think that makes things even weirder. It's also
>>>> worth noting that startBundle is already more limited today - we do not
>>>> support calling output from startBundle, but we do allow calling output
>>>> from finishBundle.
>>>> >> >
>>>> >> > Reuven
>>>> >> >
>>>> >> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <[email protected]>
>>>> wrote:
>>>> >> >>
>>>> >> >> Ah, interesting. That makes windowFn non-idempotent by
>>>> definition, because its first application (e.g. global window -> interval
>>>> window) _might_ yield different result than second application with
>>>> interval window already assigned. On the other hand, let's suppose for a
>>>> moment we can make windowFn idempotent, would that solve the issue of
>>>> window assignment for elements output from finishBundle? I understand that
>>>> window assignment is not only motivation for adding optional window
>>>> parameter to @FinishBundle, but users might be confused why OutputReceiver
>>>> is working only when there is Window parameter. It would be nice to have
>>>> this somewhat more "consistent". And last note - adding the parameter to
>>>> @FinishBundle seems a little imbalanced - could this be made possible for
>>>> @StartBundle as well? Should we enforce that both @StartBundle and
>>>> @FinishBundle have the same signature, or should we accept all 
>>>> combinations?
>>>> >> >>
>>>> >> >> Jan
>>>> >> >>
>>>> >> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
>>>> >> >>
>>>> >> >> I assume you are referring to elements output from finishBundle.
>>>> >> >>
>>>> >> >> The problem is that the current window is an input to
>>>> WindowFn.assignWindows. The new window can depend on the timestamp, the
>>>> element itself, and the original window. I'm not sure how many users rely
>>>> on this, however it has been part of our public windowing API for a long
>>>> time, so I would guess that some users do use this in their WindowFns.
>>>> >> >>
>>>> >> >> Reuven
>>>> >> >>
>>>> >> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]>
>>>> wrote:
>>>> >> >>>
>>>> >> >>> There was a mention in some other thread, that in order to make
>>>> user experience as predictable as possible, we should try to make windows
>>>> idempotent, and once window is assigned, it should be never changed (and
>>>> timestamp move outside of the scope of window, unless a different windowfn
>>>> is applied). Because all Beam window functions are actually time based, and
>>>> output timestamp is known, what is the issue of applying windowfn to
>>>> elements output from @FinishBundle and assign the windows automatically?
>>>> >> >>>
>>>> >> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>>>> >> >>>
>>>> >> >>> This should not affect the ability of the user to specify the
>>>> output timestamp. Today FinishBundleContext.output forces you to specify
>>>> the window as well as the timestamp, which is a bit awkward. (I believe
>>>> that it also lets you create brand new windows in finishBundle, which is
>>>> interesting, but I'm not quite sure of the use case).
>>>> >> >>>
>>>> >> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <
>>>> [email protected]> wrote:
>>>> >> >>>>
>>>> >> >>>> This is a really nice idea. Would the user still need to
>>>> specify the
>>>> >> >>>> timestamp of the output? I'm a bit ambivalent about calling it
>>>> >> >>>> multiple times if OuptutReceiver alone is in the parameter
>>>> list; this
>>>> >> >>>> might not be obvious and could be surprising behavior.
>>>> >> >>>>
>>>> >> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]>
>>>> wrote:
>>>> >> >>>> >
>>>> >> >>>> > I would like to discuss a minor extension to the Beam model.
>>>> >> >>>> >
>>>> >> >>>> > Beam bundles have very few restrictions on what can be in a
>>>> bundle, in particular s bundle might contain records for many different
>>>> windows. This was an explicit decision as bundling primarily exists for
>>>> performance reasons and we found that limiting bundling based on windows or
>>>> timestamps often led to severe performance problems. However it sometimes
>>>> makes finishBundle hard to use.
>>>> >> >>>> >
>>>> >> >>>> > I've seen multiple cases where users maintain some state in
>>>> their DoFn that needs finalizing (e.g. writing to an external service) in
>>>> finishBundle. Often users end up keeping lists of all windows seen in the
>>>> bundle so they can be processed separately (or sometimes not realizing that
>>>> their can be multiple windows and writing incorrect code).
>>>> >> >>>> >
>>>> >> >>>> > The lack of a window also means that we don't currently
>>>> support injecting an OuptutReceiver into finishBundle, as there's no good
>>>> way of knowing which window output should be put into.
>>>> >> >>>> >
>>>> >> >>>> > I would like to propose adding a way for finishBundle to
>>>> inspect the window, either directly (via a BoundedWindow parameter) or
>>>> indirectly (via an OutputReceiver parameter). In this case, we will execute
>>>> finishBundle once per window in the bundle. Otherwise, we will execute
>>>> finishBundle once at the end of the bundle as before. This behavior is
>>>> backwards compatible, as previously these parameters were disallowed in
>>>> finishBundle.
>>>> >> >>>> >
>>>> >> >>>> > Note that this is similar to something Beam already does in
>>>> processElement. A single element can exist in multiple windows, however if
>>>> the processElement "observes" the window then Beam will call processElement
>>>> once per window.
>>>> >> >>>> >
>>>> >> >>>> > In Java, the user code could look like this:
>>>> >> >>>> >
>>>> >> >>>> > DoFn<> {
>>>> >> >>>> >      ...
>>>> >> >>>> >    @FinishBundle
>>>> >> >>>> >    public void finishBundle(IntervalWindow window,
>>>> OutputReceiver<T> o) {
>>>> >> >>>> >        // This finishBundle will be called once per window in
>>>> the bundle since it has
>>>> >> >>>> >       // a parameter that observes the window.
>>>> >> >>>> >    }
>>>> >> >>>> > }
>>>> >> >>>> >
>>>> >> >>>> > This PR shows an implementation of this extension for the
>>>> Java SDK.
>>>> >> >>>> >
>>>> >> >>>> > Thoughts?
>>>> >> >>>> >
>>>> >> >>>> > Reuven
>>>>
>>>

Reply via email to