Hi Reuven,

> I would be cautious changing this. Being able to put multiple timers in the same bundle saves a lot, and if we force them to all run separately through ReduceFnRunner we risk regressing performance of some pipelines.

I understand your point. The issue is that the current behavior is, at the very least, unexpected. I see two conceptual ways to approach it:

a) If a bundle contains timers for several distinct timestamps (say T1 and T2), then timer T1 is effectively fired not at time T1 but at time T2, because logically the time hopped discretely from some previous time T0 to T2 without stopping anywhere in between. Hence, it should be invalid to set a timer for any time lower than T2.

b) Time moves smoothly (or smoothly at millisecond precision), but that implies that a single bundle cannot contain timers for more than one distinct timestamp.

If we don't want to take path b), we are probably left with path a), because doing nothing seems wrong: it breaks the invariant that time can only move forward. Option a) is doable: we might add something like `getInputWatermark()` and `getOutputWatermark()` to `DoFn.OnTimerContext`, and throw an exception when the user tries to set a timer for a time before the input watermark. That way we effectively let users know that their timer was set for time T1 but was fired at T2. Unfortunately, that seems to be a breaking change.
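
To make the difference concrete, here is a minimal Python sketch of a single-key timer queue. It is only a toy model, not Beam code: `ToyTimerQueue`, `reject_past_timers` and the integer timestamps are hypothetical, made up for illustration. It assumes the watermark hops from T0 straight to T2 and that all eligible timers fire in one bundle.

```python
import heapq


class ToyTimerQueue:
    """Toy model of event-time timers for a single key (not Beam's API)."""

    def __init__(self, reject_past_timers=False):
        self.input_watermark = 0
        self.reject_past_timers = reject_past_timers  # hypothetical switch for option a)
        self._heap = []  # pending timer timestamps
        self.fired = []  # (timer timestamp, input watermark when it fired)

    def set_timer(self, timestamp):
        # Option a): a timer behind the input watermark is rejected outright.
        if self.reject_past_timers and timestamp < self.input_watermark:
            raise ValueError(
                f"timer for {timestamp} is behind input watermark {self.input_watermark}"
            )
        heapq.heappush(self._heap, timestamp)

    def advance_watermark(self, new_watermark, on_timer=None):
        # The watermark hops discretely; every eligible timer goes into one bundle.
        self.input_watermark = new_watermark
        bundle = []
        while self._heap and self._heap[0] <= new_watermark:
            bundle.append(heapq.heappop(self._heap))
        for timestamp in bundle:
            self.fired.append((timestamp, new_watermark))
            if on_timer is not None:
                # The callback may set new timers; they are not part of this bundle.
                on_timer(self, timestamp)


def set_followup(queue, timestamp):
    # When the T1 = 10 timer fires, set a new timer between T1 and T2 = 20.
    if timestamp == 10:
        queue.set_timer(15)


queue = ToyTimerQueue()
queue.set_timer(10)  # T1
queue.set_timer(20)  # T2
queue.advance_watermark(20, set_followup)
queue.advance_watermark(30, set_followup)
print([timestamp for timestamp, _ in queue.fired])
```

With the default settings the timer for 15 fires only after the timer for 20, so from the user's point of view time moves backwards. With `reject_past_timers=True` the same callback raises as soon as it tries to set the timer for 15, which is the behavior option a) would expose to the user.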

What do you think?

Jan

On 6/20/19 5:29 PM, Reuven Lax wrote:


On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    this problem seems to be harder than I thought. I have somewhat
    working code in [1], but some tests are still failing (now the
    tests for ReduceFnRunner). I'm not sure whether the problem is
    actually in the tests, in which case my current behavior would be
    correct. Let me explain the problem:

     - let's have a fixed window with allowed lateness of 1 ms

     - let's add two elements into the window (on time), no late elements

     - now, ReduceFnRunner with default trigger will set *two* timers
    - one for window.maxTimestamp() and second for
    window.maxTimestamp() + allowedLateness

     - the previous implementation fired *both* timers at once (within
    a single call to ReduceFnRunner#onTimers), but now it fires twice -
    once for the first timer and once for the second


I would be cautious changing this. Being able to put multiple timers in the same bundle saves a lot, and if we force them to all run separately through ReduceFnRunner we risk regressing performance of some pipelines.

     - the result of this is that although in both cases only a single
    pane is emitted, in my branch the fired pane doesn't have the
    `isLast` flag set (that is because the window is not yet garbage
    collected, as it is waiting for late data, and the second firing
    emits nothing, because no late data arrived)

    Would anyone know what the correct behavior actually is regarding
    PaneInfo.isLast? I suppose there are only two options: either
    two panes can come with the isLast flag (both end-of-window and
    late), or it is possible that no pane will be marked with this
    flag (because no late pane is fired).

    Jan

     [1] https://github.com/apache/beam/pull/8815


    On 6/10/19 6:26 PM, Jan Lukavský wrote:
    It seems to me that watermark hold cannot change it (currently),
    because in the current implementation timers fire according to
    input watermark, but watermark holds apply to output watermark.
    If I didn't miss anything.

    On 10 Jun 2019 at 18:15, Lukasz Cwik <lc...@google.com> wrote:

        I see. Is there a missing watermark hold for timers less than T2?

        On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <je...@seznam.cz> wrote:

            Yes, there is no difference between GC and user timers in
            this case. I think the problem is simply that when the
            watermark moves from time T1 to T2, DirectRunner fires
            all timers that are due up to T2, but firing them can
            create new timers for times between T1 and T2, and these
            will be fired later, although they should have been
            fired before T2.

            Jan

            On 6/10/19 5:48 PM, Kenneth Knowles wrote:

                Reading your Jira, I believe this problem will
                manifest without the interaction of user timers and
                GC. Interesting case. It comes down to whether a
                runner makes a timer available or fires it prior to
                the bundle being committed.

                I have commented elsewhere about this part, quoting
                the Jira:

                > have experimented with this a little and have not
                > yet figured out what the correct solution should be.
                > What I tried:
                > 1) hold input watermark for min(setup timers)
                > 2) fire timers based not on input watermark, but on
                > output watermark (output watermark is held by min
                > timer stamp)

                Neither of these quite works. What we need is a
                separate "element input watermark" and "timer input
                watermark". The overall input watermark that drives
                GC is the min of these. The output watermark is also
                held to this overall input watermark. User timers
                fire according to the element input watermark.

                Kenn

                On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik
                <lc...@google.com> wrote:

                    Jan, are you editing the implementation of how
                    timers work within the DirectRunner, or are you
                    trying to build support for time sorted input on
                    top of the Beam model for timers?
                    Because I think you will need to do the former.

                    On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský
                    <je...@seznam.cz> wrote:

                        Hm, that would probably work, thanks!

                        But should the timers behave like that? I'm
                        trying to fix this by introducing a sequence
                        of watermarks

                         input watermark -> timer watermark ->
                        output watermark

                        as suggested in the JIRA, and it actually
                        seems to be working as expected. It even
                        cleans up some code paths, but I'm debugging
                        some strange behavior this exposed:
                        `WatermarkHold.watermarkHoldTagForTimestampCombiner`
                        seems to have stopped clearing itself after
                        this change, and some Pipelines therefore
                        stopped working. I'm a little lost as to why
                        this happened. I can push the code I have if
                        anyone is interested.

                        Jan

                        On 6/10/19 5:32 PM, Lukasz Cwik wrote:

                            We hit an instance of this problem before
                            and solved it by rescheduling the GC timer
                            if there was a conflicting timer that was
                            also meant to fire.

                            On Mon, Jun 10, 2019 at 8:17 AM Jan
                            Lukavský <je...@seznam.cz> wrote:

                                For a single key. I'm running into a
                                collision between timerId
                                `__StatefulParDoGcTimerId`
                                (StatefulDoFnRunner) and my timerId
                                for flushing sorted elements in the
                                implementation of
                                @RequiresTimeSortedInput. The timers
                                are being swapped at the end of input
                                (but it can happen anywhere near the
                                end of a window), which results in
                                state being cleared before it gets
                                flushed, which means data loss.

                                 Jan

                                On 6/10/19 5:08 PM, Reuven Lax wrote:

                                    Do you mean for a single key or
                                    across keys?

                                    On Mon, Jun 10, 2019, 5:11 AM Jan
                                    Lukavský <je...@seznam.cz> wrote:

                                        Hi,

                                        I have come across issue [1],
                                        and I'm not sure how to solve
                                        it in the most elegant way.

                                        Any suggestions?

                                        Thanks,

                                          Jan

                                        [1] https://issues.apache.org/jira/browse/BEAM-7520


