On Mon, May 4, 2020 at 11:08 AM Reuven Lax <[email protected]> wrote:
>
> This should not affect the ability of the user to specify the output 
> timestamp.

My question was whether we would require it.


On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>
> There was a mention in some other thread that, in order to make the user 
> experience as predictable as possible, we should try to make windows 
> idempotent: once a window is assigned, it should never be changed (and the 
> timestamp should never move outside the scope of the window, unless a 
> different WindowFn is applied). Because all Beam window functions are actually 
> time-based, and the output timestamp is known, what is the issue with applying 
> the WindowFn to elements output from @FinishBundle and assigning the windows 
> automatically?

We used to do exactly this. (I don't recall why it was removed.) If
the input element and/or window was queried by the WindowFn
(admittedly rare), it would fail at runtime.
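
To make that failure mode concrete, here is a minimal sketch in plain Java
(not Beam code). AssignContext, fixedWindow and windowDependent are
hypothetical names invented for this illustration; they only mimic the idea
that a WindowFn may look at the timestamp, the element, or the original
window, and that only the timestamp is known for finishBundle output.

```java
final class AutoWindowSketch {
    /** Hypothetical stand-in for what a WindowFn can query; not Beam's API. */
    static final class AssignContext {
        private final long timestampMillis;
        private final String element; // null when no input element exists
        private final String window;  // null when no original window exists

        private AssignContext(long timestampMillis, String element, String window) {
            this.timestampMillis = timestampMillis;
            this.element = element;
            this.window = window;
        }

        /** Context for output from finishBundle: only the timestamp is known. */
        static AssignContext timestampOnly(long timestampMillis) {
            return new AssignContext(timestampMillis, null, null);
        }

        long timestamp() {
            return timestampMillis;
        }

        String element() {
            if (element == null) {
                throw new UnsupportedOperationException("no input element in finishBundle");
            }
            return element;
        }

        String window() {
            if (window == null) {
                throw new UnsupportedOperationException("no original window in finishBundle");
            }
            return window;
        }
    }

    /** Purely time-based assignment: needs nothing but the timestamp. */
    static String fixedWindow(AssignContext c, long sizeMillis) {
        long start = c.timestamp() - Math.floorMod(c.timestamp(), sizeMillis);
        return "[" + start + ", " + (start + sizeMillis) + ")";
    }

    /** Assignment that reads the original window (the rare case). */
    static String windowDependent(AssignContext c) {
        return c.window() + "/sub";
    }

    public static void main(String[] args) {
        AssignContext fromFinishBundle = AssignContext.timestampOnly(125_000L);
        // Time-based assignment works: prints [120000, 180000)
        System.out.println(fixedWindow(fromFinishBundle, 60_000L));
        try {
            windowDependent(fromFinishBundle);
        } catch (UnsupportedOperationException e) {
            // The window-dependent function only fails once it is invoked.
            System.out.println("failed at runtime: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that the problem shows up when the function
runs, not when the pipeline is built.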

On Tue, May 5, 2020 at 2:51 PM Reuven Lax <[email protected]> wrote:
>
> It's a good question about startBundle - it's something I thought about. The 
> problem is that a runner doesn't always know at startBundle what windows are 
> in the bundle, and even if it does know it might require the runner to run 
> two passes over the bundle to figure this out. Alternatively the runner could 
> keep calling startBundle the first time it's seen a new window in the bundle, 
> but I think that makes things even weirder. It's also worth noting that 
> startBundle is already more limited today - we do not support calling output 
> from startBundle, but we do allow calling output from finishBundle.
>
> Reuven
>
> On Mon, May 4, 2020 at 11:59 PM Jan Lukavský <[email protected]> wrote:
>>
>> Ah, interesting. That makes windowFn non-idempotent by definition, because 
>> its first application (e.g. global window -> interval window) _might_ yield 
>> a different result than a second application with the interval window 
>> already assigned. On the other hand, let's suppose for a moment that we can 
>> make windowFn idempotent: would that solve the issue of window assignment 
>> for elements output from finishBundle? I understand that window assignment 
>> is not the only motivation for adding an optional window parameter to 
>> @FinishBundle, but users might be confused about why OutputReceiver works 
>> only when there is a Window parameter. It would be nice to have this 
>> somewhat more "consistent". And a last note: adding the parameter to 
>> @FinishBundle seems a little imbalanced. Could this be made possible for 
>> @StartBundle as well? Should we enforce that both @StartBundle and 
>> @FinishBundle have the same signature, or should we accept all combinations?
>>
>> Jan
>>
>> On 5/4/20 11:02 PM, Reuven Lax wrote:
>>
>> I assume you are referring to elements output from finishBundle.
>>
>> The problem is that the current window is an input to 
>> WindowFn.assignWindows. The new window can depend on the timestamp, the 
>> element itself, and the original window. I'm not sure how many users rely on 
>> this, however it has been part of our public windowing API for a long time, 
>> so I would guess that some users do use this in their WindowFns.
>>
>> Reuven
>>
>> On Mon, May 4, 2020 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>>
>>> There was a mention in some other thread that, in order to make the user 
>>> experience as predictable as possible, we should try to make windows 
>>> idempotent: once a window is assigned, it should never be changed (and the 
>>> timestamp should never move outside the scope of the window, unless a 
>>> different WindowFn is applied). Because all Beam window functions are 
>>> actually time-based, and the output timestamp is known, what is the issue 
>>> with applying the WindowFn to elements output from @FinishBundle and 
>>> assigning the windows automatically?
>>>
>>> On 5/4/20 8:07 PM, Reuven Lax wrote:
>>>
>>> This should not affect the ability of the user to specify the output 
>>> timestamp. Today FinishBundleContext.output forces you to specify the 
>>> window as well as the timestamp, which is a bit awkward. (I believe that it 
>>> also lets you create brand new windows in finishBundle, which is 
>>> interesting, but I'm not quite sure of the use case).
>>>
>>> On Mon, May 4, 2020 at 10:29 AM Robert Bradshaw <[email protected]> wrote:
>>>>
>>>> This is a really nice idea. Would the user still need to specify the
>>>> timestamp of the output? I'm a bit ambivalent about calling it
>>>> multiple times if OutputReceiver alone is in the parameter list; this
>>>> might not be obvious and could be surprising behavior.
>>>>
>>>> On Mon, May 4, 2020 at 10:13 AM Reuven Lax <[email protected]> wrote:
>>>> >
>>>> > I would like to discuss a minor extension to the Beam model.
>>>> >
>>>> > Beam bundles have very few restrictions on what can be in a bundle, in 
>>>> > particular a bundle might contain records for many different windows. 
>>>> > This was an explicit decision as bundling primarily exists for 
>>>> > performance reasons and we found that limiting bundling based on windows 
>>>> > or timestamps often led to severe performance problems. However it 
>>>> > sometimes makes finishBundle hard to use.
>>>> >
>>>> > I've seen multiple cases where users maintain some state in their DoFn 
>>>> > that needs finalizing (e.g. writing to an external service) in 
>>>> > finishBundle. Often users end up keeping lists of all windows seen in 
>>>> > the bundle so they can be processed separately (or sometimes do not 
>>>> > realize that there can be multiple windows and write incorrect code).
>>>> >
>>>> > The lack of a window also means that we don't currently support 
>>>> > injecting an OutputReceiver into finishBundle, as there's no good way of 
>>>> > knowing which window output should be put into.
>>>> >
>>>> > I would like to propose adding a way for finishBundle to inspect the 
>>>> > window, either directly (via a BoundedWindow parameter) or indirectly 
>>>> > (via an OutputReceiver parameter). In this case, we will execute 
>>>> > finishBundle once per window in the bundle. Otherwise, we will execute 
>>>> > finishBundle once at the end of the bundle as before. This behavior is 
>>>> > backwards compatible, as previously these parameters were disallowed in 
>>>> > finishBundle.
>>>> >
>>>> > Note that this is similar to something Beam already does in 
>>>> > processElement. A single element can exist in multiple windows, however 
>>>> > if the processElement "observes" the window then Beam will call 
>>>> > processElement once per window.
>>>> >
>>>> > In Java, the user code could look like this:
>>>> >
>>>> > DoFn<> {
>>>> >    ...
>>>> >    @FinishBundle
>>>> >    public void finishBundle(IntervalWindow window, OutputReceiver<T> o) {
>>>> >      // This finishBundle will be called once per window in the bundle,
>>>> >      // since it has a parameter that observes the window.
>>>> >    }
>>>> > }
>>>> >
>>>> > This PR shows an implementation of this extension for the Java SDK.
>>>> >
>>>> > Thoughts?
>>>> >
>>>> > Reuven
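
The once-per-window behavior proposed above can be sketched without Beam at
all. The following is a small plain-Java simulation, not the implementation
from the PR: Windowed, WindowObservingFn, SummingFn and runBundle are all
hypothetical names, windows are represented as plain strings, and output
timestamps are ignored. It only shows a runner remembering which windows
appeared in a bundle and then invoking finishBundle once for each of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PerWindowFinishSketch {
    /** Hypothetical bundle element: a window label plus a value. */
    static final class Windowed {
        final String window;
        final int value;

        Windowed(String window, int value) {
            this.window = window;
            this.value = value;
        }
    }

    /** Hypothetical DoFn-like interface whose finishBundle observes the window. */
    interface WindowObservingFn {
        void processElement(String window, int value);

        void finishBundle(String window, List<String> out);
    }

    /** User code: buffers a sum per window and emits it when that window is finished. */
    static final class SummingFn implements WindowObservingFn {
        private final Map<String, Integer> sums = new HashMap<>();

        @Override
        public void processElement(String window, int value) {
            sums.merge(window, value, Integer::sum);
        }

        @Override
        public void finishBundle(String window, List<String> out) {
            // Only this window's state is flushed; other windows are untouched.
            out.add(window + "=" + sums.remove(window));
        }
    }

    /** Simulated runner: one finishBundle call per distinct window, in first-seen order. */
    static List<String> runBundle(List<Windowed> bundle, WindowObservingFn fn) {
        Set<String> seen = new LinkedHashSet<>();
        for (Windowed e : bundle) {
            seen.add(e.window);
            fn.processElement(e.window, e.value);
        }
        List<String> out = new ArrayList<>();
        for (String window : seen) {
            fn.finishBundle(window, out);
        }
        return out;
    }

    /** A bundle that mixes two windows, as bundles are allowed to do. */
    static List<String> demo() {
        List<Windowed> bundle = Arrays.asList(
            new Windowed("[0,60)", 1),
            new Windowed("[60,120)", 10),
            new Windowed("[0,60)", 2));
        return runBundle(bundle, new SummingFn());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[0,60)=3, [60,120)=10]
    }
}
```

In this sketch the user code never has to keep its own list of windows seen
in the bundle; the simulated runner does that bookkeeping instead.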
