Possibly a dumb question, but: if "the static timer is just a special case of the dynamic timer," why do we need to use different classes at all?
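One way to picture the question is the rough sketch below. It is plain Python and not the Beam SDK: `TimerFamily`, `StaticTimerView`, `as_static`, and `DEFAULT_TAG` are hypothetical names made up for illustration. It assumes a dynamic timer is just a map from tag to firing time, and that a static timer could be the same object restricted to one fixed tag.

```python
from dataclasses import dataclass, field
from typing import Dict

# Hypothetical: the single fixed tag that a "static" timer would use.
DEFAULT_TAG = ""


@dataclass
class TimerFamily:
    """Hypothetical stand-in for a dynamic timer: one firing time per tag."""
    name: str
    firing_times: Dict[str, int] = field(default_factory=dict)

    def set(self, tag: str, timestamp: int) -> None:
        # Setting the same tag again overwrites the earlier firing time.
        self.firing_times[tag] = timestamp

    def clear(self, tag: str) -> None:
        # Clearing an unset tag is a no-op.
        self.firing_times.pop(tag, None)

    def as_static(self) -> "StaticTimerView":
        # The static timer is a view on the same family, not a separate class hierarchy.
        return StaticTimerView(self)


class StaticTimerView:
    """Hypothetical static-timer facade: a family pinned to DEFAULT_TAG."""

    def __init__(self, family: TimerFamily) -> None:
        self._family = family

    def set(self, timestamp: int) -> None:
        self._family.set(DEFAULT_TAG, timestamp)

    def clear(self) -> None:
        self._family.clear(DEFAULT_TAG)


family = TimerFamily("timers")
family.set("action_a", 10)    # dynamic use: caller picks the tag
family.as_static().set(20)    # static use: tag is implied
print(family.firing_times)
```

Under these assumptions only one spec class would be needed, and the static/dynamic distinction would live entirely in how the parameter is used.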
On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang <[email protected]> wrote:

> Hi, Beam community,
>
> I'm trying to add the dynamic timer
> <https://issues.apache.org/jira/browse/BEAM-6857> support to the python
> sdk. In java sdk, a dynamic timer is specified through declaring a
> TimerSpec of timerMap type and annotating it with @TimerFamily in
> process method parameter:
>
>   @TimerFamily("timers") private final TimerSpec timer =
>       TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>
>   @ProcessElement public void process(
>       @Element KV<String, ValueT> element,
>       @Timestamp Instant elementTs,
>       @TimerFamily("timers") TimerMap timers) {
>     timers.set(element.getValue().getActionType(), elementTs);
>   }
>
>
> In python, I'm trying to figure out how we should differentiate a dynamic
> timer to the conventional static timer, at the timer spec declaration or
> process parameters annotation, or both:
>
> Approach 1: if we differentiate them only at timer spec declaration it
> looks like:
>
> class TimerDoFn(beam.DoFn):
>   timer_spec = userstate.TimerSpec('static_timer',
>                                    userstate.TimeDomain.WATERMARK)
>   timer_family_spec = userstate.TimerFamilySpec('dynamic_timer',
>                                                 userstate.TimeDomain.WATERMARK)
>   def process(self,
>               element,
>               timer=beam.DoFn.TimerParam(timer_spec),
>               timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>
>   @userstate.on_timer(timer_spec)
>   def process_timer(
>       self,
>       ts=beam.DoFn.TimestampParam,
>       timer=beam.DoFn.TimerParam(timer_spec)):
>
>   @userstate.on_timer(timer_family_spec)
>   def process_timer_map(
>       self,
>       ts=beam.DoFn.TimestampParam,
>       dynamic_tag=DoFn.DynamicTagParam,
>       timer_map=beam.DoFn.TimerParam(timer_family_spec))
>
> Approach 2: if only at parameter annotation:
>
> class TimerDoFn(beam.DoFn):
>   timer_spec = userstate.TimerSpec('static_timer',
>                                    userstate.TimeDomain.WATERMARK)
>   timer_family_spec = userstate.TimerSpec('dynamic_timer',
>                                           userstate.TimeDomain.WATERMARK)
>   def process(self,
>               element,
>               timer=beam.DoFn.TimerParam(timer_spec),
>               timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>
>   @userstate.on_timer(timer_spec)
>   def process_timer(
>       self,
>       ts=beam.DoFn.TimestampParam,
>       timer=beam.DoFn.TimerParam(timer_spec)):
>
>   @userstate.on_timer(timer_family_spec)
>   def process_timer(
>       self,
>       ts=beam.DoFn.TimestampParam,
>       timer=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>
>
> Approach 3: we can differentiate them at both places, but since the static
> timer is just a special case of the dynamic timer, it will introduce a
> certain amount of code duplication.
>
> I'm trying to decide which one offers the best API readability and
> usability.
>
> Anyone have any opinions?
>
> Thanks,
> Yichi
>
