Reading your Jira, I believe this problem will manifest even without the
interaction of user timers and GC. Interesting case. It hinges on whether a
runner makes a timer available, or fires it, before the bundle is
committed.

I have commented elsewhere about this part, quoting the Jira:

> have experimented with this a little and have not yet figured out what
> the correct solution should be. What I tried:
> 1) hold input watermark for min(setup timers)
> 2) fire timers based not on input watermark, but on output watermark
> (output watermark is held by min timer stamp)

Neither of these quite works. What we need is a separate "element input
watermark" and "timer input watermark". The overall input watermark that
drives GC is the min of these. The output watermark is also held to this
overall input watermark. User timers fire according to the element input
watermark.
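
To make the proposal above concrete, here is a rough per-key sketch in Java.
It is only an illustration under stated assumptions, not code from any
runner: the class `SplitInputWatermarks` and all of its method names are
hypothetical, timestamps are plain `long` millis, and I assume the timer
input watermark is held strictly below the earliest unfired user timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical per-key bookkeeping; these are NOT Beam's actual classes. */
final class SplitInputWatermarks {
  private long elementInputWatermark = Long.MIN_VALUE;
  // Unfired user timers, earliest first.
  private final PriorityQueue<Long> pendingUserTimers = new PriorityQueue<>();

  void setUserTimer(long timestamp) {
    pendingUserTimers.add(timestamp);
  }

  /** Watermarks only move forward. */
  void advanceElementInputWatermark(long newValue) {
    elementInputWatermark = Math.max(elementInputWatermark, newValue);
  }

  /** Assumption: held strictly below the earliest unfired user timer. */
  long timerInputWatermark() {
    Long earliest = pendingUserTimers.peek();
    return earliest == null
        ? elementInputWatermark
        : Math.min(elementInputWatermark, earliest - 1);
  }

  /** The overall input watermark that drives GC: min of the two. */
  long overallInputWatermark() {
    return Math.min(elementInputWatermark, timerInputWatermark());
  }

  /** The output watermark is also held to the overall input watermark. */
  long outputWatermarkUpperBound() {
    return overallInputWatermark();
  }

  /** User timers fire according to the element input watermark. */
  List<Long> fireEligibleUserTimers() {
    List<Long> fired = new ArrayList<>();
    while (!pendingUserTimers.isEmpty()
        && pendingUserTimers.peek() <= elementInputWatermark) {
      fired.add(pendingUserTimers.poll());
    }
    return fired;
  }

  /** GC may run only once the overall input watermark reaches the GC time. */
  boolean gcMayRun(long gcTime) {
    return overallInputWatermark() >= gcTime;
  }
}
```

With a user timer and the GC time at the same timestamp, `gcMayRun` stays
false until `fireEligibleUserTimers` has drained that timer, so in this
sketch state cannot be cleared before the user timer has had its chance
to flush.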

Kenn

On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <[email protected]> wrote:

> Jan, are you editing the implementation of how timers work within the
> DirectRunner, or are you trying to build support for time-sorted input on
> top of the Beam model for timers?
> Because I think you will need to do the former.
>
> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <[email protected]> wrote:
>
>> Hm, that would probably work, thanks!
>>
>> But, should the timers behave like that? I'm trying to fix this by
>> introducing a sequence of watermarks
>>
>>  inputs watermark -> timer watermark -> output watermark
>>
>> as suggested in the JIRA, and it actually seems to be working as
>> expected. It even cleans some code paths, but I'm debugging some strange
>> behavior this exposed -
>> `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have stopped
>> clearing itself after this change and some Pipelines therefore stopped
>> working. I'm a little lost as to why this happened. I can push the code
>> I have if anyone is interested.
>>
>> Jan
>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>
>> We hit an instance of this problem before and solved it by rescheduling the
>> GC timer again if there was a conflicting timer that was also meant to fire.
>>
>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <[email protected]> wrote:
>>
>>> For a single key. I'm running into a collision between timerId
>>> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for flushing
>>> sorted elements in the implementation of @RequiresTimeSortedInput. The timers
>>> are being swapped at the end of input (but it can happen anywhere near end
>>> of window), which results in state being cleared before it gets flushed,
>>> which means data loss.
>>>
>>>  Jan
>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>
>>> Do you mean for a single key or across keys?
>>>
>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have come across issue [1], and I'm not sure how to solve it in
>>>> the most elegant way.
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks,
>>>>
>>>>   Jan
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
>>>>
>>>>
