> But that is exactly how time advances. Watermarks often don't move smoothly, as a single old element can hold up the watermark. When that element is finished, the watermark can jump forward in time, triggering many timers.

Sure. Absolutely agree. But the move from time T1 to T2 can be viewed as discrete jump, or smooth move, so that when you fire timer, any internal timings are set to the actual timestamp of the timer. I believe that is how flink works. And this might be related to the fact that Flink lacks concept of bundles.

> I'm not sure how this breaks that invariant. The input watermark has only moved forward, as should be true fo the output watermark. The output watermark is help up by watermark holds in the step, which usually means that the output watermark is already being help to the earliest pending timer.

The problem was stated at the beginning of this thread. I can restate it:

- let's have four times - T0 < T1 < T2 < T3

- let's have a two timers A and B, set for time T1 and T3, respectively

- watermark moves time from T0 to T3

- that move fires both timers A and B (in this order), *but* timer A is free to set more timers, let's suppose it sets timer for T2

- the second instance of timer A (set for T2) will fire *after* timer B (set for T3), breaking time invariant

Jan

On 6/20/19 8:43 PM, Reuven Lax wrote:


On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi Reuven,

    > I would be cautious changing this. Being able to put multiple
    timers in the same bundle saves a lot, and if we force them to all
    run separate through ReduceFnRunner we risk regressing performance
    of some pipelines.

    I understand your point. The issue here is that, the current
    behavior is at least ... unexpected. There might be one different
    conceptual approach to that:

     a) if a bundle contains timers for several distinct timestamps
    (say T1 and T2), then it implies, that timer T1 is effectively not
    fired at time T1, but at time T2 - that is due to the fact, that
    logically, the time hopped discretely from some previous time T0
    to T2 without any "stopping by". Hence, it should be invalid to
    setup timer for any time lower than T2.


But that is exactly how time advances. Watermarks often don't move smoothly, as a single old element can hold up the watermark. When that element is finished, the watermark can jump forward in time, triggering many timers.

    b) the time will move smoothly (or, millisecond precision
    smoothly), but that implies, that there cannot be more distinct
    timers inside single bundle.

    If we don't want to take path b), we are probably left with path
    a) (as doing nothing seems weird, because it breaks one invariant,
    that time can only move forward).

I'm not sure how this breaks that invariant. The input watermark has only moved forward, as should be true fo the output watermark. The output watermark is help up by watermark holds in the step, which usually means that the output watermark is already being help to the earliest pending timer.

    Option a) can be done - we might add something like
    `getInputWatermark()` and `getOutputWatermark()` to
    `DoFn.OnTimerContext`, and throw exception when user tries to
    setup timer for time before input watermark. Effectively, that way
    we will let the user know, that his timer was set to time T1, but
    was fired at T2. But, that seems to be breaking change, unfortunately.

    What do you think?

    Jan

    On 6/20/19 5:29 PM, Reuven Lax wrote:


    On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        Hi,

        this problem seems to be harder than I thought. I have a
        somewhat working code in [1], but there are still failing
        some tests (now tests for ReduceFnRunner), but I'm not sure,
        if the problem is not in the tests, so that my current
        behavior is actually correct. Let me explain the problem:

         - let's have a fixed window with allowed lateness of 1 ms

         - let's add two elements into the window (on time), no late
        elements

         - now, ReduceFnRunner with default trigger will set *two*
        timers - one for window.maxTimestamp() and second for
        window.maxTimestamp() + allowedLateness

         - the previous implementation fired *both* timers at once
        (within single call to ReduceFnRunner#onTimers), but now it
        fires twice - once for the first timer and second for the other


    I would be cautious changing this. Being able to put multiple
    timers in the same bundle saves a lot, and if we force them to
    all run separate through ReduceFnRunner we risk regressing
    performance of some pipelines.

         - the result of this is that although in both cases only
        single pane is emitted, in my branch the fired pane doesn't
        have the `isLast` flag set (that is because the window is not
        yet garbage collected - waiting for late data - but the
        second time it is not fired, because no late data arrived)

        Would anyone know what is actually the correct behavior
        regarding the PaneInfo.isLast? I suppose there are only two
        options - either two panes can come with isLast flag (both
        end-of-window and late), or it might be possible, that no
        pane will marked with this flag (because no late pane is fired).

        Jan

         [1] https://github.com/apache/beam/pull/8815


        On 6/10/19 6:26 PM, Jan Lukavský wrote:
        It seems to me that watermark hold cannot change it
        (currently), because in the current implementation timers
        fire according to input watermark, but watermark holds apply
        to output watermark. If I didn't miss anything.

        Dne 10. 6. 2019 18:15 napsal uživatel Lukasz Cwik
        <lc...@google.com> <mailto:lc...@google.com>:

            I see. Is there a missing watermark hold for timers less
            then T2?

            On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                Yes, there is no difference between GC and user
                timers in this case. I think the problem is simply
                that when watermark moves from time T1 to T2,
                DirectRunner fires all timers that fire until T2,
                but that can create new timers for time between T1
                and T2, and these will be fired later, although
                should have been fired before T2.

                Jan

                On 6/10/19 5:48 PM, Kenneth Knowles wrote:

                    Reading your Jira, I believe this problem will
                    manifest without the interaction of user timers
                    and GC. Interesting case. It surrounds whether a
                    runner makes a timer available or fires it prior
                    to the bundle being committed.

                    I have commented elsewhere about this part,
                    quoting the Jira:

                    > have experimented with this a little and have
                    not yet figured out what the correct solution
                    should be. What I tried:
                    > 1) hold input watermark for min(setup timers)
                    > 2) fire timers based not on input watermark,
                    but on output watermark (output watermark is
                    held by min timer stamp)

                    Neither of these quite works. What we need is a
                    separate "element input watermark" and "timer
                    input watermark". The overall input watermark
                    that drives GC is the min of these. The output
                    watermark is also held to this overall input
                    watermark. User timers fire according to the
                    element input watermark.

                    Kenn

                    On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik
                    <lc...@google.com <mailto:lc...@google.com>> wrote:

                        Jan are you editing the implementation of
                        how timers work within the DirectRunner or
                        are trying to build support for time sorted
                        input on top of the Beam model for timers?
                        Because I think you will need to do the former.

                        On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský
                        <je...@seznam.cz <mailto:je...@seznam.cz>>
                        wrote:

                            Hm, that would probably work, thanks!

                            But, should the timers behave like that?
                            I'm trying to fix tris by introducing a
                            sequence of watermarks

                             inputs watermark -> timer watermark ->
                            output watermark

                            as suggested in the JIRA, and it
                            actually seems to be working as
                            expected. It even cleans some code
                            paths, but I'm debugging some strange
                            behavior this exposed -
                            `WatermarkHold.watermarkHoldTagForTimestampCombiner`
                            seems to have stopped clearing itself
                            after this change and some Pipelines
                            therefore stopped working. I'm little
                            lost why this happened. I can push code
                            I have if anyone interested.

                            Jan

                            On 6/10/19 5:32 PM, Lukasz Cwik wrote:

                                We hit an instance of this problem
                                before and solved it rescheduling
                                the GC timer again if there was a
                                conflicting timer that was also
                                meant to fire.

                                On Mon, Jun 10, 2019 at 8:17 AM Jan
                                Lukavský <je...@seznam.cz
                                <mailto:je...@seznam.cz>> wrote:

                                    For a single key. I'm getting
                                    into collision of timerId
                                    `__StatefulParDoGcTimerId`
                                    (StatefulDoFnRunner) and my
                                    timerId for flushing sorted
                                    elements in implementation of
                                    @RequiresTimeSortedInput. The
                                    timers are being swapped at the
                                    end of input (but it can happen
                                    anywhere near end of window),
                                    which results in state being
                                    cleared before it gets flushed,
                                    which means data loss.

                                     Jan

                                    On 6/10/19 5:08 PM, Reuven Lax
                                    wrote:

                                        Do you mean for a single key
                                        or across keys?

                                        On Mon, Jun 10, 2019, 5:11
                                        AM Jan Lukavský
                                        <je...@seznam.cz
                                        <mailto:je...@seznam.cz>> wrote:

                                            Hi,

                                            I have come across issue
                                            [1], where I'm not sure
                                            how to solve this in
                                            most elegant way.

                                            Any suggestions?

                                            Thanks,

                                              Jan

                                            [1]
                                            
https://issues.apache.org/jira/browse/BEAM-7520



        It seems to me that watermark hold cannot change it (currently), 
because in the current implementation timers fire according to input watermark, 
but watermark holds apply to output watermark. If I didn't miss anything.

        Dne 10. 6. 2019 18:15 napsal uživatel Lukasz Cwik<lc...@google.com>  
<mailto:lc...@google.com>:
        I see. Is there a missing watermark hold for timers less then T2?

        On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský<je...@seznam.cz>  
<mailto:je...@seznam.cz>  wrote:
        Yes, there is no difference between GC and user timers in this case. I 
think the problem is simply that when watermark moves from time T1 to T2, 
DirectRunner fires all timers that fire until T2, but that can create new 
timers for time between T1 and T2, and these will be fired later, although 
should have been fired before T2.

        Jan

        On 6/10/19 5:48 PM, Kenneth Knowles wrote:
        Reading your Jira, I believe this problem will manifest without the 
interaction of user timers and GC. Interesting case. It surrounds whether a 
runner makes a timer available or fires it prior to the bundle being committed.

        I have commented elsewhere about this part, quoting the Jira:

        have experimented with this a little and have not yet figured out what 
the correct solution should be. What I tried:
        1) hold input watermark for min(setup timers)
        2) fire timers based not on input watermark, but on output watermark 
(output watermark is held by min timer stamp)
        Neither of these quite works. What we need is a separate "element input 
watermark" and "timer input watermark". The overall input watermark that drives GC 
is the min of these. The output watermark is also held to this overall input watermark. User timers 
fire according to the element input watermark.

        Kenn

        On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik<lc...@google.com>  
<mailto:lc...@google.com>  wrote:
        Jan are you editing the implementation of how timers work within the 
DirectRunner or are trying to build support for time sorted input on top of the 
Beam model for timers?
        Because I think you will need to do the former.

        On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský<je...@seznam.cz>  
<mailto:je...@seznam.cz>  wrote:
        Hm, that would probably work, thanks!

        But, should the timers behave like that? I'm trying to fix tris by 
introducing a sequence of watermarks

          inputs watermark -> timer watermark -> output watermark

        as suggested in the JIRA, and it actually seems to be working as 
expected. It even cleans some code paths, but I'm debugging some strange 
behavior this exposed - `WatermarkHold.watermarkHoldTagForTimestampCombiner` 
seems to have stopped clearing itself after this change and some Pipelines 
therefore stopped working. I'm little lost why this happened. I can push code I 
have if anyone interested.

        Jan

        On 6/10/19 5:32 PM, Lukasz Cwik wrote:
        We hit an instance of this problem before and solved it rescheduling 
the GC timer again if there was a conflicting timer that was also meant to fire.

        On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský<je...@seznam.cz>  
<mailto:je...@seznam.cz>  wrote:
        For a single key. I'm getting into collision of timerId 
`__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for flushing 
sorted elements in implementation of @RequiresTimeSortedInput. The timers are 
being swapped at the end of input (but it can happen anywhere near end of 
window), which results in state being cleared before it gets flushed, which 
means data loss.

          Jan

        On 6/10/19 5:08 PM, Reuven Lax wrote:
        Do you mean for a single key or across keys?

        On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský<je...@seznam.cz>  
<mailto:je...@seznam.cz>  wrote:
        Hi,

        I have come across issue [1], where I'm not sure how to solve this in
        most elegant way.

        Any suggestions?

        Thanks,

           Jan

        [1]https://issues.apache.org/jira/browse/BEAM-7520

Reply via email to