If I understand it correctly (and from what I have observed of the actual behavior on both FlinkRunner and DirectRunner), a relative timer with zero duration will not fire immediately. It has to wait for the watermark to advance. [1] needs to be fixed for the relative timer with output timestamp to work reliably.

 Jan

[1] https://issues.apache.org/jira/browse/BEAM-12276

On 5/20/21 7:08 PM, Reuven Lax wrote:
I don't think that would work well. It's entirely possible that the input watermark will already have passed the timestamp of the hold, in which case an event-time timer would fire immediately. You could make it a looping timer, but the new timer would also fire immediately after being set, and a tight timer loop isn't a good idea.

A Timer.get() is one solution, though I think the only way we have to implement it is to store the timer's timestamp in a ValueState; doing this for every timer would add a lot of cost to pipelines.
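
To make the cost concrete, here is a minimal plain-Java sketch of that bookkeeping. It is not Beam API: TrackedTimer and its two fields are hypothetical stand-ins for a Timer plus two ValueState cells, and timestamps are plain millisecond longs. It only illustrates the idea of keeping the expiration target fixed while lowering the output timestamp on each element.

```java
/** Plain-Java model of a timer whose target is mirrored in state; not Beam API. */
class TrackedTimer {
  // Hypothetical stand-ins for two ValueState cells; null means "no timer set".
  private Long target;          // absolute processing-time expiration, in ms
  private Long outputTimestamp; // watermark hold, in ms

  /** Fixes the target on the first element only, and lowers the hold to the minimum seen. */
  void onElement(long nowMillis, long bufferingMillis, long elementTimestampMillis) {
    if (target == null) {
      // The first element of the batch decides when the timer expires.
      target = nowMillis + bufferingMillis;
    }
    // Later elements never move the target; they can only lower the hold.
    outputTimestamp =
        (outputTimestamp == null)
            ? elementTimestampMillis
            : Math.min(outputTimestamp, elementTimestampMillis);
  }

  /** Models the proposed Timer.get(): the stored target, or null if unset. */
  Long target() {
    return target;
  }

  Long outputTimestamp() {
    return outputTimestamp;
  }

  /** Called when the timer fires: clears both cells so the next batch starts fresh. */
  void onTimer() {
    target = null;
    outputTimestamp = null;
  }
}
```

Every element now costs one extra state read and possibly a write, which is the overhead I am worried about if we did this for every timer.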

Another option is a Timer.getCurrentTime() method that would return the base time that setRelative offsets from. It seems like a strange function to add to Timer, though.

Another option is to use TimerMap to bucket timers. Every minute we round the current processing time to the nearest minute and set a timer with an expiration of that minute (and with the minute timestamp as its tag as well). This way we would have a continuous sequence of expiring timers, and we wouldn't have to set just the first one. The biggest problem with this approach is that we would also have to use MapState to store the desired watermark hold per processing-time bucket. MapState is not supported by many runners yet, so I don't want to use it in a basic transform like GroupIntoBatches (which, moreover, works on most runners today).
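
A rough plain-Java model of the bucketing, under some assumptions: it rounds up to the next minute boundary so that the timer always lies in the future, and it uses a HashMap as a stand-in for MapState. TimerBuckets and its method names are hypothetical, not Beam API.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-minute timer buckets; not Beam API. */
class TimerBuckets {
  static final long MINUTE_MILLIS = 60_000L;

  // Stand-in for MapState: bucket expiration (ms) -> minimum element timestamp (ms).
  private final Map<Long, Long> holdByBucket = new HashMap<>();

  /** Returns the first minute boundary strictly after the given time (assumed rounding rule). */
  static long bucketFor(long processingTimeMillis) {
    return Math.floorDiv(processingTimeMillis, MINUTE_MILLIS) * MINUTE_MILLIS + MINUTE_MILLIS;
  }

  /** Records an element and returns the bucket whose timer should carry the hold. */
  long onElement(long processingTimeMillis, long elementTimestampMillis) {
    long bucket = bucketFor(processingTimeMillis);
    // Keep the smallest element timestamp seen in this bucket.
    holdByBucket.merge(bucket, elementTimestampMillis, Long::min);
    return bucket;
  }

  /** Current hold for a bucket, or null if no element arrived in it. */
  Long holdFor(long bucket) {
    return holdByBucket.get(bucket);
  }

  /** Called when the bucket's timer fires: removes and returns the hold. */
  Long onTimer(long bucket) {
    return holdByBucket.remove(bucket);
  }
}
```

Because the bucket is a pure function of the processing time, setting the timer again for every element always targets the same expiration, so nothing gets bumped into the future.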

Reuven


On Thu, May 20, 2021 at 2:11 AM Jan Lukavský <je...@seznam.cz> wrote:

    Sounds like you could solve that using a second event-time timer
    that would actually be used only to hold the output timestamp
    (watermark hold). Something like

    eventTimer.withOutputTimestamp(currentBufferMinimalStamp).offset(Duration.ZERO).setRelative()

    When the timer fires, you would only reset the minimum.

    It is sort of ugly, though. It would be cool to have a way to get
    the current timestamp a timer is set to (if any).

     Jan

    On 5/20/21 3:12 AM, Reuven Lax wrote:
    100% - the contract should not change because things are in a
    bundle. IIRC there are some open bugs in Beam around this that
    really should be fixed.

    My issue with GroupIntoBatches is different. This transform works
    as follows:

    if (this is the first element in batch - checked by reading a
    count stored in a ValueState)
       timer.offset(bufferingDuration).setRelative()

    This makes it tricky to use setTimer.withOutputTimestamp. Inputs
    are not guaranteed to be in order, so simply adding a
    withOutputTimestamp would set the timestamp to be whatever the
    first element happened to be; it really should be the minimum
    timestamp of all elements in the buffer. If we started setting
    the timer on every element, then timer.offset.setRelative would
    keep bumping the (processing-time) timer into the future and it
    would never expire.

    One solution would be to store the timer timestamp in a
    ValueState, and use Timer.set to set the timer to an absolute
    timestamp. This would allow us to always reset the timer to the
    same expiration target, just modifying the output timestamp each
    time. However, this will break DirectRunner tests. The
    DirectRunner allows the user to control the advancement of
    processing time when using TestStream, but this facility doesn't
    work well if the transform sets the processing-time timer using
    absolute set() calls.

    I'm not sure how to solve this using the existing Timer API.

    On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw
    <rober...@google.com> wrote:

        +1. It was my understanding as well that consensus was that
        timers must be delivered in timestamp order, and "within bundle"
        resetting/clearing of timers should be respected (as if each
        timer was in its own bundle).

        On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles
        <k...@apache.org> wrote:
        >
        > Reading over the other thread, there was consensus to
        implement.
        >
        > Reading commentary on the PR, there were good questions
        raised about the semantics. Questions which I feel able to
        have an opinion about :-)
        >
        > The questions surrounded bundling and timers in the same
        bundle clearing each other. Actually the same questions
        apply to timers re-setting later timers, and +Jan Lukavský has
        raised this already (among other people), so we kind of know
        the answer now, and I think +Boyuan Zhang's code was good (from
        my quick read). What has changed is that we have a better
        idea of the contract with the runner. I'm not sure if
        portability makes this more complex. I will share all my
        thoughts on this:
        >
        > I think one key to the Beam model is that bundles are for
        performance and also intended as the unit of commitment
        (until FinishBundle is called, there may be unfinished work).
        They can affect *behavior* (what the program does - including
        what can be observed) ... but not *semantics* (what the
        output means).
        >
        > So, for example, bundling affects how many files are
        written, but you are expected to read all the files and must
        not depend on their number or ordering. The behavior is
        different, but the semantics are the same.
        >
        > When it comes to timers, behavior and semantics are very
        tightly coupled; timers are like a self loop. The firing of a
        timer is a behavior w.r.t. the output of the stateful ParDo
        but it is semantics of the timer itself (my definitions don't
        actually apply so simply so don't dig too hard :-). So to get
        bundling-invariant semantics, we should try for
        bundling-invariant behavior. When some clock moves from T to
        T+D between bundles then all the timers in the range [T, T+D)
        may fire so they are delivered in the bundle. I believe we
        have in the prior re-setting issue agreed that timers should
        always be called back in timestamp order. Bundling-invariance
        then implies that earlier timers can clear later timers just
        like they can re-set them. So even though a timer is
        delivered in a bundle, the local state of the timer wins.
        This matches how state works as well; no matter how things
        are bundled, the state you read is always whatever was
        written last.
        >
        > Kenn
        >
        > On Tue, May 11, 2021 at 1:24 PM Siyuan Chen <syc...@google.com> wrote:
        >>
        >>
        >>
        >> On Tue, May 11, 2021 at 11:08 AM Reuven Lax <re...@google.com> wrote:
        >>>
        >>>
        >>>
        >>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
        >>>>
        >>>>
        >>>>
        >>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
        >>>>>
        >>>>> Hi,
        >>>>>
        >>>>> I've been looking at the implementation of
        GroupIntoBatches (hoping to add support to group based on
        byte size), and I have a few questions about the current
        implementation.
        >>>>>
        >>>>> 1. I noticed that the transform does not preserve input
        timestamps. The output timestamp is not explicitly set, so it
        will be whatever the default output timestamp is. Confusingly,
        this will depend on which path is taken. If the batch is
        output while processing an element, then the timestamp of that
        element will be used. If the batch is output from the timer,
        then the processing-time value of the timer will be used.
        >>>>
        >>>>
        >>>> Seems like bugs.
        >>>>
        >>>>>
        >>>>>      - Should I start setting outputTimestamp
        explicitly - ideally to the minimum timestamp in the current
        batch of elements?
        >>>>
        >>>>
        >>>> That's a sensible default. Could pass a
        TimestampCombiner if that leads to watermark delays. Given
        the nature of the transform, the simple approach will
        probably be fine.
        >>>
        >>>
        >>> This is a bit tricky to do. In order to update the output
        timestamp, we need to call setTimer again. However, since we
        are calling timer.offset().setRelative(), this will keep
        bumping the timer into the future and it will never fire.
        >>>
        >>> One solution would be for GIB to just store the current
        timer ts in state, and make sure that we keep setting the same
        target until the timer fires. However, that precludes us from
        using setRelative (since that method does not allow the DoFn
        to see what target time is being set). I think that this
        approach won't play nice with TestStream.advanceProcessingTimeTo.
        >>>
        >>> We could also add a way to update _just_ the output time
        for a timer without resetting the entire timer.
        >>>
        >>>>
        >>>>
        >>>>>      - Should we have the option to preserve all
        element timestamps? This could look something like this:
        >>>>>
        >>>>>     PCollection<KV<K, TimestampedValue<V>>> batches =
        >>>>>         input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
        >>>>
        >>>>
        >>>> This seems useful.
        >>>>
        >>>>>
        >>>>> 2. flushBatch always resets the timer, even after the
        batch is processed. The only reason I can think of for doing
        so is to update the watermark hold. TimerInternals contains a
        deleteTimer method - is there any reason we shouldn't simply
        implement Timer.clear and hook it up to the existing deleteTimer?
        >>>>
        >>>>
        >>>> Makes sense to me. Prior thread on this seemed to have
        lightweight consensus:
        
https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
        >>
        >> There was another thread about adding Timer.clear in Java:
        
https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
        >> and a previous attempt in
        https://github.com/apache/beam/pull/12836. Boyuan might
        have more insights into this +Boyuan Zhang
        >>>>
        >>>>
        >>>>>
        >>>>> 3. This transform was implemented before
        OnWindowExpiration was implemented. I think we should add a
        window-expiration callback, and stop setting the end-of-window
        timer.
        >>>>
        >>>>
        >>>> +1
        >>>>
        >>>> Kenn
