On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský <je...@seznam.cz> wrote:

> > But that is exactly how time advances. Watermarks often don't move
> smoothly, as a single old element can hold up the watermark. When that
> element is finished, the watermark can jump forward in time, triggering
> many timers.
>
> Sure. Absolutely agree. But the move from time T1 to T2 can be viewed as
> discrete jump, or smooth move, so that when you fire timer, any internal
> timings are set to the actual timestamp of the timer. I believe that is how
> flink works. And this might be related to the fact that Flink lacks concept
> of bundles.
>
> > I'm not sure how this breaks that invariant. The input watermark has
> only moved forward, as should be true fo the output watermark. The output
> watermark is help up by watermark holds in the step, which usually means
> that the output watermark is already being help to the earliest pending
> timer.
>
> The problem was stated at the beginning of this thread. I can restate it:
>
> - let's have four times - T0 < T1 < T2 < T3
>
> - let's have a two timers A and B, set for time T1 and T3, respectively
>
> - watermark moves time from T0 to T3
>
> - that move fires both timers A and B (in this order), *but* timer A is
> free to set more timers, let's suppose it sets timer for T2
>
> - the second instance of timer A (set for T2) will fire *after* timer B
> (set for T3), breaking time invariant
>
Ah, by time invariant you mean the in-order firing of timers?


> Jan
> On 6/20/19 8:43 PM, Reuven Lax wrote:
>
>
>
> On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> > I would be cautious changing this. Being able to put multiple timers in
>> the same bundle saves a lot, and if we force them to all run separate
>> through ReduceFnRunner we risk regressing performance of some pipelines.
>>
>> I understand your point. The issue here is that, the current behavior is
>> at least ... unexpected. There might be one different conceptual approach
>> to that:
>>
>>  a) if a bundle contains timers for several distinct timestamps (say T1
>> and T2), then it implies, that timer T1 is effectively not fired at time
>> T1, but at time T2 - that is due to the fact, that logically, the time
>> hopped discretely from some previous time T0 to T2 without any "stopping
>> by". Hence, it should be invalid to setup timer for any time lower than T2
>> .
>>
>
> But that is exactly how time advances. Watermarks often don't move
> smoothly, as a single old element can hold up the watermark. When that
> element is finished, the watermark can jump forward in time, triggering
> many timers.
>
>
>> b) the time will move smoothly (or, millisecond precision smoothly), but
>> that implies, that there cannot be more distinct timers inside single
>> bundle.
>>
>> If we don't want to take path b), we are probably left with path a) (as
>> doing nothing seems weird, because it breaks one invariant, that time can
>> only move forward).
>>
> I'm not sure how this breaks that invariant. The input watermark has only
> moved forward, as should be true fo the output watermark. The output
> watermark is help up by watermark holds in the step, which usually means
> that the output watermark is already being help to the earliest pending
> timer.
>
>
>> Option a) can be done - we might add something like `getInputWatermark()`
>> and `getOutputWatermark()` to `DoFn.OnTimerContext`, and throw exception
>> when user tries to setup timer for time before input watermark.
>> Effectively, that way we will let the user know, that his timer was set to
>> time T1, but was fired at T2. But, that seems to be breaking change,
>> unfortunately.
>>
>> What do you think?
>>
>> Jan
>> On 6/20/19 5:29 PM, Reuven Lax wrote:
>>
>>
>>
>> On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> this problem seems to be harder than I thought. I have a somewhat
>>> working code in [1], but there are still failing some tests (now tests for
>>> ReduceFnRunner), but I'm not sure, if the problem is not in the tests, so
>>> that my current behavior is actually correct. Let me explain the problem:
>>>
>>>  - let's have a fixed window with allowed lateness of 1 ms
>>>
>>>  - let's add two elements into the window (on time), no late elements
>>>
>>>  - now, ReduceFnRunner with default trigger will set *two* timers - one
>>> for window.maxTimestamp() and second for window.maxTimestamp() +
>>> allowedLateness
>>>
>>>  - the previous implementation fired *both* timers at once (within
>>> single call to ReduceFnRunner#onTimers), but now it fires twice - once for
>>> the first timer and second for the other
>>>
>>
>> I would be cautious changing this. Being able to put multiple timers in
>> the same bundle saves a lot, and if we force them to all run separate
>> through ReduceFnRunner we risk regressing performance of some pipelines.
>>
>>
>>>  - the result of this is that although in both cases only single pane is
>>> emitted, in my branch the fired pane doesn't have the `isLast` flag set
>>> (that is because the window is not yet garbage collected - waiting for late
>>> data - but the second time it is not fired, because no late data arrived)
>>>
>>> Would anyone know what is actually the correct behavior regarding the
>>> PaneInfo.isLast? I suppose there are only two options - either two panes
>>> can come with isLast flag (both end-of-window and late), or it might be
>>> possible, that no pane will marked with this flag (because no late pane is
>>> fired).
>>>
>>> Jan
>>>
>>>  [1] https://github.com/apache/beam/pull/8815
>>>
>>>
>>> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>>>
>>> It seems to me that watermark hold cannot change it (currently), because
>>> in the current implementation timers fire according to input watermark, but
>>> watermark holds apply to output watermark. If I didn't miss anything.
>>>
>>> Dne 10. 6. 2019 18:15 napsal uživatel Lukasz Cwik <lc...@google.com>
>>> <lc...@google.com>:
>>>
>>> I see. Is there a missing watermark hold for timers less then T2?
>>>
>>> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Yes, there is no difference between GC and user timers in this case. I
>>> think the problem is simply that when watermark moves from time T1 to T2,
>>> DirectRunner fires all timers that fire until T2, but that can create new
>>> timers for time between T1 and T2, and these will be fired later, although
>>> should have been fired before T2.
>>>
>>> Jan
>>> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>>>
>>> Reading your Jira, I believe this problem will manifest without the
>>> interaction of user timers and GC. Interesting case. It surrounds whether a
>>> runner makes a timer available or fires it prior to the bundle being
>>> committed.
>>>
>>> I have commented elsewhere about this part, quoting the Jira:
>>>
>>> > have experimented with this a little and have not yet figured out what
>>> the correct solution should be. What I tried:
>>> > 1) hold input watermark for min(setup timers)
>>> > 2) fire timers based not on input watermark, but on output watermark
>>> (output watermark is held by min timer stamp)
>>>
>>> Neither of these quite works. What we need is a separate "element input
>>> watermark" and "timer input watermark". The overall input watermark that
>>> drives GC is the min of these. The output watermark is also held to this
>>> overall input watermark. User timers fire according to the element input
>>> watermark.
>>>
>>> Kenn
>>>
>>> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>> Jan are you editing the implementation of how timers work within the
>>> DirectRunner or are trying to build support for time sorted input on top of
>>> the Beam model for timers?
>>> Because I think you will need to do the former.
>>>
>>> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Hm, that would probably work, thanks!
>>>
>>> But, should the timers behave like that? I'm trying to fix tris by
>>> introducing a sequence of watermarks
>>>
>>>  inputs watermark -> timer watermark -> output watermark
>>>
>>> as suggested in the JIRA, and it actually seems to be working as
>>> expected. It even cleans some code paths, but I'm debugging some strange
>>> behavior this exposed -
>>> `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have stopped
>>> clearing itself after this change and some Pipelines therefore stopped
>>> working. I'm little lost why this happened. I can push code I have if
>>> anyone interested.
>>>
>>> Jan
>>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>>
>>> We hit an instance of this problem before and solved it rescheduling the
>>> GC timer again if there was a conflicting timer that was also meant to fire.
>>>
>>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> For a single key. I'm getting into collision of timerId
>>> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for flushing
>>> sorted elements in implementation of @RequiresTimeSortedInput. The timers
>>> are being swapped at the end of input (but it can happen anywhere near end
>>> of window), which results in state being cleared before it gets flushed,
>>> which means data loss.
>>>
>>>  Jan
>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>
>>> Do you mean for a single key or across keys?
>>>
>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Hi,
>>>
>>> I have come across issue [1], where I'm not sure how to solve this in
>>> most elegant way.
>>>
>>> Any suggestions?
>>>
>>> Thanks,
>>>
>>>   Jan
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
>>>
>>>
>>>
>>> It seems to me that watermark hold cannot change it (currently), because in 
>>> the current implementation timers fire according to input watermark, but 
>>> watermark holds apply to output watermark. If I didn't miss anything.
>>>
>>> Dne 10. 6. 2019 18:15 napsal uživatel Lukasz Cwik <lc...@google.com> 
>>> <lc...@google.com>:
>>>
>>> I see. Is there a missing watermark hold for timers less then T2?
>>>
>>> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <je...@seznam.cz> 
>>> <je...@seznam.cz> wrote:
>>>
>>> Yes, there is no difference between GC and user timers in this case. I 
>>> think the problem is simply that when watermark moves from time T1 to T2, 
>>> DirectRunner fires all timers that fire until T2, but that can create new 
>>> timers for time between T1 and T2, and these will be fired later, although 
>>> should have been fired before T2.
>>>
>>> Jan
>>>
>>> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>>>
>>> Reading your Jira, I believe this problem will manifest without the 
>>> interaction of user timers and GC. Interesting case. It surrounds whether a 
>>> runner makes a timer available or fires it prior to the bundle being 
>>> committed.
>>>
>>> I have commented elsewhere about this part, quoting the Jira:
>>>
>>>
>>> have experimented with this a little and have not yet figured out what the 
>>> correct solution should be. What I tried:
>>> 1) hold input watermark for min(setup timers)
>>> 2) fire timers based not on input watermark, but on output watermark 
>>> (output watermark is held by min timer stamp)
>>>
>>> Neither of these quite works. What we need is a separate "element input 
>>> watermark" and "timer input watermark". The overall input watermark that 
>>> drives GC is the min of these. The output watermark is also held to this 
>>> overall input watermark. User timers fire according to the element input 
>>> watermark.
>>>
>>> Kenn
>>>
>>> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <lc...@google.com> 
>>> <lc...@google.com> wrote:
>>>
>>> Jan are you editing the implementation of how timers work within the 
>>> DirectRunner or are trying to build support for time sorted input on top of 
>>> the Beam model for timers?
>>> Because I think you will need to do the former.
>>>
>>> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <je...@seznam.cz> 
>>> <je...@seznam.cz> wrote:
>>>
>>> Hm, that would probably work, thanks!
>>>
>>> But, should the timers behave like that? I'm trying to fix tris by 
>>> introducing a sequence of watermarks
>>>
>>>  inputs watermark -> timer watermark -> output watermark
>>>
>>> as suggested in the JIRA, and it actually seems to be working as expected. 
>>> It even cleans some code paths, but I'm debugging some strange behavior 
>>> this exposed - `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems 
>>> to have stopped clearing itself after this change and some Pipelines 
>>> therefore stopped working. I'm little lost why this happened. I can push 
>>> code I have if anyone interested.
>>>
>>> Jan
>>>
>>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>>
>>> We hit an instance of this problem before and solved it rescheduling the GC 
>>> timer again if there was a conflicting timer that was also meant to fire.
>>>
>>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <je...@seznam.cz> 
>>> <je...@seznam.cz> wrote:
>>>
>>> For a single key. I'm getting into collision of timerId 
>>> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for flushing 
>>> sorted elements in implementation of @RequiresTimeSortedInput. The timers 
>>> are being swapped at the end of input (but it can happen anywhere near end 
>>> of window), which results in state being cleared before it gets flushed, 
>>> which means data loss.
>>>
>>>  Jan
>>>
>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>
>>> Do you mean for a single key or across keys?
>>>
>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <je...@seznam.cz> 
>>> <je...@seznam.cz> wrote:
>>>
>>> Hi,
>>>
>>> I have come across issue [1], where I'm not sure how to solve this in
>>> most elegant way.
>>>
>>> Any suggestions?
>>>
>>> Thanks,
>>>
>>>   Jan
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
>>>
>>>

Reply via email to