I'm not sure I understand it in detail, but I'd say it is related. Both issues probably have a common cause: in the scenario where timers are fired in a bundle, when the watermark moves from time T0 to T3 (as in the last example), there is actually _no time between T0 and T3_, because all times in between are effectively "collapsed" into a single instant. The "timer watermark" specified in [BEAM-2535] is something I also experimented with at first (in DirectRunner), but it turned out to be equivalent to firing only the timers with the lowest timestamp.
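
A minimal sketch of this effect, as a toy model in plain Java rather than actual Beam or DirectRunner code. The class `TimerOrderSketch`, both methods and the callback are hypothetical, and timestamps are plain longs with T1 = 1, T2 = 2, T3 = 3. `fireBundled` snapshots all timers that are due when the watermark jumps to T3 and only afterwards looks at timers set in the meantime; `fireLowestFirst` always fires only the pending timer with the lowest timestamp:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongFunction;

/** Toy model of event-time timers; hypothetical names, not Beam code. */
final class TimerOrderSketch {

  /** Fires everything due at the watermark as one bundle, then timers set meanwhile. */
  static List<Long> fireBundled(
      List<Long> pending, long watermark, LongFunction<List<Long>> onTimer) {
    List<Long> fired = new ArrayList<>();
    List<Long> bundle = new ArrayList<>(pending);
    while (!bundle.isEmpty()) {
      Collections.sort(bundle);
      List<Long> setDuringBundle = new ArrayList<>();
      for (long ts : bundle) {
        if (ts <= watermark) { // timers beyond the watermark are ignored in this toy model
          fired.add(ts);
          setDuringBundle.addAll(onTimer.apply(ts));
        }
      }
      bundle = setDuringBundle; // newly set timers only get their turn after the bundle
    }
    return fired;
  }

  /** Always fires only the pending timer with the lowest timestamp. */
  static List<Long> fireLowestFirst(
      List<Long> pending, long watermark, LongFunction<List<Long>> onTimer) {
    List<Long> fired = new ArrayList<>();
    PriorityQueue<Long> queue = new PriorityQueue<>(pending);
    while (!queue.isEmpty() && queue.peek() <= watermark) {
      long ts = queue.poll();
      fired.add(ts);
      queue.addAll(onTimer.apply(ts)); // a timer set for an earlier time is seen next
    }
    return fired;
  }

  public static void main(String[] args) {
    // The timer at T1 = 1 sets another timer for T2 = 2; other timers set nothing.
    LongFunction<List<Long>> onTimer = ts -> ts == 1L ? List.of(2L) : List.<Long>of();
    System.out.println(fireBundled(List.of(1L, 3L), 3L, onTimer));     // [1, 3, 2]
    System.out.println(fireLowestFirst(List.of(1L, 3L), 3L, onTimer)); // [1, 2, 3]
  }
}
```

With timers at T1 and T3, where the T1 timer sets another one for T2, the bundled variant fires in the order T1, T3, T2, while the lowest-first variant fires T1, T2, T3.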

On 6/20/19 9:52 PM, Reuven Lax wrote:
I think BEAM-2535 is independent.

On Thu, Jun 20, 2019 at 9:47 PM Lukasz Cwik <lc...@google.com <mailto:lc...@google.com>> wrote:

    Does BEAM-2535 provide more context?

    On Thu, Jun 20, 2019 at 12:44 PM Reuven Lax <re...@google.com
    <mailto:re...@google.com>> wrote:



        On Thu, Jun 20, 2019 at 9:35 PM Jan Lukavský <je...@seznam.cz
        <mailto:je...@seznam.cz>> wrote:


            On 6/20/19 9:30 PM, Reuven Lax wrote:


            On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                > But that is exactly how time advances. Watermarks
                often don't move smoothly, as a single old element
                can hold up the watermark. When that element is
                finished, the watermark can jump forward in time,
                triggering many timers.

                Sure. Absolutely agree. But the move from time T1 to
                T2 can be viewed as a discrete jump or as a smooth
                move, so that when you fire a timer, any internal
                timings are set to the actual timestamp of the timer.
                I believe that is how Flink works. And this might be
                related to the fact that Flink lacks the concept of
                bundles.

                > I'm not sure how this breaks that invariant. The
                input watermark has only moved forward, as should be
                true for the output watermark. The output watermark is
                held up by watermark holds in the step, which usually
                means that the output watermark is already being held
                to the earliest pending timer.

                The problem was stated at the beginning of this
                thread. I can restate it:

                - let's have four times - T0 < T1 < T2 < T3

                - let's have two timers A and B, set for time T1 and
                T3, respectively

                - the watermark moves from T0 to T3

                - that move fires both timers A and B (in this
                order), *but* timer A is free to set more timers;
                let's suppose it sets a timer for T2

                - the second instance of timer A (set for T2) will
                fire *after* timer B (set for T3), breaking the time
                invariant

            Ah, by time invariant you mean the in-order firing of
            timers?
            Yes, sorry, I meant "time monotonicity invariant with
            relation to timers". Basically that timers should be fired
            in timestamp order, because otherwise it might cause
            unpredictable results.


        I think there were similar issues with resetting timers:
        you reset a timer to a different timestamp, but a firing of
        that timer is already in the bundle at the old timestamp. I
        believe that either choice (modify the bundle or allow the
        timer to fire) can lead to consistency problems. Kenn might
        remember the details here.


                Jan

                On 6/20/19 8:43 PM, Reuven Lax wrote:


                On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    Hi Reuven,

                    > I would be cautious changing this. Being able
                    to put multiple timers in the same bundle saves
                    a lot, and if we force them to all run separate
                    through ReduceFnRunner we risk regressing
                    performance of some pipelines.

                    I understand your point. The issue here is that,
                    the current behavior is at least ... unexpected.
                    There might be one different conceptual approach
                    to that:

                     a) if a bundle contains timers for several
                    distinct timestamps (say T1 and T2), then it
                    implies that timer T1 is effectively fired not
                    at time T1 but at time T2, because logically the
                    time hopped discretely from some previous time
                    T0 to T2 without any "stopping by". Hence, it
                    should be invalid to set up a timer for any time
                    lower than T2.


                But that is exactly how time advances. Watermarks
                often don't move smoothly, as a single old element
                can hold up the watermark. When that element is
                finished, the watermark can jump forward in time,
                triggering many timers.

                    b) the time will move smoothly (or smoothly at
                    millisecond precision), but that implies that
                    there cannot be timers for more than one distinct
                    timestamp inside a single bundle.

                    If we don't want to take path b), we are
                    probably left with path a) (as doing nothing
                    seems weird, because it breaks one invariant,
                    that time can only move forward).

                I'm not sure how this breaks that invariant. The
                input watermark has only moved forward, as should be
                true for the output watermark. The output watermark
                is held up by watermark holds in the step, which
                usually means that the output watermark is already
                being held to the earliest pending timer.

                    Option a) can be done - we might add something
                    like `getInputWatermark()` and
                    `getOutputWatermark()` to `DoFn.OnTimerContext`,
                    and throw an exception when the user tries to set
                    up a timer for a time before the input watermark.
                    Effectively, that way we will let the user know
                    that their timer was set for time T1 but was
                    fired at T2. But that seems to be a breaking
                    change, unfortunately.
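
                    A rough sketch of what such a check might look
                    like. This is not existing Beam API: the class
                    `TimerGuardSketch` and the method
                    `checkedTimerTarget` are hypothetical, and
                    `java.time.Instant` is used instead of Beam's own
                    Instant type only to keep the example
                    self-contained.

                    ```java
                    import java.time.Instant;

                    /** Hypothetical guard for option a); not part of Beam. */
                    final class TimerGuardSketch {
                      private final Instant inputWatermark;

                      TimerGuardSketch(Instant inputWatermark) {
                        this.inputWatermark = inputWatermark;
                      }

                      /** The watermark the firing timer actually observes. */
                      Instant getInputWatermark() {
                        return inputWatermark;
                      }

                      /** Returns the target unchanged, or throws if it lies before the watermark. */
                      Instant checkedTimerTarget(Instant target) {
                        if (target.isBefore(inputWatermark)) {
                          throw new IllegalArgumentException(
                              "Cannot set timer for " + target
                                  + ": input watermark is already at " + inputWatermark);
                        }
                        return target;
                      }

                      public static void main(String[] args) {
                        // The watermark has already hopped to T2 = 2 while a timer for T1 = 1 fires.
                        TimerGuardSketch guard = new TimerGuardSketch(Instant.ofEpochMilli(2));
                        System.out.println(guard.checkedTimerTarget(Instant.ofEpochMilli(5)));
                        try {
                          guard.checkedTimerTarget(Instant.ofEpochMilli(1));
                        } catch (IllegalArgumentException e) {
                          System.out.println("rejected: " + e.getMessage());
                        }
                      }
                    }
                    ```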

                    What do you think?

                    Jan

                    On 6/20/19 5:29 PM, Reuven Lax wrote:


                    On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský
                    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                        Hi,

                        this problem seems to be harder than I
                        thought. I have somewhat working code in
                        [1], but some tests are still failing (now
                        tests for ReduceFnRunner), and I'm not sure
                        whether the problem is in the tests, so that
                        my current behavior is actually correct. Let
                        me explain the problem:

                         - let's have a fixed window with allowed
                        lateness of 1 ms

                         - let's add two elements into the window
                        (on time), no late elements

                         - now, ReduceFnRunner with default trigger
                        will set *two* timers - one for
                        window.maxTimestamp() and second for
                        window.maxTimestamp() + allowedLateness

                         - the previous implementation fired *both*
                        timers at once (within single call to
                        ReduceFnRunner#onTimers), but now it fires
                        twice - once for the first timer and second
                        for the other


                    I would be cautious changing this. Being able
                    to put multiple timers in the same bundle saves
                    a lot, and if we force them to all run separate
                    through ReduceFnRunner we risk regressing
                    performance of some pipelines.

                         - the result of this is that although in
                        both cases only single pane is emitted, in
                        my branch the fired pane doesn't have the
                        `isLast` flag set (that is because the
                        window is not yet garbage collected -
                        waiting for late data - but the second time
                        it is not fired, because no late data arrived)

                        Would anyone know what is actually the
                        correct behavior regarding
                        PaneInfo.isLast? I suppose there are only
                        two options - either two panes can come
                        with the isLast flag (both end-of-window and
                        late), or it might be possible that no
                        pane will be marked with this flag (because
                        no late pane is fired).

                        Jan

                         [1] https://github.com/apache/beam/pull/8815


                        On 6/10/19 6:26 PM, Jan Lukavský wrote:
                        It seems to me that watermark hold cannot
                        change it (currently), because in the
                        current implementation timers fire
                        according to input watermark, but
                        watermark holds apply to output watermark.
                        If I didn't miss anything.

                        On 10. 6. 2019 at 18:15, Lukasz Cwik
                        <lc...@google.com>
                        <mailto:lc...@google.com> wrote:

                            I see. Is there a missing watermark
                            hold for timers less than T2?

                            On Mon, Jun 10, 2019 at 9:08 AM Jan
                            Lukavský <je...@seznam.cz
                            <mailto:je...@seznam.cz>> wrote:

                                Yes, there is no difference
                                between GC and user timers in this
                                case. I think the problem is
                                simply that when the watermark
                                moves from time T1 to T2,
                                DirectRunner fires all timers that
                                are due up to T2, but that can
                                create new timers for times
                                between T1 and T2, and these will
                                be fired later, although they
                                should have been fired before T2.

                                Jan

                                On 6/10/19 5:48 PM, Kenneth
                                Knowles wrote:

                                    Reading your Jira, I believe
                                    this problem will manifest
                                    without the interaction of
                                    user timers and GC.
                                    Interesting case. It surrounds
                                    whether a runner makes a timer
                                    available or fires it prior to
                                    the bundle being committed.

                                    I have commented elsewhere
                                    about this part, quoting the Jira:

                                    > have experimented with this a
                                    little and have not yet
                                    figured out what the correct
                                    solution should be. What I tried:
                                    > 1) hold input watermark for
                                    min(setup timers)
                                    > 2) fire timers based not on
                                    input watermark, but on output
                                    watermark (output watermark is
                                    held by min timer stamp)

                                    Neither of these quite works.
                                    What we need is a separate
                                    "element input watermark" and
                                    "timer input watermark". The
                                    overall input watermark that
                                    drives GC is the min of these.
                                    The output watermark is also
                                    held to this overall input
                                    watermark. User timers fire
                                    according to the element input
                                    watermark.
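
                                    A minimal sketch of this
                                    bookkeeping, assuming watermarks
                                    are plain millisecond longs and
                                    that a timer becomes eligible once
                                    the relevant watermark reaches its
                                    timestamp. The class and method
                                    names are hypothetical, not Beam
                                    API:

                                    ```java
                                    /** Toy model of two input watermarks; not Beam code. */
                                    final class SplitWatermarkSketch {
                                      private final long elementInputWatermark;
                                      private final long timerInputWatermark;

                                      SplitWatermarkSketch(long elementWm, long timerWm) {
                                        this.elementInputWatermark = elementWm;
                                        this.timerInputWatermark = timerWm;
                                      }

                                      /** Overall input watermark: the min of the two. */
                                      long overallInputWatermark() {
                                        return Math.min(elementInputWatermark, timerInputWatermark);
                                      }

                                      /** The output watermark is held to the overall one. */
                                      long holdOutputWatermark(long candidate) {
                                        return Math.min(candidate, overallInputWatermark());
                                      }

                                      /** User timers look only at the element input watermark. */
                                      boolean userTimerEligible(long timestamp) {
                                        return timestamp <= elementInputWatermark;
                                      }

                                      /** GC is driven by the overall input watermark. */
                                      boolean gcEligible(long timestamp) {
                                        return timestamp <= overallInputWatermark();
                                      }

                                      public static void main(String[] args) {
                                        SplitWatermarkSketch wm = new SplitWatermarkSketch(10, 4);
                                        System.out.println(wm.overallInputWatermark()); // 4
                                        System.out.println(wm.userTimerEligible(7));    // true
                                        System.out.println(wm.gcEligible(7));           // false
                                      }
                                    }
                                    ```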

                                    Kenn

                                    On Mon, Jun 10, 2019 at 8:44
                                    AM Lukasz Cwik
                                    <lc...@google.com
                                    <mailto:lc...@google.com>> wrote:

                                        Jan, are you editing the
                                        implementation of how
                                        timers work within the
                                        DirectRunner, or are you
                                        trying to build support
                                        for time-sorted input on
                                        top of the Beam model for
                                        timers? Because I think
                                        you will need to do the
                                        former.

                                        On Mon, Jun 10, 2019 at
                                        8:41 AM Jan Lukavský
                                        <je...@seznam.cz
                                        <mailto:je...@seznam.cz>>
                                        wrote:

                                            Hm, that would
                                            probably work, thanks!

                                            But, should the timers
                                            behave like that? I'm
                                            trying to fix this by
                                            introducing a sequence
                                            of watermarks

                                             input watermark -> timer watermark -> output watermark

                                            as suggested in the
                                            JIRA, and it actually
                                            seems to be working as
                                            expected. It even
                                            cleans up some code
                                            paths, but I'm
                                            debugging some strange
                                            behavior this exposed:
                                            `WatermarkHold.watermarkHoldTagForTimestampCombiner`
                                            seems to have stopped
                                            clearing itself after
                                            this change, and some
                                            pipelines therefore
                                            stopped working. I'm a
                                            little lost as to why
                                            this happened. I can
                                            push the code I have if
                                            anyone is interested.

                                            Jan

                                            On 6/10/19 5:32 PM,
                                            Lukasz Cwik wrote:

                                                We hit an instance
                                                of this problem
                                                before and solved
                                                it by rescheduling
                                                the GC timer again
                                                if there was a
                                                conflicting timer
                                                that was also
                                                meant to fire.

                                                On Mon, Jun 10,
                                                2019 at 8:17 AM
                                                Jan Lukavský
                                                <je...@seznam.cz
                                                <mailto:je...@seznam.cz>>
                                                wrote:

                                                    For a single
                                                    key. I'm
                                                    running into a
                                                    collision
                                                    between timerId
                                                    `__StatefulParDoGcTimerId`
                                                    (StatefulDoFnRunner)
                                                    and my timerId
                                                    for flushing
                                                    sorted
                                                    elements in the
                                                    implementation
                                                    of
                                                    @RequiresTimeSortedInput.
                                                    The timers are
                                                    being swapped
                                                    at the end of
                                                    input (but it
                                                    can happen
                                                    anywhere near
                                                    the end of a
                                                    window), which
                                                    results in
                                                    state being
                                                    cleared before
                                                    it gets
                                                    flushed, which
                                                    means data loss.

                                                     Jan

                                                    On 6/10/19
                                                    5:08 PM,
                                                    Reuven Lax wrote:

                                                        Do you
                                                        mean for a
                                                        single key
                                                        or across
                                                        keys?

                                                        On Mon,
                                                        Jun 10,
                                                        2019, 5:11
                                                        AM Jan
                                                        Lukavský
                                                        <je...@seznam.cz
                                                        <mailto:je...@seznam.cz>>
                                                        wrote:

                                                            Hi,

                                                            I have come
                                                            across issue
                                                            [1], where I'm
                                                            not sure how
                                                            to solve this
                                                            in the most
                                                            elegant way.

                                                            Any
                                                            suggestions?

                                                            Thanks,

                                                              Jan

                                                            [1] https://issues.apache.org/jira/browse/BEAM-7520



