On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Reuven,
> I would be cautious changing this. Being able
to put multiple timers in the same bundle saves
a lot, and if we force them to all run separate
through ReduceFnRunner we risk regressing
performance of some pipelines.
I understand your point. The issue here is that
the current behavior is, at the very least,
unexpected. There might be a different conceptual
approach to that:
a) if a bundle contains timers for several
distinct timestamps (say T1 and T2), then it
implies that timer T1 is effectively not fired
at time T1, but at time T2. That is because,
logically, time hopped discretely from some
previous time T0 to T2 without "stopping by" at
T1. Hence, it should be invalid to set up a
timer for any time lower than T2.
But that is exactly how time advances. Watermarks
often don't move smoothly, as a single old element
can hold up the watermark. When that element is
finished, the watermark can jump forward in time,
triggering many timers.
b) time will move smoothly (or smoothly at
millisecond precision), but that implies that
there cannot be several distinct timers inside
a single bundle.
If we don't want to take path b), we are
probably left with path a) (doing nothing
seems weird, because it breaks one invariant:
that time can only move forward).
I'm not sure how this breaks that invariant. The
input watermark has only moved forward, as should be
true for the output watermark. The output watermark
is held up by watermark holds in the step, which
usually means that the output watermark is already
being held to the earliest pending timer.
Option a) can be done - we might add something
like `getInputWatermark()` and
`getOutputWatermark()` to `DoFn.OnTimerContext`,
and throw an exception when the user tries to set
up a timer for a time before the input watermark.
Effectively, that way we will let the user know
that their timer was set to time T1, but was
fired at T2. But that seems to be a breaking
change, unfortunately.
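A minimal sketch of the check proposed in option a), in plain Java rather than against the Beam API. The class `TimerGuardSketch` and the method `checkedTimerTarget` are hypothetical names used only for illustration; `getInputWatermark()` is the proposed accessor from the text, not an existing method.

```java
/** Toy illustration of option a); not Beam code, all names are hypothetical. */
class TimerGuardSketch {

  /**
   * Returns the target timestamp if it is valid, otherwise throws.
   * Assumption: a target is invalid when it lies strictly before the
   * current input watermark (the value getInputWatermark() would return).
   */
  static long checkedTimerTarget(long targetMillis, long inputWatermarkMillis) {
    if (targetMillis < inputWatermarkMillis) {
      throw new IllegalArgumentException(
          "Timer target " + targetMillis
              + " is before input watermark " + inputWatermarkMillis);
    }
    return targetMillis;
  }

  public static void main(String[] args) {
    // Time hopped from T0 to T2 = 20; a timer for 25 is still acceptable.
    System.out.println(checkedTimerTarget(25, 20));
    try {
      // A timer for 15 would already be in the past and is rejected.
      checkedTimerTarget(15, 20);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Pipelines that currently set such timers without error would start failing under this check, which is the breaking change mentioned above.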
What do you think?
Jan
On 6/20/19 5:29 PM, Reuven Lax wrote:
On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský
<je...@seznam.cz> wrote:
Hi,
this problem seems to be harder than I
thought. I have somewhat working code in
[1], but some tests are still failing
(now tests for ReduceFnRunner). I'm not
sure whether the problem is in the tests,
in which case my current behavior would
actually be correct. Let me explain the problem:
- let's have a fixed window with allowed
lateness of 1 ms
- let's add two elements into the window
(on time), no late elements
- now, ReduceFnRunner with the default trigger
will set *two* timers - one for
window.maxTimestamp() and a second for
window.maxTimestamp() + allowedLateness
- the previous implementation fired *both*
timers at once (within a single call to
ReduceFnRunner#onTimers), but now it fires
twice - once for the first timer and once
for the other
I would be cautious changing this. Being able
to put multiple timers in the same bundle saves
a lot, and if we force them to all run separate
through ReduceFnRunner we risk regressing
performance of some pipelines.
- the result of this is that although in
both cases only a single pane is emitted, in
my branch the fired pane doesn't have the
`isLast` flag set (that is because the
window is not yet garbage collected, as it is
still waiting for late data, and the second
firing emits no pane, because no late data arrived)
Would anyone know what the correct
behavior is regarding
PaneInfo.isLast? I suppose there are only
two options - either two panes can come
with the isLast flag (both end-of-window and
late), or it might be possible that no
pane will be marked with this flag (because no
late pane is fired).
Jan
[1] https://github.com/apache/beam/pull/8815
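The two timers in the scenario above can be worked through with concrete numbers. This is a plain-Java sketch, not ReduceFnRunner code. It assumes a fixed window [0, 10) ms, that `maxTimestamp()` is the window end minus 1 ms, and that a timer becomes eligible once the watermark reaches its timestamp. The class and method names are hypothetical.

```java
/** Toy arithmetic for the two-timer scenario; not Beam code. */
class WindowTimersSketch {

  /** Last timestamp inside a window [start, end), in milliseconds. */
  static long maxTimestamp(long endExclusiveMillis) {
    return endExclusiveMillis - 1;
  }

  /** End-of-window timer and garbage-collection timer for one window. */
  static long[] timers(long endExclusiveMillis, long allowedLatenessMillis) {
    long endOfWindow = maxTimestamp(endExclusiveMillis);
    return new long[] {endOfWindow, endOfWindow + allowedLatenessMillis};
  }

  /** Counts timers eligible at the given watermark (assumption: timestamp <= watermark). */
  static int eligible(long[] timers, long watermarkMillis) {
    int count = 0;
    for (long t : timers) {
      if (t <= watermarkMillis) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    long[] t = timers(10, 1); // window [0, 10), allowed lateness 1 ms
    System.out.println(t[0] + ", " + t[1]);   // 9, 10
    System.out.println(eligible(t, 9));       // 1: only the end-of-window timer
    System.out.println(eligible(t, 10));      // 2: both, if the watermark jumps past 10
  }
}
```

With only 1 ms between the two timers, almost any watermark jump makes both eligible together, which is why the old code path handled them in a single `onTimers` call.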
On 6/10/19 6:26 PM, Jan Lukavský wrote:
It seems to me that a watermark hold cannot
change this (currently), because in the
current implementation timers fire
according to the input watermark, whereas
watermark holds apply to the output watermark.
Unless I missed something.
On 6/10/19 6:15 PM, Lukasz Cwik
<lc...@google.com> wrote:
I see. Is there a missing watermark
hold for timers less than T2?
On Mon, Jun 10, 2019 at 9:08 AM Jan
Lukavský <je...@seznam.cz> wrote:
Yes, there is no difference
between GC and user timers in this
case. I think the problem is
simply that when the watermark moves
from time T1 to T2, DirectRunner
fires all timers due up to T2,
but firing them can create new timers
for times between T1 and T2, and
these will be fired later,
although they should have been
fired before T2.
Jan
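The ordering problem described above can be reproduced with a toy model in plain Java. This is not DirectRunner code. The class `TimerOrderSketch`, the `OnTimer` callback and both firing strategies are hypothetical, and the batch strategy only approximates "fire all timers up to T2 at once".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of timer firing order; not Beam code. */
class TimerOrderSketch {

  /** Called when a timer fires; returns the timestamp of a new timer to set, or -1 for none. */
  interface OnTimer {
    long fire(long timestamp);
  }

  /** Batch firing: extract every timer <= watermark first, then fire the batch. */
  static List<Long> fireBatch(PriorityQueue<Long> timers, long watermark, OnTimer fn) {
    List<Long> fired = new ArrayList<>();
    while (!timers.isEmpty() && timers.peek() <= watermark) {
      List<Long> batch = new ArrayList<>();
      while (!timers.isEmpty() && timers.peek() <= watermark) {
        batch.add(timers.poll());
      }
      for (long t : batch) {
        fired.add(t);
        long next = fn.fire(t);
        if (next >= 0) {
          timers.add(next); // only picked up after the whole batch has fired
        }
      }
    }
    return fired;
  }

  /** Ordered firing: re-check the queue after every single timer. */
  static List<Long> fireOrdered(PriorityQueue<Long> timers, long watermark, OnTimer fn) {
    List<Long> fired = new ArrayList<>();
    while (!timers.isEmpty() && timers.peek() <= watermark) {
      long t = timers.poll();
      fired.add(t);
      long next = fn.fire(t);
      if (next >= 0) {
        timers.add(next);
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    // The timer at 10 sets a new timer at 15; no other timer sets anything.
    OnTimer fn = t -> t == 10 ? 15 : -1;
    PriorityQueue<Long> a = new PriorityQueue<>(List.of(10L, 20L));
    PriorityQueue<Long> b = new PriorityQueue<>(List.of(10L, 20L));
    System.out.println(fireBatch(a, 20, fn));   // [10, 20, 15]
    System.out.println(fireOrdered(b, 20, fn)); // [10, 15, 20]
  }
}
```

In the batch variant the timer for 15 fires after the timer for 20, so time appears to move backwards for that key. The ordered variant avoids this, but it fires timers one by one instead of as a group.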
On 6/10/19 5:48 PM, Kenneth
Knowles wrote:
Reading your Jira, I believe
this problem will manifest
without the interaction of
user timers and GC.
Interesting case. It hinges on
whether a runner makes a timer
available or fires it prior to
the bundle being committed.
I have commented elsewhere
about this part, quoting the Jira:
> have experimented with this a
little and have not yet
figured out what the correct
solution should be. What I tried:
> 1) hold input watermark for
min(setup timers)
> 2) fire timers based not on
input watermark, but on output
watermark (output watermark is
held by min timer stamp)
Neither of these quite works.
What we need is a separate
"element input watermark" and
"timer input watermark". The
overall input watermark that
drives GC is the min of these.
The output watermark is also
held to this overall input
watermark. User timers fire
according to the element input
watermark.
Kenn
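The watermark split described above can be written down as a small plain-Java model. This is a sketch, not Beam code. The class `SplitWatermarkSketch` and its methods are hypothetical, the "timestamp <= watermark" firing condition is an assumption, and the sample values assume the timer input watermark is held at a pending timer's timestamp.

```java
/** Toy model of separate element and timer input watermarks; not Beam code. */
class SplitWatermarkSketch {
  final long elementInputWatermark;
  final long timerInputWatermark;

  SplitWatermarkSketch(long elementInputWatermark, long timerInputWatermark) {
    this.elementInputWatermark = elementInputWatermark;
    this.timerInputWatermark = timerInputWatermark;
  }

  /** Overall input watermark: the min of the two; this is what drives GC. */
  long overallInputWatermark() {
    return Math.min(elementInputWatermark, timerInputWatermark);
  }

  /** The output watermark is held to the overall input watermark and to any explicit hold. */
  long outputWatermark(long explicitHold) {
    return Math.min(overallInputWatermark(), explicitHold);
  }

  /** User timers fire according to the element input watermark. */
  boolean userTimerReady(long timestamp) {
    return timestamp <= elementInputWatermark;
  }

  /** GC timers fire according to the overall input watermark. */
  boolean gcTimerReady(long timestamp) {
    return timestamp <= overallInputWatermark();
  }

  public static void main(String[] args) {
    // Elements have advanced to 20, but a user timer for 15 has not been processed yet.
    SplitWatermarkSketch w = new SplitWatermarkSketch(20, 15);
    System.out.println(w.userTimerReady(15));              // true
    System.out.println(w.gcTimerReady(19));                // false
    System.out.println(w.outputWatermark(Long.MAX_VALUE)); // 15
  }
}
```

In this model a GC timer at 19 cannot overtake the pending user timer at 15, because the overall input watermark stays at 15 until that timer has been processed.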
On Mon, Jun 10, 2019 at 8:44
AM Lukasz Cwik
<lc...@google.com> wrote:
Jan, are you editing the
implementation of how
timers work within the
DirectRunner, or are you
trying to build support for
time sorted input on top of
the Beam model for timers?
Because I think you will
need to do the former.
On Mon, Jun 10, 2019 at
8:41 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hm, that would
probably work, thanks!
But should the timers
behave like that? I'm
trying to fix this by
introducing a sequence
of watermarks
input watermark ->
timer watermark ->
output watermark
as suggested in the
JIRA, and it actually
seems to be working as
expected. It even
cleans up some code
paths, but I'm
debugging some strange
behavior this exposed -
`WatermarkHold.watermarkHoldTagForTimestampCombiner`
seems to have stopped
clearing itself after
this change and some
Pipelines therefore
stopped working. I'm
a little lost as to why
this happened. I can
push the code I have if
anyone is interested.
Jan
On 6/10/19 5:32 PM,
Lukasz Cwik wrote:
We hit an instance
of this problem
before and solved
it by rescheduling
the GC timer
if there was a
conflicting timer
that was also
meant to fire.
On Mon, Jun 10,
2019 at 8:17 AM
Jan Lukavský
<je...@seznam.cz>
wrote:
For a single
key. I'm
running into a
collision between
the timerId
`__StatefulParDoGcTimerId`
(StatefulDoFnRunner)
and my timerId
for flushing
sorted
elements in the
implementation
of
@RequiresTimeSortedInput.
The timers are
being swapped
at the end of
input (but it
can happen
anywhere near
the end of the
window), which
results in
state being
cleared before
it gets
flushed, which
means data loss.
Jan
On 6/10/19
5:08 PM,
Reuven Lax wrote:
Do you
mean for a
single key
or across
keys?
On Mon, Jun 10,
2019, 5:11 AM
Jan Lukavský
<je...@seznam.cz>
wrote:
Hi,
I have come across
issue [1], and I'm
not sure how to
solve it in the
most elegant way.
Any suggestions?
Thanks,
Jan
[1]
https://issues.apache.org/jira/browse/BEAM-7520