[ https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stas Levin updated BEAM-2859:
-----------------------------
Description:
{{AfterProcessingTime}}-based timers are not fired when the input watermark does not advance, which prevents buffered elements from being emitted.

The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines which timers are ready to be processed by using the following condition:

{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}

However, if the timer domain is {{TimeDomain.PROCESSING_TIME}}, the position of the input watermark should *NOT* have any effect.

In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing, even though they will not necessarily fire. This may not be the correct behavior for timers in general, and for timers in {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared.

For instance, consider a timer that is found eligible for processing and is therefore deleted. If its {{shouldFire()}} then happens to return {{false}}, the timer is not fired and needs to be re-run next time around, but it won't be, since it has already been deleted.

It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and to leave timer management to {{ReduceFnRunner#clearAllState()}}, which has more context to determine whether it's time for a given timer to be deleted.

was:
{{AfterProcessingTime}} based timers are not fired when the input watermark does not advance, preventing from buffered element to be emitted.
The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} determines what triggers are ready to be processed by using the following condition:
{code:java}
timer.getTimestamp().isBefore(inputWatermark)
{code}
However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position of the input watermark should *NOT* have effect.
In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers once they are deemed eligible for processing (but will not necessarily fire). This may not be the correct behavior for timers in general and for timers in the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain scheduled until the corresponding window expires and all state is cleared.
For instance, consider a timer that is found eligible for processing and is thus deleted, then it just so happens to be that its {{shouldFire()}} returns {{false}} and it is not fired and needs to be re-run next time around, but won't, since it's been deleted.
It may be better to avoid removing timers in {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management up to {{ReduceFnRunner#clearAllState()}} which has more context to determine whether it's time for a given timer to deleted.

> Processing time based timers are not properly fired in case the watermark stays put
> -----------------------------------------------------------------------------------
>
>                 Key: BEAM-2859
>                 URL: https://issues.apache.org/jira/browse/BEAM-2859
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Stas Levin
>            Assignee: Stas Levin
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
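For the watermark issue described above, here is a minimal sketch of a domain-aware readiness check. It is not the Spark runner's code. The names {{DomainAwareTimers}}, {{Domain}}, {{Timer}}, {{isReady}} and {{readyTimers}} are hypothetical, simplified stand-ins for Beam's timer and {{TimeDomain}} types. Timestamps are plain epoch milliseconds rather than Joda-Time instants, and only the two domains mentioned in this issue are modelled. Event-time timers are still compared against the input watermark, while processing-time timers are compared against the current processing time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; not Beam's TimerInternals or SparkTimerInternals API.
class DomainAwareTimers {

  // Simplified stand-in for Beam's TimeDomain (only the two domains discussed).
  enum Domain { EVENT_TIME, PROCESSING_TIME }

  // Simplified stand-in for a registered timer: a domain plus a timestamp in millis.
  static final class Timer {
    final Domain domain;
    final long timestampMillis;

    Timer(Domain domain, long timestampMillis) {
      this.domain = domain;
      this.timestampMillis = timestampMillis;
    }
  }

  // Picks the reference clock by domain, so a stalled watermark cannot hold back
  // processing-time timers. Strict "<" mirrors the isBefore(...) condition.
  static boolean isReady(Timer timer, long inputWatermarkMillis, long processingTimeMillis) {
    long reference =
        timer.domain == Domain.PROCESSING_TIME ? processingTimeMillis : inputWatermarkMillis;
    return timer.timestampMillis < reference;
  }

  // Collects ready timers without mutating the input list (nothing is deleted here).
  static List<Timer> readyTimers(
      List<Timer> timers, long inputWatermarkMillis, long processingTimeMillis) {
    List<Timer> ready = new ArrayList<>();
    for (Timer t : timers) {
      if (isReady(t, inputWatermarkMillis, processingTimeMillis)) {
        ready.add(t);
      }
    }
    return ready;
  }

  public static void main(String[] args) {
    long watermark = 1_000L;      // the input watermark is stuck at 1s
    long processingTime = 5_000L; // the processing-time clock has moved on to 5s
    Timer eventTimer = new Timer(Domain.EVENT_TIME, 2_000L);
    Timer procTimer = new Timer(Domain.PROCESSING_TIME, 2_000L);
    System.out.println(isReady(eventTimer, watermark, processingTime)); // prints false
    System.out.println(isReady(procTimer, watermark, processingTime));  // prints true
  }
}
```

With the watermark stuck, the event-time timer correctly keeps waiting, while the processing-time timer becomes ready once the processing-time clock passes its timestamp.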
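The lost-timer scenario from the description can be reproduced with a minimal sketch. It is not the Spark runner's code. The names {{TimerRetentionSketch}}, {{readyAndRemove}}, {{readyOnly}} and {{runTwoPasses}} are hypothetical. Timers are modelled as plain millisecond timestamps in a set. The {{shouldFire()}} decision is modelled as a hypothetical per-pass predicate that only returns {{true}} on the second pass. The destructive scan deletes the timer on the first pass, so it never fires. The non-destructive scan keeps it registered, so it fires on the second pass. Final removal is left to a cleanup step that stands in for {{ReduceFnRunner#clearAllState()}}.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch; timers are bare timestamps, not Beam TimerData.
class TimerRetentionSketch {

  // Destructive variant: ready timers are removed as soon as they are returned,
  // whether or not they end up firing.
  static List<Long> readyAndRemove(Set<Long> registry, long now) {
    List<Long> ready = readyOnly(registry, now);
    registry.removeAll(ready);
    return ready;
  }

  // Non-destructive variant: ready timers stay registered.
  static List<Long> readyOnly(Set<Long> registry, long now) {
    List<Long> ready = new ArrayList<>();
    for (Long ts : registry) {
      if (ts < now) {
        ready.add(ts);
      }
    }
    return ready;
  }

  // Runs two processing passes over a single timer at t=100ms and returns how
  // many times a ready timer actually fired. shouldFireOnPass models shouldFire().
  static int runTwoPasses(boolean destructive, Predicate<Integer> shouldFireOnPass) {
    Set<Long> registry = new LinkedHashSet<>();
    registry.add(100L);
    int fired = 0;
    for (int pass = 1; pass <= 2; pass++) {
      long now = 1_000L * pass; // the timer is eligible on both passes
      List<Long> ready = destructive ? readyAndRemove(registry, now) : readyOnly(registry, now);
      for (Long ts : ready) {
        if (shouldFireOnPass.test(pass)) {
          fired++;
        }
      }
    }
    // Stand-in for window cleanup (the role the issue assigns to clearAllState()).
    registry.clear();
    return fired;
  }

  public static void main(String[] args) {
    Predicate<Integer> firesOnSecondPass = pass -> pass >= 2;
    System.out.println(runTwoPasses(true, firesOnSecondPass));  // prints 0
    System.out.println(runTwoPasses(false, firesOnSecondPass)); // prints 1
  }
}
```

In this model, deleting a timer only during window cleanup costs an extra readiness check per pass. That is the trade-off the description weighs against silently dropping a timer whose {{shouldFire()}} was not yet satisfied.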