One more question about GroupIntoBatches. The current code has a "prefetch" frequency built in that calls readLater on the bag state every time the buffered element count crosses another 20% of the batch size. Does anyone know what this is for? Was this intended to be an optimization, and if so, did it help?
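For context, the pattern in question looks roughly like the sketch below. This is a simplified illustration, not the actual GroupIntoBatches code - the class name, state ids, and the exact threshold arithmetic are assumptions made for the example:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class BatchingFn extends DoFn<KV<String, String>, Iterable<String>> {
  private final long batchSize;

  BatchingFn(long batchSize) {
    this.batchSize = batchSize;
  }

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Long> count,
      OutputReceiver<Iterable<String>> out) {
    Long stored = count.read();
    long n = (stored == null ? 0L : stored) + 1;
    buffer.add(element.getValue());
    count.write(n);

    // The "prefetch": each time the buffered count crosses another 20% of the
    // batch size, hint to the runner that the bag contents will be needed soon,
    // so the blocking read at flush time is ideally already in flight.
    if (n % Math.max(1, batchSize / 5) == 0) {
      buffer.readLater();
    }

    if (n >= batchSize) {
      // Materialize before clearing so the output does not depend on a lazy read.
      List<String> batch = new ArrayList<>();
      buffer.read().forEach(batch::add);
      out.output(batch);
      buffer.clear();
      count.clear();
    }
  }
}

Whether that readLater() hint actually pays off presumably depends on the runner's state fetching and caching behavior, which is essentially the question above.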
On Thu, May 20, 2021 at 10:08 AM Reuven Lax <re...@google.com> wrote:

> I don't think that would work well. It's entirely possible that the input watermark will already have passed the timestamp of the hold, in which case an event-time timer would fire immediately. You could make it a looping timer, but the new timer would also fire immediately after being set, and a tight timer loop isn't a good idea.
>
> A Timer.get() is one solution, though I think the only way we have to implement it is to store the timer's timestamp in a ValueState; doing this for every timer would add a lot of cost to pipelines.
>
> Another option is a Timer.getCurrentTime() method that would return the current base time that setRelative is based off of. It seems like a strange function to add to Timer, though.
>
> Another option is to use TimerMap to bucket timers. Every minute we round the current processing time to the nearest minute and set a timer with an expiration of that minute (and with the minute timestamp as its tag as well). This way we would have a continuous sequence of expiring timers, and we wouldn't have to set just the first one. The biggest problem with this approach is that we would also have to use MapState to store the desired watermark hold per processing-time bucket. MapState is not supported by many runners yet, so I don't want to use it in a basic transform like GroupIntoBatches (furthermore - a transform that works on most runners today).
>
> Reuven
>
> On Thu, May 20, 2021 at 2:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Sounds like you could solve that using a second event-time timer that would actually be used only to hold the output timestamp (watermark hold). Something like
>>
>> eventTimer.withOutputTimestamp(currentBufferMinimalStamp).offset(Duration.ZERO).setRelative()
>>
>> When the timer fires, you would only reset the minimum.
>>
>> It is sort of ugly, though. It would be cool to have a way to get the current timestamp a timer is set to (if any).
>>
>> Jan
>>
>> On 5/20/21 3:12 AM, Reuven Lax wrote:
>>
>> 100% - the contract should not change because things are in a bundle. IIRC there are some open bugs in Beam around this that really should be fixed.
>>
>> My issue with GroupIntoBatches is different. This transform works as follows:
>>
>> if (this is the first element in the batch - checked by reading a count stored in a ValueState)
>>     timer.offset(bufferingDuration).setRelative()
>>
>> This makes it tricky to use setTimer.withOutputTimestamp. Inputs are not guaranteed to be in order, so simply adding a withOutputTimestamp would set the output timestamp to whatever the first element's timestamp happened to be; it really should be the minimum timestamp of all elements in the buffer. If we started setting the timer on every element, then timer.offset.setRelative would keep bumping the (processing-time) timer into the future and it would never expire.
>>
>> One solution would be to store the timer timestamp in a ValueState, and use Timer.set to set the timer to an absolute timestamp. This would allow us to always reset the timer to the same expiration target, just modifying the output timestamp each time. However, this will break DirectRunner tests. The DirectRunner allows the user to control the advancement of processing time when using TestStream, but this facility doesn't work well if the transform sets the processing-time timer using absolute set() calls.
>>
>> I'm not sure how to solve this using the existing Timer API.
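For concreteness, the "store the expiration in a ValueState and use the absolute set()" option described above might look roughly like the sketch below. The class name, state ids, and constants are illustrative assumptions, the buffering and emit logic is elided, and as noted, the absolute set() call is exactly what interacts badly with TestStream's processing-time control on the DirectRunner:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

class HoldTrackingBatchFn extends DoFn<KV<String, String>, String> {
  private static final Duration BUFFERING_DURATION = Duration.standardMinutes(1);

  @StateId("flushTarget")
  private final StateSpec<ValueState<Instant>> flushTargetSpec = StateSpecs.value();

  @StateId("minTimestamp")
  private final StateSpec<ValueState<Instant>> minTimestampSpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Timestamp Instant elementTs,
      @StateId("flushTarget") ValueState<Instant> flushTarget,
      @StateId("minTimestamp") ValueState<Instant> minTimestamp,
      @TimerId("flush") Timer flush) {
    // Remember the absolute expiration chosen when the first element of the batch arrived.
    Instant target = flushTarget.read();
    if (target == null) {
      target = Instant.now().plus(BUFFERING_DURATION);
      flushTarget.write(target);
    }

    // Track the minimum element timestamp seen so far; it becomes the watermark hold.
    Instant min = minTimestamp.read();
    Instant newMin = (min == null || elementTs.isBefore(min)) ? elementTs : min;
    minTimestamp.write(newMin);

    // Re-set the timer to the *same* absolute target every time, only updating
    // the output timestamp (the hold). Because the target never moves, the
    // timer still fires after BUFFERING_DURATION.
    flush.withOutputTimestamp(newMin).set(target);
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("flushTarget") ValueState<Instant> flushTarget,
      @StateId("minTimestamp") ValueState<Instant> minTimestamp) {
    // Emit the buffered batch here (buffer state elided), then reset the bookkeeping.
    flushTarget.clear();
    minTimestamp.clear();
  }
}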
>>
>> On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> +1. It was my understanding as well that consensus was that timers must be delivered in timestamp order, and "within bundle" resetting/clearing of timers should be respected (as if each timer was in its own bundle).
>>>
>>> On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles <k...@apache.org> wrote:
>>> >
>>> > Reading over the other thread, there was consensus to implement.
>>> >
>>> > Reading commentary on the PR, there were good questions raised about the semantics. Questions which I feel able to have an opinion about :-)
>>> >
>>> > The questions surrounded bundling and timers in the same bundle clearing each other. Actually the same questions apply to timers re-setting later timers, and +Jan Lukavský has raised this already (among other people), so we kind of know the answer now, and I think +Boyuan Zhang's code was good (from my quick read). What has changed is that we have a better idea of the contract with the runner. I'm not sure if portability makes this more complex. I will share all my thoughts on this:
>>> >
>>> > I think one key to the Beam model is that bundles are for performance and also intended as the unit of commitment (until FinishBundle is called, there may be unfinished work). They can affect *behavior* (what the program does - including what can be observed) ... but not *semantics* (what the output means).
>>> >
>>> > So, for example, bundling affects how many files are written, but you are expected to read all the files, and you must not depend on their number or ordering. The behavior is different, but the semantics are the same.
>>> >
>>> > When it comes to timers, behavior and semantics are very tightly coupled; timers are like a self loop. The firing of a timer is a behavior w.r.t. the output of the stateful ParDo, but it is semantics of the timer itself (my definitions don't actually apply so simply, so don't dig too hard :-). So to get bundling-invariant semantics, we should try for bundling-invariant behavior. When some clock moves from T to T+D between bundles, then all the timers in the range [T, T+D) may fire, so they are delivered in the bundle. I believe we agreed in the prior re-setting issue that timers should always be called back in timestamp order. Bundling-invariance then implies that earlier timers can clear later timers just like they can re-set them. So even though a timer is delivered in a bundle, the local state of the timer wins. This matches how state works as well; no matter how things are bundled, the state you read is always whatever was written last.
>>> >
>>> > Kenn
>>> >
>>> > On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:
>>> >>
>>> >> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>> >>>
>>> >>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
>>> >>>>
>>> >>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
>>> >>>>>
>>> >>>>> Hi,
>>> >>>>>
>>> >>>>> I've been looking at the implementation of GroupIntoBatches (hoping to add support to group based on byte size), and I have a few questions about the current implementation.
>>> >>>>>
>>> >>>>> 1. I noticed that the transform does not preserve input timestamps. The output timestamp is not explicitly set, so it will be whatever the default output timestamp is. Confusingly, this will depend on which path is taken. If the batch is output while processing an element, then the timestamp of that element will be used. If the batch is output from the timer, then the processing-time value of the timer will be used.
>>> >>>>
>>> >>>> Seems like bugs.
>>> >>>>
>>> >>>>> - Should I start setting outputTimestamp explicitly - ideally to the minimum timestamp in the current batch of elements?
>>> >>>>
>>> >>>> That's a sensible default. Could pass a TimestampCombiner if that leads to watermark delays. Given the nature of the transform, the simple approach will probably be fine.
>>> >>>
>>> >>> This is a bit tricky to do. In order to update the output timestamp, we need to call setTimer again. However, since we are calling timer.offset().setRelative(), this will keep bumping the timer into the future and it will never fire.
>>> >>>
>>> >>> One solution would be for GIB to just store the current timer timestamp in state, and make sure to keep setting the same target until the timer fires. However, that precludes us from using setRelative (since that method does not allow the DoFn to see what target time is being set). I think that this approach won't play nice with TestStream.advanceProcessingTimeTo.
>>> >>>
>>> >>> We could also add a way to update _just_ the output time for a timer without resetting the entire timer.
>>> >>>>
>>> >>>>> - Should we have the option to preserve all element timestamps? This could look something like this:
>>> >>>>>
>>> >>>>> PCollection<KV<K, TimestampedValue<V>>> batches =
>>> >>>>>     input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>> >>>>
>>> >>>> This seems useful.
>>> >>>>
>>> >>>>> 2. flushBatch always resets the timer, even after the batch is processed. The only reason I can think of for doing so is to update the watermark hold. TimerInternals contains a deleteTimer method - is there any reason we shouldn't simply implement Timer.clear and hook it up to the existing deleteTimer?
>>> >>>>
>>> >>>> Makes sense to me. Prior thread on this seemed to have lightweight consensus: https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>> >>
>>> >> There was another thread about adding Timer.clear in Java: https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E and a previous attempt in https://github.com/apache/beam/pull/12836. Boyuan might have more insights into this. +Boyuan Zhang
>>> >>>>
>>> >>>>> 3. This transform was implemented before OnWindowExpiration was implemented. I think we should add a window-expiration callback, and stop setting the end-of-window timer.
>>> >>>>
>>> >>>> +1
>>> >>>>
>>> >>>> Kenn
>>
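Following up on point 3 above, a minimal sketch of what the window-expiration callback could look like: flush whatever is still buffered from an @OnWindowExpiration method instead of setting an explicit end-of-window timer. The names are illustrative, the count/timer bookkeeping is elided, and unlike the real transform this sketch does not re-emit the key:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class FlushOnExpiryFn extends DoFn<KV<String, String>, Iterable<String>> {
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer) {
    buffer.add(element.getValue());
    // ... batch-size checks and the buffering timer elided ...
  }

  // Called by the runner when the window expires, before its state is
  // garbage collected, so no leftover elements are dropped.
  @OnWindowExpiration
  public void onWindowExpiration(
      @StateId("buffer") BagState<String> buffer,
      OutputReceiver<Iterable<String>> out) {
    List<String> leftovers = new ArrayList<>();
    buffer.read().forEach(leftovers::add);
    if (!leftovers.isEmpty()) {
      out.output(leftovers);
    }
    buffer.clear();
  }
}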