[ https://issues.apache.org/jira/browse/BEAM-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541630#comment-17541630 ]
Beam JIRA Bot commented on BEAM-14127:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days, so it
has been labeled "stale-P2". If this issue is still affecting you, we care!
Please comment and remove the label. Otherwise, the issue will be moved to P3
in 14 days.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed
explanation of what these priorities mean.
> Timers with same family ids in same stage (but different transforms) are
> buffered together
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-14127
> URL: https://issues.apache.org/jira/browse/BEAM-14127
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core, sdk-py-harness
> Reporter: Pablo Estrada
> Priority: P2
> Labels: stale-P2
>
> The following test case does not work properly:
>
> {code:java}
> def test_dynamic_timer_clear_then_set_timer(self):
>   class EmitTwoEvents(DoFn):
>     EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)
>
>     def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
>       # Emit the 'set' event now; the timer emits the 'clear' event later.
>       yield ('1', 'set')
>       emit.set(1)
>
>     @on_timer(EMIT_CLEAR_SET_TIMER)
>     def emit_clear(self):
>       yield ('1', 'clear')
>
>   class DynamicTimerDoFn(DoFn):
>     EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)
>
>     def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
>       if element[1] == 'set':
>         emit.set(10, dynamic_timer_tag='emit1')
>         emit.set(20, dynamic_timer_tag='emit2')
>       if element[1] == 'clear':
>         # Set, clear, then set again: only the timer at 40 should fire.
>         emit.set(30, dynamic_timer_tag='emit3')
>         emit.clear(dynamic_timer_tag='emit3')
>         emit.set(40, dynamic_timer_tag='emit3')
>       return []
>
>     @on_timer(EMIT_TIMER_FAMILY)
>     def emit_callback(
>         self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
>       yield (tag, ts)
>
>   with TestPipeline() as p:
>     res = (
>         p
>         | beam.Create([('1', 'impulse')])
>         | beam.ParDo(EmitTwoEvents())
>         | beam.ParDo(DynamicTimerDoFn()))
>     assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]))
> {code}