[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stas Levin updated BEAM-2859:
-----------------------------
Description:
{{AfterProcessingTime}}-based timers are not fired when the input watermark
does not advance, which prevents buffered elements from being emitted.
The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}}
determines which timers are ready to be processed using the following
condition:
{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}
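However, if the timer domain is {{TimeDomain.PROCESSING_TIME}}, the position of
the input watermark should *NOT* have any effect: such timers should be
considered ready once the current processing time reaches their timestamp.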
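A minimal sketch of a domain-aware readiness check follows. It assumes simplified stand-ins for Beam's types: {{SimpleTimer}}, {{TimerReadiness#isReady}} and the two-value {{TimeDomain}} enum are hypothetical, and {{java.time.Instant}} is used instead of Joda-Time's {{Instant}} to keep the example self-contained. Event-time timers keep the watermark comparison quoted above, while processing-time timers are compared against the current processing time only.

{code:java}
import java.time.Instant;

// Hypothetical, simplified stand-in for Beam's TimeDomain (event and processing time only).
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

// Hypothetical, simplified stand-in for a timer: an id, a firing timestamp and a time domain.
final class SimpleTimer {
  final String id;
  final Instant timestamp;
  final TimeDomain domain;

  SimpleTimer(String id, Instant timestamp, TimeDomain domain) {
    this.id = id;
    this.timestamp = timestamp;
    this.domain = domain;
  }
}

final class TimerReadiness {
  private TimerReadiness() {}

  // Event-time timers are due once the input watermark passes them (the condition quoted
  // in the issue); processing-time timers are due once the processing-time clock reaches
  // them, regardless of where the watermark is.
  static boolean isReady(SimpleTimer timer, Instant inputWatermark, Instant processingTime) {
    switch (timer.domain) {
      case EVENT_TIME:
        return timer.timestamp.isBefore(inputWatermark);
      case PROCESSING_TIME:
        return !timer.timestamp.isAfter(processingTime);
      default:
        throw new IllegalArgumentException("Unknown time domain: " + timer.domain);
    }
  }

  public static void main(String[] args) {
    Instant stalledWatermark = Instant.ofEpochMilli(0);
    Instant now = Instant.ofEpochMilli(60_000);
    SimpleTimer timer =
        new SimpleTimer("t1", Instant.ofEpochMilli(30_000), TimeDomain.PROCESSING_TIME);
    // Prints "true": the timer is due even though the watermark has not moved.
    System.out.println(isReady(timer, stalledWatermark, now));
  }
}
{code}

With a stalled watermark, the watermark-only condition would never report this processing-time timer as ready; branching on the domain lets it fire on schedule.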
In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers
once they are deemed eligible for processing, even though they will not
necessarily fire. This may not be the correct behavior for timers in general,
and for timers in the {{TimeDomain.PROCESSING_TIME}} domain in particular, since
they should remain scheduled until the corresponding window expires and all
state is cleared.
For instance, consider a timer that is found eligible for processing and is
therefore deleted. If its {{shouldFire()}} then happens to return {{false}}, the
timer is not fired and needs to be re-evaluated next time around, but it won't
be, since it has already been deleted.
It may be better to avoid removing timers in
{{SparkTimerInternals#getTimersReadyToProcess()}} and to leave timer management
to {{ReduceFnRunner#clearAllState()}}, which has more context to determine
whether it's time for a given timer to be deleted.
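The following is a minimal sketch contrasting eager deletion with a non-destructive lookup, assuming a simplified string-keyed timer store. {{TimerStore}}, {{getTimersReadyToProcessEagerly}} and {{deleteTimer}} are hypothetical names, not the Spark runner's API; the {{main}} method simulates a timer whose {{shouldFire()}} returns {{false}} on the first pass.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified timer store keyed by timer id; not SparkTimerInternals' real code.
final class TimerStore {
  private final Set<String> timers = new LinkedHashSet<>();

  void setTimer(String timerId) {
    timers.add(timerId);
  }

  boolean contains(String timerId) {
    return timers.contains(timerId);
  }

  // Behavior described in the issue: eligible timers are removed as soon as they are returned.
  List<String> getTimersReadyToProcessEagerly(Predicate<String> isEligible) {
    List<String> ready = getTimersReadyToProcess(isEligible);
    timers.removeAll(ready);
    return ready;
  }

  // Proposed behavior: report eligible timers but keep them scheduled.
  List<String> getTimersReadyToProcess(Predicate<String> isEligible) {
    List<String> ready = new ArrayList<>();
    for (String timerId : timers) {
      if (isEligible.test(timerId)) {
        ready.add(timerId);
      }
    }
    return ready;
  }

  // Explicit deletion, to be called by whatever clears the window's state.
  void deleteTimer(String timerId) {
    timers.remove(timerId);
  }

  public static void main(String[] args) {
    for (boolean eager : new boolean[] {true, false}) {
      TimerStore store = new TimerStore();
      store.setTimer("t1");
      // Pass 1: "t1" is eligible, but assume its trigger's shouldFire() returns false,
      // so nothing is fired and the timer must be re-evaluated later.
      List<String> firstPass = eager
          ? store.getTimersReadyToProcessEagerly(id -> true)
          : store.getTimersReadyToProcess(id -> true);
      // Pass 2: with eager deletion the timer is gone; without it, "t1" is still reported.
      List<String> secondPass = eager
          ? store.getTimersReadyToProcessEagerly(id -> true)
          : store.getTimersReadyToProcess(id -> true);
      System.out.println((eager ? "eager" : "non-destructive")
          + ": first pass " + firstPass + ", second pass " + secondPass);
    }
  }
}
{code}

In the eager variant the second pass returns nothing, so the timer can never fire; in the non-destructive variant it is reported again, and removal is left to an explicit cleanup step such as the one the issue proposes delegating to {{ReduceFnRunner#clearAllState()}}.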
was:
{{AfterProcessingTime}}-based timers are not fired when the input watermark
does not advance, which prevents buffered elements from being emitted.
The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}}
determines which timers are ready to be processed using the following
condition:
{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}
However, if the timer domain is {{TimeDomain.PROCESSING_TIME}}, the position of
the input watermark should *NOT* have any effect.
In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers
once they are deemed eligible for processing, even though they will not
necessarily fire. This may not be the correct behavior for timers in general,
and for timers in the {{TimeDomain.PROCESSING_TIME}} domain in particular, since
they should remain scheduled until the corresponding window expires and all
state is cleared.
For instance, consider a timer that is found eligible for processing and is
therefore deleted. If its {{shouldFire()}} then happens to return {{false}}, it
needs to be re-evaluated next time around, but it won't be, since it has
already been deleted.
It may be better to avoid removing timers in
{{SparkTimerInternals#getTimersReadyToProcess()}} and to leave timer management
to {{ReduceFnRunner#clearAllState()}}, which has more context to determine
whether it's time for a given timer to be deleted.
> Processing time based timers are not properly fired in case the watermark
> stays put
> -----------------------------------------------------------------------------------
>
> Key: BEAM-2859
> URL: https://issues.apache.org/jira/browse/BEAM-2859
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Stas Levin
> Assignee: Stas Levin
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)