On Tue, May 5, 2020 at 3:08 PM Reuven Lax <[email protected]> wrote:
>
> On Tue, May 5, 2020 at 2:58 PM Robert Bradshaw <[email protected]> wrote:
>>
>> On Mon, May 4, 2020 at 11:08 AM Reuven Lax <[email protected]> wrote:
>> >
>> > This should not affect the ability of the user to specify the output 
>> > timestamp.
>>
>> My question was whether we would require it.
>
>
> My current PR does not - it defaults to end-of-window as the timestamp. 
> However we could also decide to require it.

I'd be more comfortable requiring it for the time being.

>> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>> >
>> > There was a mention in some other thread, that in order to make user 
>> > experience as predictable as possible, we should try to make windows 
>> > idempotent, and once window is assigned, it should be never changed (and 
>> > timestamp move outside of the scope of window, unless a different windowfn 
>> > is applied). Because all Beam window functions are actually time based, 
>> > and output timestamp is known, what is the issue of applying windowfn to 
>> > elements output from @FinishBundle and assign the windows automatically?
>>
>> We used to do exactly this. (I don't recall why it was removed.) If
>> the input element and/or window was queried by the WindowFn
>> (admittedly rare), it would fail at runtime.
>
> When did we used to do this? We've had users writing WindowFns that queried 
> the input element since long before Beam existed.  e.g a window fn that 
> inspected a userId field, and created different sized windows based on the 
> userId.

This is how it started. In particular WindowFn.AssignContext would be
created that through an exception on accessing the unavailable fields
(which would make finalize bundle unsuitable for such WindowFns).

>> On Tue, May 5, 2020 at 2:51 PM Reuven Lax <[email protected]> wrote:
>> >
>> > It's a good question about startBundle - it's something I thought about. 
>> > The problem is that a runner doesn't always know at startBundle what 
>> > windows are in the bundle, and even if it does know it might require the 
>> > runner to run two passes over the bundle to figure this out. Alternatively 
>> > the runner could keep calling startBundle the first time it's seen a new 
>> > window in the bundle, but I think that makes things even weirder. It's 
>> > also worth noting that startBundle is already more limited today - we do 
>> > not support calling output from startBundle, but we do allow calling 
>> > output from finishBundle.
>> >
>> > Reuven
>> >
>> > On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <[email protected]> wrote:
>> >>
>> >> Ah, interesting. That makes windowFn non-idempotent by definition, 
>> >> because its first application (e.g. global window -> interval window) 
>> >> _might_ yield different result than second application with interval 
>> >> window already assigned. On the other hand, let's suppose for a moment we 
>> >> can make windowFn idempotent, would that solve the issue of window 
>> >> assignment for elements output from finishBundle? I understand that 
>> >> window assignment is not only motivation for adding optional window 
>> >> parameter to @FinishBundle, but users might be confused why 
>> >> OutputReceiver is working only when there is Window parameter. It would 
>> >> be nice to have this somewhat more "consistent". And last note - adding 
>> >> the parameter to @FinishBundle seems a little imbalanced - could this be 
>> >> made possible for @StartBundle as well? Should we enforce that both 
>> >> @StartBundle and @FinishBundle have the same signature, or should we 
>> >> accept all combinations?
>> >>
>> >> Jan
>> >>
>> >> On 5/4/20 11:02 PM, Reuven Lax wrote:
>> >>
>> >> I assume you are referring to elements output from finishBundle.
>> >>
>> >> The problem is that the current window is an input to 
>> >> WindowFn.assignWindows. The new window can depend on the timestamp, the 
>> >> element itself, and the original window. I'm not sure how many users rely 
>> >> on this, however it has been part of our public windowing API for a long 
>> >> time, so I would guess that some users do use this in their WindowFns.
>> >>
>> >> Reuven
>> >>
>> >> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>> >>>
>> >>> There was a mention in some other thread, that in order to make user 
>> >>> experience as predictable as possible, we should try to make windows 
>> >>> idempotent, and once window is assigned, it should be never changed (and 
>> >>> timestamp move outside of the scope of window, unless a different 
>> >>> windowfn is applied). Because all Beam window functions are actually 
>> >>> time based, and output timestamp is known, what is the issue of applying 
>> >>> windowfn to elements output from @FinishBundle and assign the windows 
>> >>> automatically?
>> >>>
>> >>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>> >>>
>> >>> This should not affect the ability of the user to specify the output 
>> >>> timestamp. Today FinishBundleContext.output forces you to specify the 
>> >>> window as well as the timestamp, which is a bit awkward. (I believe that 
>> >>> it also lets you create brand new windows in finishBundle, which is 
>> >>> interesting, but I'm not quite sure of the use case).
>> >>>
>> >>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <[email protected]> 
>> >>> wrote:
>> >>>>
>> >>>> This is a really nice idea. Would the user still need to specify the
>> >>>> timestamp of the output? I'm a bit ambivalent about calling it
>> >>>> multiple times if OuptutReceiver alone is in the parameter list; this
>> >>>> might not be obvious and could be surprising behavior.
>> >>>>
>> >>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
>> >>>> >
>> >>>> > I would like to discuss a minor extension to the Beam model.
>> >>>> >
>> >>>> > Beam bundles have very few restrictions on what can be in a bundle, 
>> >>>> > in particular s bundle might contain records for many different 
>> >>>> > windows. This was an explicit decision as bundling primarily exists 
>> >>>> > for performance reasons and we found that limiting bundling based on 
>> >>>> > windows or timestamps often led to severe performance problems. 
>> >>>> > However it sometimes makes finishBundle hard to use.
>> >>>> >
>> >>>> > I've seen multiple cases where users maintain some state in their 
>> >>>> > DoFn that needs finalizing (e.g. writing to an external service) in 
>> >>>> > finishBundle. Often users end up keeping lists of all windows seen in 
>> >>>> > the bundle so they can be processed separately (or sometimes not 
>> >>>> > realizing that their can be multiple windows and writing incorrect 
>> >>>> > code).
>> >>>> >
>> >>>> > The lack of a window also means that we don't currently support 
>> >>>> > injecting an OuptutReceiver into finishBundle, as there's no good way 
>> >>>> > of knowing which window output should be put into.
>> >>>> >
>> >>>> > I would like to propose adding a way for finishBundle to inspect the 
>> >>>> > window, either directly (via a BoundedWindow parameter) or indirectly 
>> >>>> > (via an OutputReceiver parameter). In this case, we will execute 
>> >>>> > finishBundle once per window in the bundle. Otherwise, we will 
>> >>>> > execute finishBundle once at the end of the bundle as before. This 
>> >>>> > behavior is backwards compatible, as previously these parameters were 
>> >>>> > disallowed in finishBundle.
>> >>>> >
>> >>>> > Note that this is similar to something Beam already does in 
>> >>>> > processElement. A single element can exist in multiple windows, 
>> >>>> > however if the processElement "observes" the window then Beam will 
>> >>>> > call processElement once per window.
>> >>>> >
>> >>>> > In Java, the user code could look like this:
>> >>>> >
>> >>>> > DoFn<> {
>> >>>> >      ...
>> >>>> >    @FinishBundle
>> >>>> >    public void finishBundle(IntervalWindow window, OutputReceiver<T> 
>> >>>> > o) {
>> >>>> >        // This finishBundle will be called once per window in the 
>> >>>> > bundle since it has
>> >>>> >       // a parameter that observes the window.
>> >>>> >    }
>> >>>> > }
>> >>>> >
>> >>>> > This PR shows an implementation of this extension for the Java SDK.
>> >>>> >
>> >>>> > Thoughts?
>> >>>> >
>> >>>> > Reuven

Reply via email to