+1. My understanding was also that the consensus was that timers
must be delivered in timestamp order, and that "within bundle"
resetting/clearing of timers should be respected (as if each timer were
in its own bundle).
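
A minimal sketch of these semantics, written as a plain-Java simulation of one key's timers rather than Beam runner code (the class `TimerBundleSim` and its methods are hypothetical). When the clock advances, pending timers with timestamps before the new time fire one at a time in timestamp order. The live timer state is re-read before every firing, so a callback that clears or re-sets a later timer is respected even when both timers fall into the same bundle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical simulation of a single key's timers; not Beam runner code. */
final class TimerBundleSim {
  // Live timer state (timer id -> target timestamp). Re-read before every firing.
  private final Map<String, Long> timers = new HashMap<>();
  private final Map<String, BiConsumer<TimerBundleSim, Long>> callbacks = new HashMap<>();
  private final List<String> fired = new ArrayList<>();

  void set(String id, long timestamp) { timers.put(id, timestamp); }

  void clear(String id) { timers.remove(id); }

  void onTimer(String id, BiConsumer<TimerBundleSim, Long> callback) { callbacks.put(id, callback); }

  List<String> fired() { return fired; }

  /**
   * Moves the clock to newTime and fires every pending timer with a timestamp before newTime,
   * earliest first. A callback that re-sets its own timer to a time before newTime would loop
   * forever; this sketch does not guard against that.
   */
  void advanceTo(long newTime) {
    while (true) {
      // Pick the earliest due timer from the *current* state, not from a snapshot taken at
      // the start of the bundle, so clears and re-sets made by earlier callbacks are respected.
      String next = null;
      long nextTs = Long.MAX_VALUE;
      for (Map.Entry<String, Long> e : timers.entrySet()) {
        long ts = e.getValue();
        boolean earlier =
            ts < nextTs || (ts == nextTs && next != null && e.getKey().compareTo(next) < 0);
        if (ts < newTime && earlier) {
          next = e.getKey();
          nextTs = ts;
        }
      }
      if (next == null) {
        return; // no pending timer before newTime
      }
      timers.remove(next); // firing consumes the timer unless the callback sets it again
      fired.add(next + "@" + nextTs);
      BiConsumer<TimerBundleSim, Long> callback = callbacks.get(next);
      if (callback != null) {
        callback.accept(this, nextTs);
      }
    }
  }
}
```

For example, with timers `a` at 10 and `b` at 20 and an `onTimer("a", ...)` callback that clears `b`, advancing to 30 fires only `a@10`; if the callback instead re-sets `b` to 40, `b` fires only once the clock passes 40. Firing from a snapshot of due timers taken at the start of the bundle would deliver `b@20` in both cases, which is exactly the bundling-dependent behavior the thread argues against.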

On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles <k...@apache.org> wrote:
>
> Reading over the other thread, I see there was consensus to implement it.
>
> Reading the commentary on the PR, I see good questions were raised about the
> semantics. Questions which I feel able to have an opinion about :-)
>
> The questions concerned bundling, and timers in the same bundle clearing
> each other. Actually the same questions apply to timers re-setting later
> timers, and +Jan Lukavský has raised this already (among other people), so we
> kind of know the answer now, and I think +Boyuan Zhang's code was good (from my
> quick read). What has changed is that we have a better idea of the contract
> with the runner. I'm not sure if portability makes this more complex. I will
> share all my thoughts on this:
>
> I think one key to the Beam model is that bundles are for performance and 
> also intended as the unit of commitment (until FinishBundle is called, there 
> may be unfinished work). They can affect *behavior* (what the program does - 
> including what can be observed) ... but not *semantics* (what the output 
> means).
>
> So, for example, bundling affects how many files are written, but you are
> expected to read all the files and must not depend on their number or
> ordering. The behavior is different, but the semantics are the same.
>
> When it comes to timers, behavior and semantics are very tightly coupled; 
> timers are like a self loop. The firing of a timer is a behavior w.r.t. the 
> output of the stateful ParDo but it is semantics of the timer itself (my 
> definitions don't actually apply so simply so don't dig too hard :-). So to 
> get bundling-invariant semantics, we should try for bundling-invariant 
> behavior. When some clock moves from T to T+D between bundles, all the
> timers in the range [T, T+D) may fire, so they are delivered in that bundle. I
> believe that in the prior re-setting issue we agreed that timers should
> always be called back in timestamp order. Bundling-invariance then implies
> that earlier timers can clear later timers just like they can re-set them. So
> even though a timer is delivered in a bundle, the local state of the timer 
> wins. This matches how state works as well; no matter how things are bundled, 
> the state you read is always whatever was written last.
>
> Kenn
>
> On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:
>>
>>
>>
>> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>
>>>
>>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>
>>>>
>>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've been looking at the implementation of GroupIntoBatches (hoping to 
>>>>> add support to group based on byte size), and I have a few questions 
>>>>> about the current implementation.
>>>>>
>>>>> 1. I noticed that the transform does not preserve input timestamps. The
>>>>> output timestamp is not explicitly set, so it will be whatever the
>>>>> default output timestamp is. Confusingly, this will depend on which path
>>>>> is taken. If the batch is output while processing an element, then the
>>>>> timestamp of that element will be used. If the batch is output from the
>>>>> timer, then the processing-time value of the timer will be used.
>>>>
>>>>
>>>> Seems like bugs.
>>>>
>>>>>
>>>>>      - Should I start setting outputTimestamp explicitly - ideally to the 
>>>>> minimum timestamp in the current batch of elements?
>>>>
>>>>
>>>> That's a sensible default. Could pass a TimestampCombiner if that leads to 
>>>> watermark delays. Given the nature of the transform, the simple approach 
>>>> will probably be fine.
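
A minimal sketch of the bookkeeping behind the "minimum timestamp in the current batch" default, assuming timestamps are epoch milliseconds and leaving out the state and timer APIs entirely (`MinTimestampBatch` is a hypothetical helper, not part of GroupIntoBatches). Both flush paths, the element path and the timer path, read the same value, so the output timestamp no longer depends on which path happened to flush.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-key batch buffer that tracks the minimum element timestamp (epoch millis). */
final class MinTimestampBatch<V> {
  private final int maxSize;
  private final List<V> elements = new ArrayList<>();
  private long minTimestamp = Long.MAX_VALUE; // sentinel: an empty batch has no timestamp

  MinTimestampBatch(int maxSize) {
    this.maxSize = maxSize;
  }

  /** Adds an element; returns true once the batch has reached maxSize and should be flushed. */
  boolean add(V value, long timestampMillis) {
    elements.add(value);
    minTimestamp = Math.min(minTimestamp, timestampMillis);
    return elements.size() >= maxSize;
  }

  /** Timestamp to emit the batch at, whichever path (element or timer) triggers the flush. */
  long outputTimestamp() {
    if (elements.isEmpty()) {
      throw new IllegalStateException("empty batch has no output timestamp");
    }
    return minTimestamp;
  }

  /** Returns the buffered elements and resets the buffer for the next batch. */
  List<V> flush() {
    List<V> out = new ArrayList<>(elements);
    elements.clear();
    minTimestamp = Long.MAX_VALUE;
    return out;
  }
}
```

Adding elements stamped 300, 100 and 200 to a batch of size 3 reports the batch as full on the third add, and `outputTimestamp()` returns 100. How that value would reach the runner (for example through `OutputReceiver.outputWithTimestamp`, or as the timer's output timestamp so the watermark is held) is not shown. Note that the Beam Java SDK normally rejects outputs from `@ProcessElement` whose timestamp is earlier than the current element's timestamp unless the DoFn allows timestamp skew.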
>>>
>>>
>>> This is a bit tricky to do. In order to update the output timestamp, we
>>> need to call setTimer again. However, since we are calling
>>> timer.offset().setRelative(), this will keep bumping the timer into the
>>> future, and as long as elements keep arriving it will never fire.
>>>
>>> One solution would be for GIB to just store the current timer ts in state,
>>> and make sure that we keep setting the same target until the timer fires.
>>> However, that precludes us from using setRelative (since that method does
>>> not allow the DoFn to see what target time is being set). I think that this
>>> approach won't play nice with TestStream.advanceProcessingTimeTo.
>>>
>>> We could also add a way to update _just_ the output time for a timer 
>>> without resetting the entire timer.
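
The failure mode described above can be reproduced with a small simulation (plain Java; `ResetTimerSim` is hypothetical). It assumes one key, element arrivals in ascending processing-time order, and a timer that is either re-set to `arrival + delay` on every element, as repeated `offset(...).setRelative()` calls would do, or set once and then kept at the same target, as in the store-the-target-in-state alternative. Only the first firing is modelled.

```java
/** Hypothetical model of one key's processing-time timer; only the first firing is modelled. */
final class ResetTimerSim {
  /**
   * Returns the processing time at which the timer first fires, or -1 if it has not fired by
   * 'horizon'. 'arrivals' must be sorted ascending.
   *
   * @param fixedTarget false: every element re-sets the timer to arrival + delay (like calling
   *     setRelative() again); true: the first target is remembered (as if kept in state) and
   *     re-used until the timer fires.
   */
  static long firstFiring(long[] arrivals, long delay, long horizon, boolean fixedTarget) {
    long target = -1; // -1 means no timer has been set yet
    for (long arrival : arrivals) {
      if (target >= 0 && arrival >= target) {
        // The timer fired before (or at) this arrival.
        return target <= horizon ? target : -1;
      }
      if (target < 0 || !fixedTarget) {
        target = arrival + delay; // set, or push further into the future
      }
    }
    return (target >= 0 && target <= horizon) ? target : -1;
  }
}
```

With an element every 5 ms (0 to 95 ms), a 10 ms delay, and a 100 ms horizon, the re-set-on-every-element variant never fires (the method returns -1), while the fixed-target variant fires at 10 ms.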
>>>
>>>>
>>>>
>>>>>      - Should we have the option to preserve all element timestamps? This
>>>>> could look something like this:
>>>>>
>>>>>     PCollection<KV<K, Iterable<TimestampedValue<V>>>> batches =
>>>>>         input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>>>
>>>>
>>>> This seems useful.
>>>>
>>>>>
>>>>> 2. flushBatch always resets the timer, even after the batch is processed. 
>>>>> The only reason I can think of for doing so is to update the watermark 
>>>>> hold. TimerInternals contains a deleteTimer method - is there any reason 
>>>>> we shouldn't simply implement Timer.clear and hook it up to the existing 
>>>>> deleteTimer?
>>>>
>>>>
>>>> Makes sense to me. Prior thread on this seemed to have lightweight 
>>>> consensus: 
>>>> https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>
>> There was another thread about adding Timer.clear in Java: 
>> https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
>> and a previous attempt in https://github.com/apache/beam/pull/12836. Boyuan 
>> might have more insights into this +Boyuan Zhang
>>>>
>>>>
>>>>>
>>>>> 3. This transform was implemented before OnWindowExpiration was 
>>>>> implemented. I think we should add a window-expiration callback, and stop 
>>>>> setting the end-of window timer.
>>>>
>>>>
>>>> +1
>>>>
>>>> Kenn
