+1. It was my understanding as well that the consensus was that timers must be delivered in timestamp order, and that "within bundle" resetting/clearing of timers should be respected (as if each timer were in its own bundle).
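To make the ordering rule concrete, here is a small, self-contained model in plain Java. It does not use Beam; `TimerBundleSim`, `set`, `clear` and `runBundle` are hypothetical names for illustration only. When the clock advances, every timer whose target is before the new clock value is delivered in one bundle, earliest first. The pending-timer map is re-read before each delivery, so a callback that clears or re-sets a later timer takes effect exactly as it would if every timer had been delivered in its own bundle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of one key's event-time timers (not Beam's API).
 * Each timer id maps to at most one pending target timestamp.
 */
class TimerBundleSim {
  /** Pending timers: timer id -> target timestamp. */
  final Map<String, Long> pending = new HashMap<>();
  /** Timer ids in the order they actually fired. */
  final List<String> fired = new ArrayList<>();

  /** Sets (or re-sets) a timer; the latest write wins, like any other state. */
  void set(String id, long timestamp) {
    pending.put(id, timestamp);
  }

  /** Clears a timer so that it will not fire. */
  void clear(String id) {
    pending.remove(id);
  }

  /**
   * Delivers, in one bundle, every pending timer whose target is before
   * {@code clock}, earliest first (ties broken by id). The pending map is
   * re-read before each delivery, so a callback that clears or re-sets a
   * later timer is respected, as if each timer fired in its own bundle.
   */
  void runBundle(long clock, Map<String, Runnable> callbacks) {
    while (true) {
      String next = null;
      long nextTs = 0;
      for (Map.Entry<String, Long> e : pending.entrySet()) {
        long ts = e.getValue();
        if (ts >= clock) {
          continue; // not eligible in this bundle
        }
        if (next == null || ts < nextTs || (ts == nextTs && e.getKey().compareTo(next) < 0)) {
          next = e.getKey();
          nextTs = ts;
        }
      }
      if (next == null) {
        return; // nothing left that is eligible in this bundle
      }
      pending.remove(next); // firing consumes the timer
      fired.add(next);
      Runnable callback = callbacks.get(next);
      if (callback != null) {
        callback.run(); // may call set() or clear() on any timer
      }
    }
  }
}
```

For example, with timers `a`, `b` and `c` set for 10, 20 and 30 and the clock advancing to 40, a callback on `a` that clears `c` means only `a` and `b` fire; if `b`'s callback re-sets `b` to 50, it stays pending for a later bundle.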
On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles <k...@apache.org> wrote:
>
> Reading over the other thread, there was consensus to implement.
>
> Reading commentary on the PR, there were good questions raised about the
> semantics. Questions which I feel able to have an opinion about :-)
>
> The questions surrounded bundling and timers in the same bundle clearing
> each other. Actually, the same questions apply to timers re-setting later
> timers, and +Jan Lukavský has raised this already (among other people), so we
> kind of know the answer now, and I think +Boyuan Zhang's code was good (from my
> quick read). What has changed is that we have a better idea of the contract
> with the runner. I'm not sure if portability makes this more complex. I will
> share all my thoughts on this:
>
> I think one key to the Beam model is that bundles are for performance and
> also intended as the unit of commitment (until FinishBundle is called, there
> may be unfinished work). They can affect *behavior* (what the program does,
> including what can be observed) ... but not *semantics* (what the output
> means).
>
> So, for example, bundling affects how many files are written, but you are
> expected to read all the files and must not depend on their number or
> ordering. The behavior is different, but the semantics are the same.
>
> When it comes to timers, behavior and semantics are very tightly coupled;
> timers are like a self-loop. The firing of a timer is behavior w.r.t. the
> output of the stateful ParDo, but it is the semantics of the timer itself (my
> definitions don't actually apply that simply, so don't dig too hard :-). So to
> get bundling-invariant semantics, we should try for bundling-invariant
> behavior. When some clock moves from T to T+D between bundles, all the
> timers in the range [T, T+D) may fire, so they are delivered in that bundle. I
> believe that in the prior re-setting issue we agreed that timers should
> always be called back in timestamp order.
> Bundling-invariance then implies that earlier timers can clear later timers
> just like they can re-set them. So even though a timer is delivered in a
> bundle, the local state of the timer wins. This matches how state works as
> well; no matter how things are bundled, the state you read is always
> whatever was written last.
>
> Kenn
>
> On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:
>>
>> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>>
>>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've been looking at the implementation of GroupIntoBatches (hoping to
>>>>> add support to group based on byte size), and I have a few questions
>>>>> about the current implementation.
>>>>>
>>>>> 1. I noticed that the transform does not preserve input timestamps. The
>>>>> output timestamp is not explicitly set, so it will be whatever the
>>>>> default output timestamp is. Confusingly, this will depend on which path
>>>>> is taken. If the batch is output while processing an element, then the
>>>>> timestamp of that element will be used. If the batch is output from the
>>>>> timer, then the processing-time value of the timer will be used.
>>>>
>>>> Seems like bugs.
>>>>
>>>>>
>>>>> - Should I start setting outputTimestamp explicitly, ideally to the
>>>>> minimum timestamp in the current batch of elements?
>>>>
>>>> That's a sensible default. Could pass a TimestampCombiner if that leads to
>>>> watermark delays. Given the nature of the transform, the simple approach
>>>> will probably be fine.
>>>
>>> This is a bit tricky to do. In order to update the output timestamp, we
>>> need to call setTimer again. However, since we are calling
>>> timer.offset().setRelative(), this will keep bumping the timer into the
>>> future, and it will never fire.
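The setRelative() problem described above can be reproduced with a toy model. This is plain Java, not Beam's `Timer` API: `BufferingTimerSim` and `firesDuringStream` are hypothetical names, processing time is a `long` in milliseconds, and firing is only checked when an element arrives. Re-setting with "now + delay" on every element pushes the target forward for as long as input keeps arriving, whereas remembering the first absolute target (as if it were kept in state) lets the timer fire on schedule.

```java
/**
 * Hypothetical model (not Beam's Timer API) of a processing-time buffering
 * timer that is re-set every time an element arrives, e.g. to update its
 * output timestamp.
 */
class BufferingTimerSim {
  /**
   * Returns the processing time at which the timer fires while elements are
   * still arriving, or -1 if it never fires during the stream. Firing is only
   * checked at element arrivals, a simplification of a real runner.
   *
   * @param arrivals ascending processing times (ms) at which elements arrive
   * @param delay buffering duration (ms)
   * @param keepStoredTarget false: every re-set computes now + delay, like a
   *     relative setter; true: the first absolute target is remembered (as if
   *     kept in state) and re-used on every re-set
   */
  static long firesDuringStream(long[] arrivals, long delay, boolean keepStoredTarget) {
    Long target = null; // null while no timer is set
    for (long now : arrivals) {
      if (target != null && target <= now) {
        return target; // timer is due before this element is processed
      }
      if (target == null || !keepStoredTarget) {
        target = now + delay; // relative re-set pushes the target forward
      }
      // Otherwise re-set to the same stored target; the firing time stays put.
    }
    return -1;
  }
}
```

With elements arriving every 1000 ms from 0 to 10000 and a 5000 ms delay, the relative variant never fires during the stream, while the stored-target variant fires at 5000. The model says nothing about how either approach interacts with TestStream.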
>>>
>>> One solution would be for GIB to just store the current timer ts in state,
>>> and make sure that we keep setting the same target until the timer fires.
>>> However, that precludes us from using setRelative (since that method does
>>> not allow the DoFn to see what target time is being set). I think that this
>>> approach won't play nice with TestStream.advanceProcessingTimeTo.
>>>
>>> We could also add a way to update _just_ the output time for a timer
>>> without resetting the entire timer.
>>>
>>>>
>>>>> - Should we have the option to preserve all element timestamps? This
>>>>> could look something like this:
>>>>>
>>>>>     PCollection<KV<K, TimestampedValue<V>>> batches =
>>>>>         input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>>>
>>>> This seems useful.
>>>>
>>>>>
>>>>> 2. flushBatch always resets the timer, even after the batch is processed.
>>>>> The only reason I can think of for doing so is to update the watermark
>>>>> hold. TimerInternals contains a deleteTimer method; is there any reason
>>>>> we shouldn't simply implement Timer.clear and hook it up to the existing
>>>>> deleteTimer?
>>>>
>>>> Makes sense to me. Prior thread on this seemed to have lightweight
>>>> consensus:
>>>> https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>
>> There was another thread about adding Timer.clear in Java:
>> https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
>> and a previous attempt in https://github.com/apache/beam/pull/12836. Boyuan
>> might have more insights into this +Boyuan Zhang
>>>>
>>>>>
>>>>> 3. This transform was implemented before OnWindowExpiration was
>>>>> implemented. I think we should add a window-expiration callback, and stop
>>>>> setting the end-of-window timer.
>>>>
>>>> +1
>>>>
>>>> Kenn
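For question 1, here is a minimal sketch of tracking the batch's minimum element timestamp, so that the output timestamp is the same whether the flush happens while processing an element or in the timer callback. It is plain Java using `java.time.Instant`; `BatchBuffer` and `Stamped` are hypothetical stand-ins (`Stamped` is not Beam's `TimestampedValue`), and `timestampedValues()` only models what the proposed `withTimestampedValues()` option might keep. Taking the minimum is the same choice `TimestampCombiner.EARLIEST` makes when combining timestamps.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical value paired with its event timestamp (not Beam's TimestampedValue). */
final class Stamped<V> {
  final V value;
  final Instant timestamp;

  Stamped(V value, Instant timestamp) {
    this.value = value;
    this.timestamp = timestamp;
  }
}

/**
 * Hypothetical per-key batch buffer that remembers the earliest element
 * timestamp, so a flush from either path can use the same output timestamp.
 */
final class BatchBuffer<V> {
  private final List<Stamped<V>> elements = new ArrayList<>();
  private Instant minTimestamp; // null while the batch is empty

  void add(V value, Instant timestamp) {
    elements.add(new Stamped<>(value, timestamp));
    if (minTimestamp == null || timestamp.isBefore(minTimestamp)) {
      minTimestamp = timestamp;
    }
  }

  /** Earliest element timestamp in the batch, or null if it is empty. */
  Instant outputTimestamp() {
    return minTimestamp;
  }

  /** Plain values in arrival order (element timestamps are dropped). */
  List<V> values() {
    List<V> out = new ArrayList<>(elements.size());
    for (Stamped<V> e : elements) {
      out.add(e.value);
    }
    return out;
  }

  /** Values paired with their own timestamps, in arrival order. */
  List<Stamped<V>> timestampedValues() {
    return new ArrayList<>(elements);
  }

  /** Empties the buffer after a flush. */
  void clear() {
    elements.clear();
    minTimestamp = null;
  }
}
```

Buffering elements stamped at 3000, 1000 and 2000 epoch milliseconds gives an output timestamp of 1000 ms while `values()` keeps arrival order; `clear()` resets both for the next batch.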