Reading over the other thread, there was consensus to implement.

Reading commentary on the PR, there were good questions raised about the
semantics. Questions which I feel able to have an opinion about :-)

The questions concerned bundling, and timers in the same bundle clearing
each other. Actually the same questions apply to timers re-setting later
timers, and +Jan Lukavský <je...@seznam.cz> has raised this already (among
other people), so we kind of know the answer now, and I think +Boyuan Zhang
<boyu...@google.com>'s code was good (from my quick read). What has changed
is that we have a better idea of the contract with the runner. I'm not sure
if portability makes this more complex. I will share all my thoughts on
this:

I think one key to the Beam model is that bundles are for performance and
also intended as the unit of commitment (until FinishBundle is called,
there may be unfinished work). They can affect *behavior* (what the program
does - including what can be observed) ... but not *semantics* (what the
output means).

So, for example, bundling affects how many files are written, but you are
expected to read all the files and must not depend on their number or
ordering. The behavior is different, but the semantics are the same.
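
To make the file example concrete, here is a toy sketch in plain Java. It
is not Beam code: the class and method names (BundleFilesSketch, writeFiles,
readAll) are made up for illustration, and "one file per bundle" is only an
assumption about how a sink might behave.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model, not Beam code. All names here are hypothetical. */
class BundleFilesSketch {

  /** Assumed sink behavior: every bundle of at most bundleSize elements becomes one "file". */
  static List<List<String>> writeFiles(List<String> elements, int bundleSize) {
    List<List<String>> files = new ArrayList<>();
    for (int i = 0; i < elements.size(); i += bundleSize) {
      int end = Math.min(i + bundleSize, elements.size());
      files.add(new ArrayList<>(elements.subList(i, end)));
    }
    return files;
  }

  /** A well-behaved reader: reads every file and ignores file count and ordering. */
  static List<String> readAll(List<List<String>> files) {
    List<String> all = new ArrayList<>();
    for (List<String> file : files) {
      all.addAll(file);
    }
    Collections.sort(all);
    return all;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("e", "a", "d", "b", "c");
    List<List<String>> small = writeFiles(input, 2);
    List<List<String>> large = writeFiles(input, 5);
    // Behavior differs: the number of files depends on the bundling.
    System.out.println(small.size() + " files vs " + large.size() + " file");
    // Semantics agree: the contents read back are the same.
    System.out.println(readAll(small).equals(readAll(large)));
  }
}
```

The file count is the observable behavior that bundling is allowed to
change; what readAll returns is the part that must not change.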

When it comes to timers, behavior and semantics are very tightly coupled;
timers are like a self loop. The firing of a timer is a behavior w.r.t. the
output of the stateful ParDo but it is semantics of the timer itself (my
definitions don't actually apply so simply so don't dig too hard :-). So to
get bundling-invariant semantics, we should try for bundling-invariant
behavior. When some clock moves from T to T+D between bundles then all the
timers in the range [T, T+D) may fire so they are delivered in the bundle.
I believe we have in the prior re-setting issue agreed that timers should
always be called back in timestamp order. Bundling-invariance then implies
that earlier timers can clear later timers just like they can re-set them.
So even though a timer is delivered in a bundle, the local state of the
timer wins. This matches how state works as well; no matter how things are
bundled, the state you read is always whatever was written last.
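
As a rough illustration of what I mean, here is a toy model in plain Java.
It is not the Beam API and not how any runner is implemented; the names
(TimerBundleSketch, OnTimer, advanceTo, run) are hypothetical. The one thing
it tries to show is that if eligible timers are fired in timestamp order and
the local timer state is re-read before each firing, then an earlier timer
clearing a later one gives the same result however the clock advance is
split into bundles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of timer delivery, not Beam code. All names here are hypothetical. */
class TimerBundleSketch {

  /** Callback invoked when a timer fires; it may set or clear other timers. */
  interface OnTimer {
    void fire(String timerId, long timestamp, TimerBundleSketch self);
  }

  // Local timer state (timerId -> firing timestamp). This is the source of truth.
  private final Map<String, Long> timers = new TreeMap<>();

  void set(String timerId, long timestamp) {
    timers.put(timerId, timestamp);
  }

  void clear(String timerId) {
    timers.remove(timerId);
  }

  /** Fires, in timestamp order, every timer whose timestamp is before newTime. */
  List<String> advanceTo(long newTime, OnTimer callback) {
    List<String> fired = new ArrayList<>();
    while (true) {
      // Re-read local state on every iteration, so that clears and re-sets
      // made by earlier callbacks in this same "bundle" are honored.
      String nextId = null;
      long nextTs = Long.MAX_VALUE;
      for (Map.Entry<String, Long> e : timers.entrySet()) {
        if (e.getValue() < newTime && e.getValue() < nextTs) {
          nextId = e.getKey();
          nextTs = e.getValue();
        }
      }
      if (nextId == null) {
        return fired;
      }
      timers.remove(nextId);
      fired.add(nextId + "@" + nextTs);
      callback.fire(nextId, nextTs, this);
    }
  }

  /** Timer "a" (t=1) clears timer "b" (t=5); each clock step is one bundle. */
  static List<String> run(long... clockSteps) {
    TimerBundleSketch t = new TimerBundleSketch();
    t.set("a", 1);
    t.set("c", 3);
    t.set("b", 5);
    OnTimer callback = (id, ts, self) -> {
      if (id.equals("a")) {
        self.clear("b");
      }
    };
    List<String> fired = new ArrayList<>();
    for (long step : clockSteps) {
      fired.addAll(t.advanceTo(step, callback));
    }
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(run(10));       // one bundle covering [0, 10)
    System.out.println(run(2, 4, 10)); // three smaller bundles
    // Both print [a@1, c@3]: "b" never fires in either bundling.
  }
}
```

If advanceTo instead took a snapshot of the eligible timers up front and
fired all of them, "b" would fire in the single-bundle run but not in the
split run, which is exactly the bundling-dependent behavior we want to
avoid.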

Kenn

On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:

>
>
> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>>
>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've been looking at the implementation of GroupIntoBatches (hoping to
>>>> add support to group based on byte size), and I have a few questions about
>>>> the current implementation.
>>>>
>>>> 1. I noticed that the transform does not preserve input timestamps. The
>>>> output timestamp is not explicitly set, so it will be whatever the default
>>>> output timestamp is. Confusingly this will depend on which path is taken. If
>>>> the batch is output while processing an element, then the timestamp of that
>>>> element will be used. If the batch is output from the timer, then the
>>>> processing-time value of the timer will be used.
>>>>
>>>
>>> Seems like bugs.
>>>
>>>
>>>>      - Should I start setting outputTimestamp explicitly - ideally to
>>>> the minimum timestamp in the current batch of elements?
>>>>
>>>
>>> That's a sensible default. Could pass a TimestampCombiner if that leads
>>> to watermark delays. Given the nature of the transform, the simple approach
>>> will probably be fine.
>>>
>>
>> This is a bit tricky to do. In order to update the output timestamp, we
>> need to call setTimer again. However, since we are calling
>> timer.offset().setRelative(), this will keep bumping the timer into the
>> future and it will never fire.
>>
>> One solution would be for GIB to just store the current timer ts in
>> state, and make sure that we keep setting the same target until the timer
>> fires. However that precludes us from using setRelative (since that method
>> does not allow the DoFn to see what target time is being set). I think that
>> this approach won't play nice with TestStream.advanceProcessingTimeTo.
>>
>> We could also add a way to update _just_ the output time for a timer
>> without resetting the entire timer.
>>
>>
>>>
>>>>      - Should we have the option to preserve all element timestamps?
>>>> This could look something like this:
>>>>
>>>>     PCollection<KV<K, TimestampedValue<V>>> batches =
>>>>         input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>>>
>>>
>>> This seems useful.
>>>
>>>
>>>> 2. flushBatch always resets the timer, even after the batch is
>>>> processed. The only reason I can think of for doing so is to update the
>>>> watermark hold. TimerInternals contains a deleteTimer method - is there any
>>>> reason we shouldn't simply implement Timer.clear and hook it up to
>>>> the existing deleteTimer?
>>>>
>>>
>>> Makes sense to me. Prior thread on this seemed to have lightweight
>>> consensus:
>>> https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>>
> There was another thread about adding Timer.clear in Java:
> https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
> and a previous attempt in https://github.com/apache/beam/pull/12836.
> Boyuan might have more insights into this +Boyuan Zhang
> <boyu...@google.com>
>
>>
>>>
>>>> 3. This transform was implemented before OnWindowExpiration was
>>>> implemented. I think we should add a window-expiration callback, and stop
>>>> setting the end-of window timer.
>>>>
>>>
>>> +1
>>>
>>> Kenn
>>>
>>
