Jan, I do believe that BEAM-2535 is related since the input time holds the
input watermark and will allow people to set timers which will fire in the
order that they want. This would let users say "fire at X, but I will only
create a new timer at X+Y", which would allow the input watermark to
advance to X+Y, allowing multiple timers to fire.
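
For readers following the quoted thread below, the ordering problem Jan
restates (timers A at T1 and B at T3, with A setting a new timer for T2) can
be sketched with a toy simulation. This is only an illustration under stated
assumptions: `fire_in_bundle`, `fire_lowest_first` and `on_timer` are
hypothetical names, timestamps are plain integers, and none of this is Beam or
DirectRunner code.

```python
import heapq

T0, T1, T2, T3 = 0, 1, 2, 3


def on_timer(ts, name):
    # Hypothetical callback: when timer A fires at T1 it sets a follow-up
    # timer for T2. Every other timer sets nothing.
    return [(T2, "A2")] if (ts, name) == (T1, "A") else []


def fire_in_bundle(timers, watermark, callback):
    # Collect every timer eligible at `watermark` into one bundle and fire
    # the whole bundle; timers set while firing wait for the next bundle.
    pending = list(timers)
    fired = []
    while True:
        bundle = sorted(t for t in pending if t[0] <= watermark)
        if not bundle:
            return fired
        pending = [t for t in pending if t[0] > watermark]
        for ts, name in bundle:
            fired.append((ts, name))
            pending.extend(callback(ts, name))


def fire_lowest_first(timers, watermark, callback):
    # Fire only the timer with the lowest timestamp, then look again, so a
    # newly set timer can still be fired in timestamp order.
    heap = list(timers)
    heapq.heapify(heap)
    fired = []
    while heap and heap[0][0] <= watermark:
        ts, name = heapq.heappop(heap)
        fired.append((ts, name))
        for new_timer in callback(ts, name):
            heapq.heappush(heap, new_timer)
    return fired


timers = [(T1, "A"), (T3, "B")]
bundled = fire_in_bundle(timers, T3, on_timer)
ordered = fire_lowest_first(timers, T3, on_timer)
print(bundled)  # [(1, 'A'), (3, 'B'), (2, 'A2')]
print(ordered)  # [(1, 'A'), (2, 'A2'), (3, 'B')]
```

In the bundled variant the T2 timer fires after the T3 timer, which is the
out-of-order firing discussed in the thread; the lowest-first variant keeps
timestamp order at the cost of no longer firing several timers in one bundle.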

On Thu, Jun 20, 2019 at 1:45 PM Jan Lukavský <je...@seznam.cz> wrote:

> I'm not sure if I understand it in detail, but I'd say that it is related.
> Both issues probably have a common cause. In the scenario where timers are
> fired in a bundle, the issue is that when the watermark moves from time T0
> to T3 (from the last example), there is actually _no time between T0 and
> T3_, because all times in between are effectively "collapsed" into a
> single instant. The "timer watermark" specified in [BEAM-2535] is something
> I also experimented with at first (in DirectRunner), but it then turned
> out that it is equivalent to firing only timers for the lowest timestamp.
> On 6/20/19 9:52 PM, Reuven Lax wrote:
>
> I think BEAM-2535 is independent.
>
> On Thu, Jun 20, 2019 at 9:47 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Does BEAM-2535 provide more context?
>>
>> On Thu, Jun 20, 2019 at 12:44 PM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Thu, Jun 20, 2019 at 9:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>>
>>>> On 6/20/19 9:30 PM, Reuven Lax wrote:
>>>>
>>>>
>>>>
>>>> On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> > But that is exactly how time advances. Watermarks often don't move
>>>>> smoothly, as a single old element can hold up the watermark. When that
>>>>> element is finished, the watermark can jump forward in time, triggering
>>>>> many timers.
>>>>>
>>>>> Sure. Absolutely agree. But the move from time T1 to T2 can be viewed
>>>>> as a discrete jump or as a smooth move, so that when you fire a timer,
>>>>> any internal timings are set to the actual timestamp of the timer. I
>>>>> believe that is how Flink works. And this might be related to the fact
>>>>> that Flink lacks the concept of bundles.
>>>>>
>>>>> > I'm not sure how this breaks that invariant. The input watermark has
>>>>> only moved forward, as should be true of the output watermark. The output
>>>>> watermark is held up by watermark holds in the step, which usually means
>>>>> that the output watermark is already being held to the earliest pending
>>>>> timer.
>>>>>
>>>>> The problem was stated at the beginning of this thread. I can restate
>>>>> it:
>>>>>
>>>>> - let's have four times - T0 < T1 < T2 < T3
>>>>>
>>>>> - let's have two timers A and B, set for times T1 and T3, respectively
>>>>>
>>>>> - the watermark moves time from T0 to T3
>>>>>
>>>>> - that move fires both timers A and B (in this order), *but* timer A
>>>>> is free to set more timers; let's suppose it sets a timer for T2
>>>>>
>>>>> - the second instance of timer A (set for T2) will fire *after* timer
>>>>> B (set for T3), breaking the time invariant
>>>>>
>>>> Ah, by time invariant you mean the in-order firing of timers?
>>>>
>>>> Yes, sorry, I meant "time monotonicity invariant with relation to
>>>> timers". Basically that timers should be fired in timestamp order, because
>>>> otherwise it might cause unpredictable results.
>>>>
>>>
>>> I think there were similar issues with resetting timers: you reset a
>>> timer to a different timestamp, but a firing of that timer is already in
>>> the bundle at the old timestamp. I believe that either choice (modify the
>>> bundle or allow the timer to fire) can lead to consistency problems. Kenn
>>> might remember the details here.
>>>
>>>
>>>
>>>>
>>>>
>>>>> Jan
>>>>> On 6/20/19 8:43 PM, Reuven Lax wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> Hi Reuven,
>>>>>>
>>>>>> > I would be cautious changing this. Being able to put multiple
>>>>>> timers in the same bundle saves a lot, and if we force them to all run
>>>>>> separately through ReduceFnRunner we risk regressing performance of some
>>>>>> pipelines.
>>>>>>
>>>>>> I understand your point. The issue here is that the current behavior
>>>>>> is at least ... unexpected. There might be a different conceptual
>>>>>> approach to that:
>>>>>>
>>>>>>  a) if a bundle contains timers for several distinct timestamps (say
>>>>>> T1 and T2), then it implies that timer T1 is effectively not fired at
>>>>>> time T1, but at time T2. That is because, logically, the time hopped
>>>>>> discretely from some previous time T0 to T2 without any "stopping by".
>>>>>> Hence, it should be invalid to set up a timer for any time lower than T2.
>>>>>>
>>>>>
>>>>> But that is exactly how time advances. Watermarks often don't move
>>>>> smoothly, as a single old element can hold up the watermark. When that
>>>>> element is finished, the watermark can jump forward in time, triggering
>>>>> many timers.
>>>>>
>>>>>
>>>>>> b) the time will move smoothly (or, millisecond precision smoothly),
>>>>>> but that implies that there cannot be multiple distinct timers inside a
>>>>>> single bundle.
>>>>>>
>>>>>> If we don't want to take path b), we are probably left with path a)
>>>>>> (as doing nothing seems weird, because it breaks one invariant, that time
>>>>>> can only move forward).
>>>>>>
>>>>> I'm not sure how this breaks that invariant. The input watermark has
>>>>> only moved forward, as should be true of the output watermark. The output
>>>>> watermark is held up by watermark holds in the step, which usually means
>>>>> that the output watermark is already being held to the earliest pending
>>>>> timer.
>>>>>
>>>>>
>>>>>> Option a) can be done - we might add something like
>>>>>> `getInputWatermark()` and `getOutputWatermark()` to
>>>>>> `DoFn.OnTimerContext`, and throw an exception when the user tries to set
>>>>>> up a timer for a time before the input watermark. Effectively, that way
>>>>>> we will let the user know that their timer was set to time T1, but was
>>>>>> fired at T2. But that seems to be a breaking change, unfortunately.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Jan
>>>>>> On 6/20/19 5:29 PM, Reuven Lax wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> this problem seems to be harder than I thought. I have somewhat
>>>>>>> working code in [1], but some tests are still failing (now tests for
>>>>>>> ReduceFnRunner), and I'm not sure whether the problem is in the tests,
>>>>>>> so that my current behavior is actually correct. Let me explain the
>>>>>>> problem:
>>>>>>>
>>>>>>>  - let's have a fixed window with allowed lateness of 1 ms
>>>>>>>
>>>>>>>  - let's add two elements into the window (on time), no late elements
>>>>>>>
>>>>>>>  - now, ReduceFnRunner with the default trigger will set *two* timers -
>>>>>>> one for window.maxTimestamp() and a second for window.maxTimestamp() +
>>>>>>> allowedLateness
>>>>>>>
>>>>>>>  - the previous implementation fired *both* timers at once (within a
>>>>>>> single call to ReduceFnRunner#onTimers), but now it fires twice - once
>>>>>>> for the first timer and a second time for the other
>>>>>>>
>>>>>>
>>>>>> I would be cautious changing this. Being able to put multiple timers
>>>>>> in the same bundle saves a lot, and if we force them to all run separately
>>>>>> through ReduceFnRunner we risk regressing performance of some pipelines.
>>>>>>
>>>>>>
>>>>>>>  - the result of this is that although in both cases only a single
>>>>>>> pane is emitted, in my branch the fired pane doesn't have the `isLast`
>>>>>>> flag set (that is because the window is not yet garbage collected - it
>>>>>>> is waiting for late data - but the second time it is not fired, because
>>>>>>> no late data arrived)
>>>>>>>
>>>>>>> Would anyone know what is actually the correct behavior regarding
>>>>>>> the PaneInfo.isLast? I suppose there are only two options - either two
>>>>>>> panes can come with the isLast flag (both end-of-window and late), or it
>>>>>>> might be possible that no pane will be marked with this flag (because no
>>>>>>> late pane is fired).
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>>  [1] https://github.com/apache/beam/pull/8815
>>>>>>>
>>>>>>>
>>>>>>> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>>>>>>>
>>>>>>> It seems to me that watermark hold cannot change it (currently),
>>>>>>> because in the current implementation timers fire according to the input
>>>>>>> watermark, but watermark holds apply to the output watermark. Unless I
>>>>>>> missed something.
>>>>>>>
>>>>>>> On Jun 10, 2019 at 18:15, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>> I see. Is there a missing watermark hold for timers less than T2?
>>>>>>>
>>>>>>> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Yes, there is no difference between GC and user timers in this case.
>>>>>>> I think the problem is simply that when the watermark moves from time T1
>>>>>>> to T2, DirectRunner fires all timers that fire until T2, but that can
>>>>>>> create new timers for times between T1 and T2, and these will be fired
>>>>>>> later, although they should have been fired before T2.
>>>>>>>
>>>>>>> Jan
>>>>>>> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>>>>>>>
>>>>>>> Reading your Jira, I believe this problem will manifest without the
>>>>>>> interaction of user timers and GC. Interesting case. It surrounds
>>>>>>> whether a runner makes a timer available or fires it prior to the bundle
>>>>>>> being committed.
>>>>>>>
>>>>>>> I have commented elsewhere about this part, quoting the Jira:
>>>>>>>
>>>>>>> > have experimented with this a little and have not yet figured out
>>>>>>> what the correct solution should be. What I tried:
>>>>>>> > 1) hold input watermark for min(setup timers)
>>>>>>> > 2) fire timers based not on input watermark, but on output
>>>>>>> watermark (output watermark is held by min timer stamp)
>>>>>>>
>>>>>>> Neither of these quite works. What we need is a separate "element
>>>>>>> input watermark" and "timer input watermark". The overall input
>>>>>>> watermark that drives GC is the min of these. The output watermark is
>>>>>>> also held to this overall input watermark. User timers fire according to
>>>>>>> the element input watermark.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Jan, are you editing the implementation of how timers work within the
>>>>>>> DirectRunner, or are you trying to build support for time sorted input
>>>>>>> on top of the Beam model for timers?
>>>>>>> Because I think you will need to do the former.
>>>>>>>
>>>>>>> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hm, that would probably work, thanks!
>>>>>>>
>>>>>>> But, should the timers behave like that? I'm trying to fix this by
>>>>>>> introducing a sequence of watermarks
>>>>>>>
>>>>>>>  input watermark -> timer watermark -> output watermark
>>>>>>>
>>>>>>> as suggested in the JIRA, and it actually seems to be working as
>>>>>>> expected. It even cleans up some code paths, but I'm debugging some
>>>>>>> strange behavior this exposed:
>>>>>>> `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have
>>>>>>> stopped clearing itself after this change, and some pipelines therefore
>>>>>>> stopped working. I'm a little lost as to why this happened. I can push
>>>>>>> the code I have if anyone is interested.
>>>>>>>
>>>>>>> Jan
>>>>>>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>>>>>>
>>>>>>> We hit an instance of this problem before and solved it by rescheduling
>>>>>>> the GC timer again if there was a conflicting timer that was also meant
>>>>>>> to fire.
>>>>>>>
>>>>>>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <je...@seznam.cz>
>>>>>>> wrote:
>>>>>>>
>>>>>>> For a single key. I'm running into a collision between timerId
>>>>>>> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for
>>>>>>> flushing sorted elements in the implementation of
>>>>>>> @RequiresTimeSortedInput. The timers are being swapped at the end of
>>>>>>> input (but it can happen anywhere near the end of a window), which results
>>>>>>> in state being cleared before it gets flushed, which means data loss.
>>>>>>>
>>>>>>>  Jan
>>>>>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>>>>>
>>>>>>> Do you mean for a single key or across keys?
>>>>>>>
>>>>>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have come across issue [1], and I'm not sure how to solve it in the
>>>>>>> most elegant way.
>>>>>>>
>>>>>>> Any suggestions?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>>   Jan
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
>>>>>>>
>>>>>>>
>>>>>>>
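
Option a) from the thread above (expose the input watermark in the timer
callback and reject timers set before it) can be sketched as follows. This is
a toy model only: `ToyTimerContext` and `set_timer` are hypothetical names
chosen for illustration, timestamps are plain integers, and this is not the
`DoFn.OnTimerContext` API.

```python
class ToyTimerContext:
    """Hypothetical stand-in for a timer callback context; not a Beam class."""

    def __init__(self, input_watermark):
        # The watermark the runner has already advanced to when the timer
        # callback runs (T2 or T3 in the thread's examples).
        self.input_watermark = input_watermark
        self.timers = []

    def get_input_watermark(self):
        return self.input_watermark

    def set_timer(self, timestamp):
        # Assumption from option a): a timer for a time that the watermark
        # has already passed is an error rather than a late firing.
        if timestamp < self.input_watermark:
            raise ValueError(
                f"timer for {timestamp} is before input watermark "
                f"{self.input_watermark}"
            )
        self.timers.append(timestamp)


# Timer A was set for T1=1 but is fired once the watermark is at T3=3.
ctx = ToyTimerContext(input_watermark=3)
ctx.set_timer(5)  # accepted: not before the watermark

try:
    ctx.set_timer(2)  # T2 lies in the interval the watermark jumped over
    rejected = False
except ValueError:
    rejected = True

print(ctx.timers, rejected)  # [5] True
```

As noted in the thread, failing loudly like this would be a breaking change
for existing pipelines that set such timers today.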
