Hi, Beam community,
I'm trying to add dynamic timer support
<https://issues.apache.org/jira/browse/BEAM-6857> to the Python SDK.
In the Java SDK, a dynamic timer is specified by declaring a TimerSpec
of timerMap type, annotating it with @TimerFamily, and referencing the
same family name on a process method parameter:

    @TimerFamily("timers")
    private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(
        @Element KV<String, ValueT> element,
        @Timestamp Instant elementTs,
        @TimerFamily("timers") TimerMap timers) {
      timers.set(element.getValue().getActionType(), elementTs);
    }

In Python, I'm trying to figure out how we should differentiate a dynamic
timer from the conventional static timer: at the timer spec declaration,
at the process parameter annotation, or both.

Approach 1: if we differentiate them only at the timer spec declaration,
it looks like:

    import apache_beam as beam
    from apache_beam.transforms import userstate

    # TimerFamilySpec and DynamicTagParam are the proposed additions.
    class TimerDoFn(beam.DoFn):
        timer_spec = userstate.TimerSpec(
            'static_timer', userstate.TimeDomain.WATERMARK)
        timer_family_spec = userstate.TimerFamilySpec(
            'dynamic_timer', userstate.TimeDomain.WATERMARK)

        def process(
                self,
                element,
                timer=beam.DoFn.TimerParam(timer_spec),
                timer_map=beam.DoFn.TimerParam(timer_family_spec)):
            ...

        @userstate.on_timer(timer_spec)
        def process_timer(
                self,
                ts=beam.DoFn.TimestampParam,
                timer=beam.DoFn.TimerParam(timer_spec)):
            ...

        @userstate.on_timer(timer_family_spec)
        def process_timer_map(
                self,
                ts=beam.DoFn.TimestampParam,
                dynamic_tag=beam.DoFn.DynamicTagParam,
                timer_map=beam.DoFn.TimerParam(timer_family_spec)):
            ...

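
To make the implication of Approach 1 concrete: with a single TimerParam,
the SDK would have to inspect the type of the spec to decide what object
to hand to the user. Below is a rough, Beam-free sketch of that dispatch.
All classes and the resolve_timer_param helper are hypothetical stand-ins
that I made up for illustration, not the actual SDK implementation.

```python
class TimerSpec:
    """Stand-in for a static timer spec (hypothetical, not the Beam class)."""

    def __init__(self, name, time_domain):
        self.name = name
        self.time_domain = time_domain


class TimerFamilySpec(TimerSpec):
    """Stand-in for the proposed dynamic timer spec."""


class StaticTimer:
    """Holds at most one firing timestamp."""

    def __init__(self):
        self.timestamp = None

    def set(self, timestamp):
        self.timestamp = timestamp


class TimerMap:
    """Holds one firing timestamp per dynamic tag."""

    def __init__(self):
        self.timestamps = {}

    def set(self, dynamic_tag, timestamp):
        self.timestamps[dynamic_tag] = timestamp


def resolve_timer_param(spec):
    # Check the subclass first, since a TimerFamilySpec is also a TimerSpec.
    if isinstance(spec, TimerFamilySpec):
        return TimerMap()
    return StaticTimer()


static = resolve_timer_param(TimerSpec('static_timer', 'WATERMARK'))
dynamic = resolve_timer_param(TimerFamilySpec('dynamic_timer', 'WATERMARK'))
static.set(10)
dynamic.set('action_a', 10)
dynamic.set('action_b', 20)
```

In this sketch the user-facing parameter stays the same and only the spec
class tells the two kinds of timer apart.
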
Approach 2: if we differentiate them only at the parameter annotation:

    import apache_beam as beam
    from apache_beam.transforms import userstate

    # TimerFamilyParam is the proposed addition.
    class TimerDoFn(beam.DoFn):
        timer_spec = userstate.TimerSpec(
            'static_timer', userstate.TimeDomain.WATERMARK)
        timer_family_spec = userstate.TimerSpec(
            'dynamic_timer', userstate.TimeDomain.WATERMARK)

        def process(
                self,
                element,
                timer=beam.DoFn.TimerParam(timer_spec),
                timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
            ...

        @userstate.on_timer(timer_spec)
        def process_timer(
                self,
                ts=beam.DoFn.TimestampParam,
                timer=beam.DoFn.TimerParam(timer_spec)):
            ...

        @userstate.on_timer(timer_family_spec)
        def process_timer_map(
                self,
                ts=beam.DoFn.TimestampParam,
                timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
            ...

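
One thing Approach 2 seems to imply: since the spec itself no longer says
whether it is static or dynamic, the SDK would probably need to check that
a given spec is not requested both ways in the same DoFn. Here is a rough,
Beam-free sketch of such a check. The classes and check_consistent_usage
are hypothetical names for illustration only, and whether such a check is
wanted at all is my assumption.

```python
class TimerSpec:
    """Stand-in for a timer spec (hypothetical, not the Beam class)."""

    def __init__(self, name, time_domain):
        self.name = name
        self.time_domain = time_domain


class TimerParam:
    """Stand-in for the static timer parameter wrapper."""

    def __init__(self, timer_spec):
        self.timer_spec = timer_spec


class TimerFamilyParam(TimerParam):
    """Stand-in for the proposed dynamic timer parameter wrapper."""


def check_consistent_usage(params):
    """Return {spec name: is_dynamic}; raise if a spec is used both ways."""
    kinds = {}
    for param in params:
        is_dynamic = isinstance(param, TimerFamilyParam)
        # setdefault records the first usage and returns it on later ones.
        previous = kinds.setdefault(param.timer_spec.name, is_dynamic)
        if previous != is_dynamic:
            raise ValueError(
                'Timer spec %r is used as both a static and a dynamic timer'
                % param.timer_spec.name)
    return kinds


spec = TimerSpec('dynamic_timer', 'WATERMARK')
kinds = check_consistent_usage([TimerFamilyParam(spec), TimerFamilyParam(spec)])
```

Mixing TimerParam(spec) and TimerFamilyParam(spec) for the same spec would
raise a ValueError in this sketch.
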

Approach 3: we can differentiate them in both places, but since the static
timer is just a special case of the dynamic timer, this would introduce a
certain amount of code duplication.
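
The "special case" relationship can be sketched as a static timer that
delegates to a timer map with one fixed tag. This is only an illustration:
the class names and the choice of an empty string as the fixed tag are my
assumptions, not how the SDK does or must implement it.

```python
class TimerMap:
    """Hypothetical dynamic timer: one firing timestamp per dynamic tag."""

    def __init__(self):
        self._timestamps = {}

    def set(self, dynamic_tag, timestamp):
        self._timestamps[dynamic_tag] = timestamp

    def clear(self, dynamic_tag):
        self._timestamps.pop(dynamic_tag, None)

    def pending(self):
        # Return a copy so callers cannot mutate internal state.
        return dict(self._timestamps)


class StaticTimer:
    """Hypothetical static timer: a timer map restricted to a single tag."""

    _FIXED_TAG = ''  # assumed sentinel tag for the one static timer

    def __init__(self, timer_map):
        self._timer_map = timer_map

    def set(self, timestamp):
        self._timer_map.set(self._FIXED_TAG, timestamp)

    def clear(self):
        self._timer_map.clear(self._FIXED_TAG)


backing_map = TimerMap()
static_timer = StaticTimer(backing_map)
static_timer.set(5)
after_set = backing_map.pending()
static_timer.clear()
after_clear = backing_map.pending()
```

With this kind of delegation, the set and clear logic lives in one place
even if the user-facing API keeps two distinct types.
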

I'm trying to decide which approach offers the best API readability and
usability. Does anyone have an opinion?

Thanks,
Yichi