[ 
https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-2859:
-----------------------------
    Description: 
{{AfterProcessingTime}} based timers are not fired when the input watermark 
does not advance, preventing buffered elements from being emitted.

The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} 
determines which timers are ready to be processed by using the following 
condition: 

{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}

However, if the timer domain is {{TimeDomain.PROCESSING_TIME}}, the position of 
the input watermark should *NOT* have any effect.
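
To illustrate the point, here is a minimal sketch of a domain-aware readiness check. It is not the Spark runner's code: {{Domain}}, {{Timer}}, {{isReady}} and {{readyIds}} are hypothetical stand-ins, and timestamps are plain millisecond values. Each timer is compared against the clock of its own domain, so a stalled watermark no longer holds back processing-time timers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Beam classes.
final class TimerReadinessSketch {
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  static final class Timer {
    final String id;
    final Domain domain;
    final long timestampMillis;

    Timer(String id, Domain domain, long timestampMillis) {
      this.id = id;
      this.domain = domain;
      this.timestampMillis = timestampMillis;
    }
  }

  // Compare the timer against the clock of its own domain:
  // event-time timers against the input watermark,
  // processing-time timers against the current processing time.
  static boolean isReady(Timer timer, long inputWatermarkMillis, long processingTimeMillis) {
    long currentTime =
        timer.domain == Domain.PROCESSING_TIME ? processingTimeMillis : inputWatermarkMillis;
    // Strict comparison, mirroring the isBefore(...) condition quoted above.
    return timer.timestampMillis < currentTime;
  }

  // Collects the ids of all timers that are ready under the check above.
  static List<String> readyIds(
      List<Timer> timers, long inputWatermarkMillis, long processingTimeMillis) {
    List<String> ready = new ArrayList<>();
    for (Timer timer : timers) {
      if (isReady(timer, inputWatermarkMillis, processingTimeMillis)) {
        ready.add(timer.id);
      }
    }
    return ready;
  }

  public static void main(String[] args) {
    List<Timer> timers = Arrays.asList(
        new Timer("event", Domain.EVENT_TIME, 100L),
        new Timer("processing", Domain.PROCESSING_TIME, 100L));
    // The watermark is stuck at 50 while processing time has moved on to 200.
    System.out.println(readyIds(timers, 50L, 200L)); // prints [processing]
  }
}
```

With the watermark-only condition, neither timer in this example would be reported as ready, which matches the symptom described above.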

In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers 
once they are deemed eligible for processing (even though they will not 
necessarily fire). This may not be the correct behavior for timers in general, 
and for timers in the {{TimeDomain.PROCESSING_TIME}} domain in particular, 
since they should remain scheduled until the corresponding window expires and 
all state is cleared.
For instance, consider a timer that is found eligible for processing and is 
thus deleted. If its {{shouldFire()}} then returns {{false}}, it is not fired 
and needs to be re-evaluated next time around, but it won't be, since it has 
already been deleted.

It may be better to avoid removing timers in 
{{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up 
to {{ReduceFnRunner#clearAllState()}}, which has more context to determine 
whether it's time for a given timer to be deleted.
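
A rough sketch of what this could look like, under simplifying assumptions: {{RetainedTimersSketch}} is a hypothetical class, it tracks processing-time timers only, and its {{clearAllState()}} merely stands in for the cleanup attributed above to {{ReduceFnRunner#clearAllState()}}. The readiness lookup is read-only, so a timer whose trigger declines to fire is still found on the next pass.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not the actual SparkTimerInternals implementation.
final class RetainedTimersSketch {
  // Timer id mapped to its processing-time timestamp in milliseconds.
  private final Map<String, Long> timers = new LinkedHashMap<>();

  void setTimer(String id, long timestampMillis) {
    timers.put(id, timestampMillis);
  }

  // Reports ready timers without removing them from the set.
  List<String> getTimersReadyToProcess(long processingTimeMillis) {
    List<String> ready = new ArrayList<>();
    for (Map.Entry<String, Long> entry : timers.entrySet()) {
      if (entry.getValue() < processingTimeMillis) {
        ready.add(entry.getKey());
      }
    }
    return ready;
  }

  // Stand-in for the cleanup performed once the window has expired.
  void clearAllState() {
    timers.clear();
  }

  public static void main(String[] args) {
    RetainedTimersSketch internals = new RetainedTimersSketch();
    internals.setTimer("t", 100L);

    // First pass: the timer is ready; suppose shouldFire() returns false here.
    System.out.println(internals.getTimersReadyToProcess(150L)); // prints [t]
    // Second pass: the timer is still scheduled and can be evaluated again.
    System.out.println(internals.getTimersReadyToProcess(160L)); // prints [t]

    // Only the window-expiry cleanup removes it.
    internals.clearAllState();
    System.out.println(internals.getTimersReadyToProcess(170L)); // prints []
  }
}
```

One consequence of this design is that a ready timer is reported on every pass until it is cleared, so the caller has to tolerate repeated evaluations of the same timer.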

  was:
{{AfterProcessingTime}} based timers are not fired when the input watermark 
does not advance, preventing from buffered element to be emitted.

The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} 
determines what triggers are ready to be processed by using the following 
condition: 

{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}

However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of 
the input watermark should *NOT* have effect.

In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers 
once they are deemed eligible for processing (but will not necessarily fire). 
This may not be the correct behavior for timers in general and for timers in 
the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain 
scheduled until the corresponding window expires and all state is cleared.
For instance, consider a timer that is found eligible for processing and is 
thus deleted, then it just so happens to be that its {{shouldFire()}} returns 
{{false}} and it is not fired and needs to be re-run next time around, but 
won't, since it's been deleted.

It may be better to avoid removing timers in 
{{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up 
to {{ReduceFnRunner#clearAllState()}} which has more context to determine 
whether it's time for a given timer to deleted.


> Processing time based timers are not properly fired in case the watermark 
> stays put
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-2859
>                 URL: https://issues.apache.org/jira/browse/BEAM-2859
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Stas Levin
>            Assignee: Stas Levin
>
> {{AfterProcessingTime}} based timers are not fired when the input watermark 
> does not advance, preventing buffered elements from being emitted.
> The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} 
> determines which timers are ready to be processed by using the following 
> condition: 
> {code:java}
> timer.getTimestamp().isBefore(inputWatermark)
> {code}
> However, if the timer domain is {{TimeDomain.PROCESSING_TIME}}, the position 
> of the input watermark should *NOT* have any effect.
> In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers 
> once they are deemed eligible for processing (even though they will not 
> necessarily fire). This may not be the correct behavior for timers in general, 
> and for timers in the {{TimeDomain.PROCESSING_TIME}} domain in particular, 
> since they should remain scheduled until the corresponding window expires and 
> all state is cleared.
> For instance, consider a timer that is found eligible for processing and is 
> thus deleted. If its {{shouldFire()}} then returns {{false}}, it is not fired 
> and needs to be re-evaluated next time around, but it won't be, since it has 
> already been deleted.
> It may be better to avoid removing timers in 
> {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management 
> up to {{ReduceFnRunner#clearAllState()}}, which has more context to determine 
> whether it's time for a given timer to be deleted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
