Yes, using an end-of-window timer would work as well. The problem with the output timestamp can be solved using setAllowedTimestampSkew, which is deprecated, but AFAIK there is no working alternative.

Maybe the best option would be to enable modification of the output timestamp only, not the firing timestamp? I think there are cases that do that and actually store the fire timestamp in a ValueState now. I'm not sure if creating timer.updateOutputTimestampTo could spare us from saving the fire timestamp in a ValueState, though. What is the exact reason we don't know the firing timestamp? I'm not 100% sure of the runner details, but just from the logic: if we can reset the timer by first cancelling it and then setting it to a new firing timestamp, there has to be knowledge of what the timer's current firing time was; otherwise, how would you cancel it?

 Jan

On 5/21/21 4:53 PM, Reuven Lax wrote:
Ah I see, you want to set the event-time timer in the future. Could also accomplish this with an end-of-window timer for which we keep updating the hold.

Unfortunately, this still doesn't quite work. The output will need to happen from the processing-time timer, so Beam will likely reject it due to being an output "in the past". In the past is determined by comparing the output timestamp to the timestamp of the current element being processed (for the case of timers, the timestamp of the timer is the output timestamp of that timer). The fact that there is another timer extant with that output timestamp doesn't help here.

On Fri, May 21, 2021 at 2:06 AM Jan Lukavský <je...@seznam.cz> wrote:

    If I understand it correctly (and from what I have observed of the
    actual behavior on both FlinkRunner and DirectRunner), a relative
    timer with zero duration will not fire immediately. It has to wait
    for the watermark to advance. [1] needs to be fixed for the
    relative timer with output timestamp to work reliably.

     Jan

    [1] https://issues.apache.org/jira/browse/BEAM-12276

    On 5/20/21 7:08 PM, Reuven Lax wrote:
    I don't think that would work well. It's entirely possible that
    the input watermark will already have passed the timestamp of the
    hold, in which case an event-time timer would fire immediately.
    You could make it a looping timer, but the new timer would also
    fire immediately after being set, and a tight timer loop isn't a
    good idea.

    A Timer.get() is one solution, though I think the only way we
    have to implement it is to store the timer's timestamp in a
    ValueState; doing this for every timer would add a lot of cost to
    pipelines.

    Another option is a Timer.getCurrentTime() method, that would
    return the current base time that setRelative is based off of. It
    seems like a strange function to add to Timer though.

    Another option is to use TimerMap to bucket timers. Every minute
    we round the current processing time to the nearest minute and
    set a timer with an expiration of that minute (and with the
    minute timestamp as its tag as well). This way we would have a
    continuous sequence of expiring timers, and we wouldn't have to
    set just the first one. The biggest problem with this approach is
    that we would also have to use MapState to store the desired
    watermark hold per processing-time bucket. MapState is not
    supported by many runners yet, so I don't want to use it in a
    basic transform like GroupIntoBatches (furthermore - a transform
    that works on most runners today).
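
The bucketing idea above can be modeled in plain Java, without any Beam APIs. This is only a sketch: the class and method names are hypothetical, a TreeMap stands in for MapState, and rounding up to the next minute boundary (rather than to the nearest minute) is an assumption made so that a bucket's expiration never lies in the past.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of bucketing processing-time timers by minute; no Beam APIs are used. */
class MinuteBuckets {
  static final long MINUTE_MILLIS = 60_000L;

  // Hypothetical stand-in for MapState: bucket expiration -> minimum element timestamp (the hold).
  private final Map<Long, Long> holdPerBucket = new TreeMap<>();

  /** Rounds up to the next minute boundary (an assumption, so the expiration is never in the past). */
  static long bucketFor(long processingTimeMillis) {
    return Math.floorDiv(processingTimeMillis + MINUTE_MILLIS - 1, MINUTE_MILLIS) * MINUTE_MILLIS;
  }

  /** Records an element and returns the bucket whose timer should be set (the bucket doubles as the tag). */
  long onElement(long processingTimeMillis, long elementTimestampMillis) {
    long bucket = bucketFor(processingTimeMillis);
    // Keep the smallest element timestamp per bucket as that bucket's watermark hold.
    holdPerBucket.merge(bucket, elementTimestampMillis, Math::min);
    return bucket;
  }

  /** Called when the timer tagged with this bucket fires; returns and clears its hold (null if none). */
  Long onTimer(long bucket) {
    return holdPerBucket.remove(bucket);
  }

  public static void main(String[] args) {
    MinuteBuckets buckets = new MinuteBuckets();
    long first = buckets.onElement(10_000L, 500L);
    long second = buckets.onElement(20_000L, 300L);
    System.out.println("buckets: " + first + ", " + second);
    System.out.println("hold released on firing: " + buckets.onTimer(first));
  }
}
```

Both elements fall into the same bucket, so only one timer per minute is needed, at the cost of keeping one hold per bucket in map-like state.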

    Reuven


    On Thu, May 20, 2021 at 2:11 AM Jan Lukavský <je...@seznam.cz> wrote:

        Sounds like you could solve that using a second event-time
        timer that would actually be used only to hold the output
        timestamp (watermark hold). Something like

        
            eventTimer.withOutputTimestamp(currentBufferMinimalStamp).offset(Duration.ZERO).setRelative()

        When the timer fires, you would only reset the minimum.

        It is sort of ugly, though. It would be cool to have a way to
        get the current timestamp a timer is set to (if any).

         Jan

        On 5/20/21 3:12 AM, Reuven Lax wrote:
        100% - the contract should not change because things are in
        a bundle. IIRC there are some open bugs in Beam around this
        that really should be fixed.

        My issue with GroupIntoBatches is different. This transform
        works as follows:

        if (this is the first element in batch - checked by reading
        a count stored in a ValueState)
         timer.offset(bufferingDuration).setRelative()

        This makes it tricky to use setTimer.withOutputTimestamp.
        Inputs are not guaranteed to be in order, so simply adding a
        withOutputTimestamp would set the timestamp to be whatever
        the first element happened to be; it really should be the
        minimum timestamp of all elements in the buffer. If we
        started setting the timer on every element, then
        timer.offset.setRelative would keep bumping the
        (processing-time) timer into the future and it would never
        expire.

        One solution would be to store the timer timestamp in a
        ValueState, and use Timer.set to set the timer to an
        absolute timestamp. This would allow us to always reset the
        timer to the same expiration target, just modifying the
        output timestamp each time. However, this will break
        DirectRunner tests. The DirectRunner allows the user to
        control the advancement of processing time when using
        TestStream, but this facility doesn't work well if the
        transform sets the processing-time timer using absolute
        set() calls.
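
The difference between the two approaches can be shown with a small plain-Java model. It imitates the described behavior but does not call Beam's Timer API; the class name, fields, and methods are hypothetical, and a nullable field stands in for the ValueState that would hold the firing time.

```java
/** Plain-Java model of one batch's timer; it imitates, but does not call, Beam's Timer API. */
class BatchTimerModel {
  private Long storedTarget = null;            // stand-in for a ValueState<Long> holding the firing time
  private long minTimestamp = Long.MAX_VALUE;  // minimum element timestamp, used as the output timestamp
  private long relativeTarget = -1;            // where calling setRelative() on every element would end up

  void onElement(long nowMillis, long elementTimestampMillis, long bufferingMillis) {
    // Relative re-set on every element: the target moves forward each time.
    relativeTarget = nowMillis + bufferingMillis;
    // Absolute re-set: compute the target once per batch, then keep re-using it.
    if (storedTarget == null) {
      storedTarget = nowMillis + bufferingMillis;
    }
    // Inputs are not ordered, so track the minimum rather than the first timestamp.
    minTimestamp = Math.min(minTimestamp, elementTimestampMillis);
  }

  long relativeTarget() { return relativeTarget; }
  long absoluteTarget() { return storedTarget; }
  long outputTimestamp() { return minTimestamp; }

  public static void main(String[] args) {
    BatchTimerModel model = new BatchTimerModel();
    // Elements arrive every 4 s of processing time, out of event-time order; buffering is 5 s.
    model.onElement(0, 300, 5_000);
    model.onElement(4_000, 100, 5_000);
    model.onElement(8_000, 200, 5_000);
    System.out.println("relative target: " + model.relativeTarget());   // 13000: keeps moving
    System.out.println("absolute target: " + model.absoluteTarget());   // 5000: stays fixed
    System.out.println("output timestamp: " + model.outputTimestamp()); // 100: minimum in buffer
  }
}
```

As long as elements keep arriving faster than the buffering duration, the relative target stays ahead of the clock, while the stored absolute target stays put and only the output timestamp changes.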

        I'm not sure how to solve this using the existing Timer API.

        On Wed, May 19, 2021 at 4:39 PM Robert Bradshaw
        <rober...@google.com> wrote:

            +1. It was my understanding as well that consensus was
            that timers must be delivered in timestamp order, and
            "within bundle" resetting/clearing of timers should be
            respected (as if each timer was in its own bundle).

            On Wed, May 19, 2021 at 3:01 PM Kenneth Knowles
            <k...@apache.org> wrote:
            >
            > Reading over the other thread, there was consensus to
            implement.
            >
            > Reading commentary on the PR, there were good
            questions raised about the semantics. Questions which I
            feel able to have an opinion about :-)
            >
            > The questions surrounded bundling and timers in the
            same bundle clearing each other. Actually the same
            questions apply to timers re-setting later timers and
            +Jan Lukavský has raised this already (among other
            people) so we kind of know the answer now, and I think
            +Boyuan Zhang's code was good (from my quick read). What
            has changed is that we have a better idea of the
            contract with the runner. I'm not sure if portability
            makes this more complex. I will share all my thoughts on
            this:
            >
            > I think one key to the Beam model is that bundles are
            for performance and also intended as the unit of
            commitment (until FinishBundle is called, there may be
            unfinished work). They can affect *behavior* (what the
            program does - including what can be observed) ... but
            not *semantics* (what the output means).
            >
            > So, for example, bundling affects how many files are
            written, but you are expected to read all the files and
            must not depend on their number or ordering. The
            behavior is different, but the semantics are the same.
            >
            > When it comes to timers, behavior and semantics are
            very tightly coupled; timers are like a self loop. The
            firing of a timer is a behavior w.r.t. the output of the
            stateful ParDo but it is semantics of the timer itself
            (my definitions don't actually apply so simply so don't
            dig too hard :-). So to get bundling-invariant
            semantics, we should try for bundling-invariant
            behavior. When some clock moves from T to T+D between
            bundles then all the timers in the range [T, T+D) may
            fire so they are delivered in the bundle. I believe we
            have in the prior re-setting issue agreed that timers
            should always be called back in timestamp order.
            Bundling-invariance then implies that earlier timers can
            clear later timers just like they can re-set them. So
            even though a timer is delivered in a bundle, the local
            state of the timer wins. This matches how state works as
            well; no matter how things are bundled, the state you
            read is always whatever was written last.
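
A toy model of this delivery rule in plain Java is sketched below. The names are illustrative and are not Beam runner internals. Timers in the range [T, T+D) fire in timestamp order, and the timer state is re-read after each callback, so an earlier timer can clear or re-set a later one even though both were eligible in the same bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Toy model of timer delivery for one key; names are illustrative, not Beam runner internals. */
class TimerDeliveryModel {
  // Timer id -> firing timestamp: the "local state" of the timers.
  private final Map<String, Long> timers = new TreeMap<>();

  void set(String id, long timestamp) { timers.put(id, timestamp); }
  void clear(String id) { timers.remove(id); }

  /** Fires every timer in [from, to) in timestamp order, re-reading timer state after each callback. */
  List<String> advance(long from, long to, BiConsumer<String, TimerDeliveryModel> callback) {
    List<String> fired = new ArrayList<>();
    while (true) {
      // Find the earliest timer still set inside the range.
      String next = null;
      for (Map.Entry<String, Long> e : timers.entrySet()) {
        long ts = e.getValue();
        if (ts >= from && ts < to && (next == null || ts < timers.get(next))) {
          next = e.getKey();
        }
      }
      if (next == null) {
        return fired;
      }
      timers.remove(next);
      fired.add(next);
      callback.accept(next, this); // the callback may set or clear other timers
    }
  }

  public static void main(String[] args) {
    TimerDeliveryModel model = new TimerDeliveryModel();
    model.set("a", 10);
    model.set("b", 20);
    model.set("c", 30);
    // Timer "a" clears timer "c"; "c" must not fire although it was in range.
    List<String> fired = model.advance(0, 40, (id, t) -> {
      if (id.equals("a")) {
        t.clear("c");
      }
    });
    System.out.println(fired); // [a, b]
  }
}
```

The result is the same as if each timer had been delivered in its own bundle, which is the bundling-invariance described above.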
            >
            > Kenn
            >
            > On Tue, May 11, 2021 at 1:24 PM Siyuan Chen
            <syc...@google.com> wrote:
            >>
            >>
            >>
            >> On Tue, May 11, 2021 at 11:08 AM Reuven Lax
            <re...@google.com> wrote:
            >>>
            >>>
            >>>
            >>> On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles
            <k...@apache.org> wrote:
            >>>>
            >>>>
            >>>>
            >>>> On Mon, May 10, 2021 at 7:40 PM Reuven Lax
            <re...@google.com> wrote:
            >>>>>
            >>>>> Hi,
            >>>>>
            >>>>> I've been looking at the implementation of
            GroupIntoBatches (hoping to add support to group based
            on byte size), and I have a few questions about the
            current implementation.
            >>>>>
            >>>>> 1. I noticed that the transform does not preserve
            input timestamps. The output timestamp is not explicitly
            set, so it will be whatever the default output
            timestamp is. Confusingly, this will depend on which path
            is taken. If the batch is output while processing an
            element, then the timestamp of that element will be used.
            If the batch is output from the timer, then the
            processing-time value of the timer will be used.
            >>>>
            >>>>
            >>>> Seems like bugs.
            >>>>
            >>>>>
            >>>>>      - Should I start setting outputTimestamp
            explicitly - ideally to the minimum timestamp in the
            current batch of elements?
            >>>>
            >>>>
            >>>> That's a sensible default. Could pass a
            TimestampCombiner if that leads to watermark delays.
            Given the nature of the transform, the simple approach
            will probably be fine.
            >>>
            >>>
            >>> This is a bit tricky to do. In order to update the
            output timestamp, we need to call setTimer again.
            However, since we are calling
            timer.offset().setRelative(), this will keep bumping the
            timer into the future and it will never fire.
            >>>
            >>> One solution would be for GIB to just store the
            current timer ts in state, and make sure that we keep
            setting the same target until the timer fires. However,
            that precludes us from using setRelative (since that
            method does not allow the DoFn to see what target time
            is being set). I think that this approach won't play
            nicely with TestStream.advanceProcessingTimeTo.
            >>>
            >>> We could also add a way to update _just_ the output
            time for a timer without resetting the entire timer.
            >>>
            >>>>
            >>>>
            >>>>>      - Should we have the option to preserve all
            element timestamps? This could look something like this:
            >>>>>
            >>>>>  PCollection<KV<K, TimestampedValue<V>>> batches =
            >>>>>      input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
            >>>>
            >>>>
            >>>> This seems useful.
            >>>>
            >>>>>
            >>>>> 2. flushBatch always resets the timer, even after
            the batch is processed. The only reason I can think of
            for doing so is to update the watermark hold.
            TimerInternals contains a deleteTimer method - is there
            any reason we shouldn't simply implement Timer.clear and
            hook it up to the existing deleteTimer?
            >>>>
            >>>>
            >>>> Makes sense to me. Prior thread on this seemed to
            have lightweight consensus:
            
            https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
            >>
            >> There was another thread about adding Timer.clear in
            Java:
            
            https://lists.apache.org/thread.html/r47f5f14ba9729a2800222440425543ef40bea12eef1f2739d42e75db%40%3Cdev.beam.apache.org%3E
            >> and a previous attempt in
            https://github.com/apache/beam/pull/12836. Boyuan
            might have more insights into this +Boyuan Zhang
            >>>>
            >>>>
            >>>>>
            >>>>> 3. This transform was implemented before
            OnWindowExpiration was implemented. I think we should
            add a window-expiration callback, and stop setting the
            end-of-window timer.
            >>>>
            >>>>
            >>>> +1
            >>>>
            >>>> Kenn
