Hi,

this problem seems to be harder than I thought. I have somewhat working code in [1], but some tests are still failing (currently the tests for ReduceFnRunner), and I'm not sure whether the problem is actually in the tests, in which case my current behavior would be correct. Let me explain the problem:

 - let's have a fixed window with allowed lateness of 1 ms

 - let's add two elements into the window (on time), no late elements

 - now, ReduceFnRunner with the default trigger will set *two* timers - one for window.maxTimestamp() and a second for window.maxTimestamp() + allowedLateness

 - the previous implementation fired *both* timers at once (within a single call to ReduceFnRunner#onTimers), but now it fires twice - once for the first timer and once for the other

 - the result is that although only a single pane is emitted in both cases, in my branch the fired pane doesn't have the `isLast` flag set (that is because the window is not yet garbage collected when the first timer fires - it is still waiting for late data - and the second timer emits no pane, because no late data arrived)
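
The difference described in the list above can be sketched with a toy model. This is only an illustration in Python, not Beam's ReduceFnRunner: `ToyWindow`, `Pane`, and `on_timers` are hypothetical names, the timestamps 9 and 10 are made up, and the model assumes that an empty final pane is never emitted.

```python
from dataclasses import dataclass


@dataclass
class Pane:
    elements: list
    is_last: bool


class ToyWindow:
    """Toy stand-in for one window's state (hypothetical, not Beam code)."""

    def __init__(self, max_timestamp, allowed_lateness):
        self.gc_time = max_timestamp + allowed_lateness
        self.buffer = []
        self.closed = False

    def add(self, element):
        self.buffer.append(element)

    def on_timers(self, timestamps):
        """Handle all timers delivered in one call; return the emitted panes."""
        if self.closed:
            return []
        # The window is final only if this call includes the GC timer.
        is_gc = max(timestamps) >= self.gc_time
        panes = []
        if self.buffer:
            panes.append(Pane(list(self.buffer), is_last=is_gc))
            self.buffer.clear()
        if is_gc:
            self.closed = True
        return panes


def run(batches):
    window = ToyWindow(max_timestamp=9, allowed_lateness=1)
    window.add("a")
    window.add("b")
    emitted = []
    for batch in batches:
        emitted.extend(window.on_timers(batch))
    return emitted


together = run([[9, 10]])    # both timers in a single call
separate = run([[9], [10]])  # one call per timer
```

In this model `together` holds one pane with `is_last` set, while `separate` holds one pane without it, because the GC call finds nothing left to emit.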

Would anyone know what the correct behavior regarding PaneInfo.isLast actually is? I suppose there are only two options - either two panes can come with the isLast flag (both end-of-window and late), or it might be possible that no pane will be marked with this flag (because no late pane is fired).

Jan

 [1] https://github.com/apache/beam/pull/8815


On 6/10/19 6:26 PM, Jan Lukavský wrote:
It seems to me that watermark hold cannot change it (currently), because in the current implementation timers fire according to input watermark, but watermark holds apply to output watermark. If I didn't miss anything.

On 10. 6. 2019 at 18:15, Lukasz Cwik <lc...@google.com> wrote:

    I see. Is there a missing watermark hold for timers less than T2?

    On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        Yes, there is no difference between GC and user timers in this
        case. I think the problem is simply that when the watermark moves
        from time T1 to T2, DirectRunner fires all timers that are due
        up to T2, but firing them can create new timers for times between
        T1 and T2, and these will be fired later, although they should
        have been fired before T2.
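
The ordering problem described above can be sketched as a small simulation. This is a hedged illustration in Python, not DirectRunner code: `fire_snapshot`, `fire_ordered`, `callbacks`, and the timer names and timestamps are all hypothetical. The first function fires a snapshot of every timer due at T2 and only afterwards looks at timers created in the meantime; the second always fires the earliest pending timer first.

```python
import heapq


def callbacks(ts, name):
    # Hypothetical user logic: the "flush" timer sets one more timer at time 7.
    return [(7, "flush-again")] if name == "flush" else []


def fire_snapshot(timers, t2):
    """Fire everything due at t2 as one snapshot; new timers wait for the next pass."""
    fired, pending = [], list(timers)
    while True:
        eligible = sorted(t for t in pending if t[0] <= t2)
        if not eligible:
            return fired
        pending = [t for t in pending if t[0] > t2]
        for ts, name in eligible:
            fired.append(name)
            pending.extend(callbacks(ts, name))


def fire_ordered(timers, t2):
    """Always fire the earliest pending timer, including newly created ones."""
    fired, heap = [], list(timers)
    heapq.heapify(heap)
    while heap and heap[0][0] <= t2:
        ts, name = heapq.heappop(heap)
        fired.append(name)
        for new_timer in callbacks(ts, name):
            heapq.heappush(heap, new_timer)
    return fired


timers = [(5, "flush"), (10, "gc")]
snapshot_order = fire_snapshot(timers, t2=10)
timestamp_order = fire_ordered(timers, t2=10)
```

With the snapshot strategy the timer set for time 7 fires after the one at time 10, which is the swap that lets cleanup run before a flush; the heap-based variant keeps timestamp order.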

        Jan

        On 6/10/19 5:48 PM, Kenneth Knowles wrote:

            Reading your Jira, I believe this problem will manifest
            without the interaction of user timers and GC. Interesting
            case. It revolves around whether a runner makes a timer
            available or fires it prior to the bundle being committed.

            I have commented elsewhere about this part, quoting the Jira:

            > have experimented with this a little and have not yet
            > figured out what the correct solution should be. What I tried:
            > 1) hold input watermark for min(setup timers)
            > 2) fire timers based not on input watermark, but on output
            > watermark (output watermark is held by min timer stamp)

            Neither of these quite works. What we need is a separate
            "element input watermark" and "timer input watermark". The
            overall input watermark that drives GC is the min of
            these. The output watermark is also held to this overall
            input watermark. User timers fire according to the element
            input watermark.
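
The relationship between these watermarks can be sketched as follows. This is a minimal Python sketch under stated assumptions, not Beam's implementation: `ToyWatermarks` and its members are hypothetical names, and in particular the rule that the timer input watermark is held back by the earliest pending timer is an assumption made only for this illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyWatermarks:
    """Hypothetical model of the proposed watermarks for one transform."""

    element_input: int
    pending_timers: list = field(default_factory=list)

    @property
    def timer_input(self):
        # Assumption: held back by the earliest timer that has not fired yet.
        return min(self.pending_timers, default=self.element_input)

    @property
    def overall_input(self):
        # The overall input watermark is the min of the two; it drives GC.
        return min(self.element_input, self.timer_input)

    def output(self, hold=None):
        # The output watermark is held to the overall input watermark.
        if hold is None:
            return self.overall_input
        return min(self.overall_input, hold)

    def ready_user_timers(self):
        # User timers fire according to the element input watermark.
        return sorted(t for t in self.pending_timers if t <= self.element_input)

    def gc_allowed(self, gc_time):
        return self.overall_input >= gc_time


wm = ToyWatermarks(element_input=10, pending_timers=[7])
before = (wm.overall_input, wm.ready_user_timers(), wm.gc_allowed(10))
wm.pending_timers.remove(7)  # the user timer at 7 has now fired
after = (wm.overall_input, wm.ready_user_timers(), wm.gc_allowed(10))
```

In this model a pending user timer at 7 keeps the overall input watermark at 7, so GC for time 10 has to wait until that timer has fired.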

            Kenn

            On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik
            <lc...@google.com <mailto:lc...@google.com>> wrote:

                Jan, are you editing the implementation of how timers
                work within the DirectRunner, or are you trying to build
                support for time sorted input on top of the Beam model
                for timers?
                Because I think you will need to do the former.

                On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    Hm, that would probably work, thanks!

                    But should the timers behave like that? I'm
                    trying to fix this by introducing a sequence of
                    watermarks

                     input watermark -> timer watermark -> output
                    watermark

                    as suggested in the JIRA, and it actually seems to
                    be working as expected. It even cleans up some code
                    paths, but I'm debugging some strange behavior
                    this exposed -
                    `WatermarkHold.watermarkHoldTagForTimestampCombiner`
                    seems to have stopped clearing itself after this
                    change and some pipelines therefore stopped
                    working. I'm a little lost as to why this happened.
                    I can push the code I have if anyone is interested.

                    Jan

                    On 6/10/19 5:32 PM, Lukasz Cwik wrote:

                        We hit an instance of this problem before and
                        solved it by rescheduling the GC timer if
                        there was a conflicting timer that was also
                        meant to fire.

                        On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský
                        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                            For a single key. I'm running into a
                            conflict between the timer with timerId
                            `__StatefulParDoGcTimerId`
                            (StatefulDoFnRunner) and my timer for
                            flushing sorted elements in the
                            implementation of @RequiresTimeSortedInput.
                            The two timers fire in swapped order at the
                            end of input (but it can happen anywhere
                            near the end of a window), which results in
                            state being cleared before it gets flushed,
                            which means data loss.

                             Jan

                            On 6/10/19 5:08 PM, Reuven Lax wrote:

                                Do you mean for a single key or across
                                keys?

                                On Mon, Jun 10, 2019, 5:11 AM Jan
                                Lukavský <je...@seznam.cz
                                <mailto:je...@seznam.cz>> wrote:

                                    Hi,

                                    I have come across issue [1], and
                                    I'm not sure how to solve it in the
                                    most elegant way.

                                    Any suggestions?

                                    Thanks,

                                      Jan

                                    [1] https://issues.apache.org/jira/browse/BEAM-7520


