One more question about GroupIntoBatches. The current code has a "prefetch"
frequency built in that calls readLater on the bag state every time we
reach 20% of the batch size. Does anyone know what this is for? Was this
intended to be an optimization, and if so did it help?

On Thu, May 20, 2021 at 10:08 AM Reuven Lax <re...@google.com> wrote:

> I don't think that would work well. It's entirely possible that the input
> watermark will already have passed the timestamp of the hold, in which case
> an event-time timer would fire immediately. You could make it a looping
> timer, but the new timer would also fire immediately after being set, and a
> tight timer loop isn't a good idea.
>
> A Timer.get() is one solution, though I think the only way we have to
> implement it is to store the timer's timestamp in a ValueState; doing this
> for every timer would add a lot of cost to pipelines.
>
> Another option is a Timer.getCurrentTime() method, that would return the
> current base time that setRelative is based off of. It seems like a strange
> function to add to Timer though.
>
> Another option is to use TimerMap to bucket timers. Every minute we round
> the current processing time to the nearest minute and set a timer with an
> expiration of that minute (and with the minute timestamp as its tag as
> well). This way we would have a continuous sequence of expiring timers, and
> we wouldn't have to set just the first one. The biggest problem with this
> approach is that we would also have to use MapState to store the desired
> watermark hold per  processing-time bucket. MapState is not supported by
> many runners yet, so I don't want to use it in a basic transform like
> GroupIntoBatches (furthermore - a transform that works on most runners
> today).
>
> Reuven
>
>
> On Thu, May 20, 2021 at 2:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Sounds like you could solve that using second event time timer, that
>> would be actually used only to hold the output timestamp (watermark hold).
>> Something like
>>
>>
>> eventTimer.withOutputTimestamp(currentBufferMinimalStamp).offset(Duration.ZERO).setRelative()
>>
>> when the timer fires, you would only reset the minimum.
>>
>> It is sort of ugly, though. It would be cool to have a way to get the
>> current timestamp a timer is set to (if any).
>>
>>  Jan
>> On 5/20/21 3:12 AM, Reuven Lax wrote:
>>
>> 100% - the contract should not change because things are in a bundle.
>> IIRC there are some open bugs in Beam around this that really should be
>> fixed.
>>
>> My issue with GroupIntoBatches is different. This transform works as
>> follows:
>>
>> if (this is the first element in batch - checked by reading a count
>> stored in a ValueState)
>>    timer.offset(bufferingDuration).setRelative()
>>
>> This makes it tricky to use setTimer.withOutputTimestamp. Inputs are not
>> guaranteed to be in order, so simply adding a withOutputTimestamp would set
>> the timestamp to be whatever the first element happened to be; it really
>> should be the minimum timestamp of all elements in the buffer. If we
>> started setting the timer on every element, then timer.offset.setRelative
>> would keep bumping the (processing-time) timer into the future and it would
>> never expire.
>>
>> One solution would be to store the timer timestamp in a ValueState, and
>> use Timer.set to set the timer to an absolute timestamp. This would allow
>> us to always reset the timer to the same expiration target, just modifying
>> the output timestamp each time. However, this will break DirectRunner
>> tests. The DirectRunner allows the user to control the advancement of
>> processing time when using TestStream, but this facility doesn't work well
>> if the transform sets the processing-time timer using absolute set() calls.
>>
>> I'm not sure how to solve this using the existing Timer API.
>>
>> On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> +1. It was my understanding as well that consensus was that timers
>>> must be delivered in timestamp order, and "within bundle"
>>> resetting/clearing of timers should be respected (as if each timer was
>>> in its own bundle).
>>>
>>> On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles <k...@apache.org> wrote:
>>> >
>>> > Reading over the other thread, there was consensus to implement.
>>> >
>>> > Reading commentary on the PR, there were good questions raised about
>>> the semantics. Questions which I feel able to have an opinion about :-)
>>> >
>>> > The questions surrounded bundling and timers in the same bundling
>>> clearing each other. Actually the same questions apply to timers re-setting
>>> later timers and +Jan Lukavský has raised this already (among other people)
>>> so we kind of know the answer now, and I think +Boyuan Zhang code was good
>>> (from my quick read). What has changed is that we have a better idea of the
>>> contract with the runner. I'm not sure if portability makes this more
>>> complex. I will share all my thoughts on this:
>>> >
>>> > I think one key to the Beam model is that bundles are for performance
>>> and also intended as the unit of commitment (until FinishBundle is called,
>>> there may be unfinished work). They can affect *behavior* (what the program
>>> does - including what can be observed) ... but not *semantics* (what the
>>> output means).
>>> >
>>> > So, for example, bundling affects how many files are written but you
>>> are expected to read all the files and the number or ordering you must not
>>> depend on. The behavior is different, but the semantics are the same.
>>> >
>>> > When it comes to timers, behavior and semantics are very tightly
>>> coupled; timers are like a self loop. The firing of a timer is a behavior
>>> w.r.t. the output of the stateful ParDo but it is semantics of the timer
>>> itself (my definitions don't actually apply so simply so don't dig too hard
>>> :-). So to get bundling-invariant semantics, we should try for
>>> bundling-invariant behavior. When some clock moves from T to T+D between
>>> bundles then all the timers in the range [T, T+D) may fire so they are
>>> delivered in the bundle. I believe we have in the prior re-setting issue
>>> agreed that timers should always be called back in timestamp order.
>>> Bundling-invariance then implies that earlier timers can clear later timers
>>> just like they can re-set them. So even though a timer is delivered in a
>>> bundle, the local state of the timer wins. This matches how state works as
>>> well; no matter how things are bundled, the state you read is always
>>> whatever was written last.
>>> >
>>> > Kenn
>>> >
>>> > On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:
>>> >>
>>> >>
>>> >>
>>> >> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
>>> >>>
>>> >>>
>>> >>>
>>> >>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org>
>>> wrote:
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com>
>>> wrote:
>>> >>>>>
>>> >>>>> Hi,
>>> >>>>>
>>> >>>>> I've been looking at the implementation of GroupIntoBatches
>>> (hoping to add support to group based on byte size), and I have a few
>>> questions about the current implementation.
>>> >>>>>
>>> >>>>> 1. I noticed that the transform does not preserve input
>>> timestamps. The output timestamp is not explicitly set, so it will be
>>> whatever the default output timestamp. Confusingly this will depend on
>>> which path is taken. If the batch is output while processing an element,
>>> then the timestamp of tha element will be used. If the batch is output from
>>> the timer, then the processing-time value of the timer will be used.
>>> >>>>
>>> >>>>
>>> >>>> Seems like bugs.
>>> >>>>
>>> >>>>>
>>> >>>>>      - Should I start setting outputTimestamp explicitly - ideally
>>> to the minimum timestamp in the current batch of elements?
>>> >>>>
>>> >>>>
>>> >>>> That's a sensible default. Could pass a TimestampCombiner if that
>>> leads to watermark delays. Given the nature of the transform, the simple
>>> approach will probably be fine.
>>> >>>
>>> >>>
>>> >>> This is a bit tricky to do. in order to update the output timestamp,
>>> we need to call setTimer again. However since we are calling
>>> timer.offset().setRelative(), this will keep bumping the timer into the
>>> future  and it will never fire.
>>> >>>
>>> >>> One solution would be for GIB to just store the current timer ts in
>>> state, and make sure that keep setting the same target until the timer
>>> fires. However that precludes us from using setRelative (since that method
>>> does not allow the DoFn to see what target time is being set). I think that
>>> this approach won't play nice with TestStream.advanceProcessingTimeTo.
>>> >>>
>>> >>> We could also add a way to update _just_ the output time for a timer
>>> without resetting the entire timer.
>>> >>>
>>> >>>>
>>> >>>>
>>> >>>>>      - Should we have the option to preserve all element
>>> timestamps? this could look something like this:
>>> >>>>>
>>> >>>>>     PColllection<KV<K, TimestampedValue<V>>> batches =
>>> >>>>>
>>>  input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>> >>>>
>>> >>>>
>>> >>>> This seems useful.
>>> >>>>
>>> >>>>>
>>> >>>>> 2. flushBatch always resets the timer, even after the batch is
>>> processed. The only reason I can think of for doing so is to update the
>>> watermark hold. TimerInternals contains a deleteTimer method - is there any
>>> reason we shouldn't simply implement Timer.clear and hook it up to the
>>> existing deleteTimer?
>>> >>>>
>>> >>>>
>>> >>>> Makes sense to me. Prior thread on this seemed to have lightweight
>>> consensus:
>>> https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>>> >>
>>> >> There was another thread about adding Timer.clear in Java:
>>> https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
>>> >> and a previous attempt in https://github.com/apache/beam/pull/12836.
>>> Boyuan might have more insights into this +Boyuan Zhang
>>> >>>>
>>> >>>>
>>> >>>>>
>>> >>>>> 3. This transform was implemented before OnWindowExpiration was
>>> implemented. I think we should add a window-expiration callback, and stop
>>> setting the end-of window timer.
>>> >>>>
>>> >>>>
>>> >>>> +1
>>> >>>>
>>> >>>> Kenn
>>>
>>

Reply via email to