I would think that the other runners actually fire the timers correctly (at least FlinkRunner does, I think). IMHO the problem is only in DirectRunner.

Here is a draft PR [1]. As I said, I'm hunting a bug that causes tests not to finish, because the watermark is stuck due to a watermark hold in GroupAlsoByWindowEvaluator (via ReduceFnRunner).

Jan

[1] https://github.com/apache/beam/pull/8807

On 6/10/19 5:43 PM, Lukasz Cwik wrote:
Jan, are you editing the implementation of how timers work within the DirectRunner, or are you trying to build support for time-sorted input on top of the Beam model for timers?
Because I think you will need to do the former.

On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hm, that would probably work, thanks!

    But should the timers behave like that? I'm trying to fix this by
    introducing a sequence of watermarks

     input watermark -> timer watermark -> output watermark

    as suggested in the JIRA, and it actually seems to be working as
    expected. It even cleans up some code paths, but I'm debugging some
    strange behavior that this exposed:
    `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have
    stopped clearing itself after this change, and some Pipelines
    therefore stopped working. I'm a little lost as to why this happened.
    I can push the code I have if anyone is interested.

    Jan
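
Editorial sketch: one possible reading of the watermark chain described above, written as a toy Python function and not as Beam code. It assumes (this is not stated in the thread) that the timer watermark cannot advance past the earliest unfired timer and that the output watermark cannot advance past the earliest hold. The function name and parameters are hypothetical.

```python
# Toy model only; none of these names come from Beam.
def watermarks(input_wm, pending_timers, holds):
    """Return (timer_wm, output_wm) for one step of the assumed chain.

    input_wm       -- current input watermark (an integer timestamp)
    pending_timers -- timestamps of timers that have not fired yet
    holds          -- timestamps of active watermark holds
    """
    # Assumption: the timer watermark trails the input watermark
    # until every timer at or before it has fired.
    timer_wm = min([input_wm] + list(pending_timers))
    # Assumption: the output watermark trails the timer watermark
    # and is further held back by any hold that was never cleared.
    output_wm = min([timer_wm] + list(holds))
    return timer_wm, output_wm


no_obstacles = watermarks(100, [], [])
unfired_timer = watermarks(100, [40], [])
stale_hold = watermarks(100, [], [25])
```

Under these assumptions, a hold that is never cleared (the `stale_hold` case) keeps the output watermark at 25 no matter how far the input advances, which would look like the stuck pipelines mentioned above.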

    On 6/10/19 5:32 PM, Lukasz Cwik wrote:
    We hit an instance of this problem before and solved it by
    rescheduling the GC timer if there was a conflicting timer
    that was also meant to fire.
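
Editorial sketch: the rescheduling idea above, modelled with a plain Python priority queue. This is not how any Beam runner is implemented; the queue layout, the `fire_all` helper and the `sortFlushTimer` id are hypothetical, and only the GC timer id is taken from the thread.

```python
import heapq
from itertools import count

GC_TIMER = "__StatefulParDoGcTimerId"  # timer id quoted in the thread
FLUSH_TIMER = "sortFlushTimer"         # hypothetical id for the user timer


def fire_all(timers, buffered):
    """Fire (timestamp, timer_id) pairs in order; return flushed elements."""
    seq = count()
    # Ties on timestamp are broken by insertion order (the seq number).
    queue = [(ts, next(seq), tid) for ts, tid in timers]
    heapq.heapify(queue)
    state = list(buffered)  # per-key buffer of (timestamp, value) pairs
    emitted = []
    while queue:
        ts, _, tid = heapq.heappop(queue)
        if tid == GC_TIMER:
            # If another timer is still due at or before the GC time,
            # put the GC timer back so that it fires after that timer.
            if any(other_ts <= ts for other_ts, _, _ in queue):
                heapq.heappush(queue, (ts, next(seq), tid))
                continue
            state.clear()  # safe now: nothing else is waiting on the state
        else:
            emitted.extend(sorted(state))  # flush in timestamp order
            state.clear()
    return emitted


# The GC timer is registered first, yet the flush still sees the data.
result = fire_all([(10, GC_TIMER), (10, FLUSH_TIMER)],
                  [(3, "c"), (1, "a"), (2, "b")])
```

The re-queued GC timer gets a higher sequence number, so every conflicting timer with the same or an earlier timestamp is popped before it and the loop terminates.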

    On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <je...@seznam.cz> wrote:

        For a single key. I'm hitting a collision between timerId
        `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my
        timerId for flushing sorted elements in the implementation of
        @RequiresTimeSortedInput. The timers are being swapped at the
        end of input (but it can happen anywhere near the end of a
        window), which results in state being cleared before it gets
        flushed, which means data loss.

         Jan
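
Editorial sketch: a toy reproduction of the ordering problem described above, in plain Python and without any Beam API. The `run` helper and the `sortFlushTimer` id are hypothetical; only the GC timer id is taken from the message.

```python
# Toy model only: two timers share one per-key buffer.
GC_TIMER = "__StatefulParDoGcTimerId"  # timer id quoted in the thread
FLUSH_TIMER = "sortFlushTimer"         # hypothetical id for the flush timer


def run(firing_order, buffered):
    """Fire the timers in the given order and return what was emitted."""
    state = list(buffered)  # buffered (timestamp, value) pairs for one key
    emitted = []
    for timer_id in firing_order:
        if timer_id == FLUSH_TIMER:
            emitted.extend(sorted(state))  # emit elements in time order
            state.clear()
        elif timer_id == GC_TIMER:
            state.clear()                  # drop the window's state
    return emitted


buffered = [(3, "c"), (1, "a"), (2, "b")]
in_order = run([FLUSH_TIMER, GC_TIMER], buffered)  # all three elements
swapped = run([GC_TIMER, FLUSH_TIMER], buffered)   # nothing is emitted
```

When the GC timer fires first, the flush timer finds an empty buffer and emits nothing, which is the data loss described in the message.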

        On 6/10/19 5:08 PM, Reuven Lax wrote:
        Do you mean for a single key or across keys?

        On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <je...@seznam.cz> wrote:

            Hi,

            I have come across issue [1], and I'm not sure how to
            solve it in the most elegant way.

            Any suggestions?

            Thanks,

              Jan

            [1] https://issues.apache.org/jira/browse/BEAM-7520
