Hi, Beam community,

I'm trying to add dynamic timer support
<https://issues.apache.org/jira/browse/BEAM-6857> to the Python
SDK. In the Java SDK, a dynamic timer is declared as a TimerSpec of
timerMap type annotated with @TimerFamily, and the process method
accesses it through a parameter annotated with the same @TimerFamily ID:

  @TimerFamily("timers") private final TimerSpec timer =
    TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @Timestamp Instant elementTs,
      @TimerFamily("timers") TimerMap timers) {
    timers.set(element.getValue().getActionType(), elementTs);
  }
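To make the TimerMap semantics concrete, here is a toy in-memory model in Python. `ToyTimerMap` is hypothetical and not a Beam API; integer timestamps and the "fire when the watermark reaches the timestamp" rule are simplifying assumptions. It shows the two properties that matter for the API discussion: each tag is an independent timer, and re-setting a tag replaces its earlier firing time.

```python
from typing import Dict, List, Tuple


class ToyTimerMap:
    """Toy in-memory model of a timer family; hypothetical, not a Beam API.

    Each dynamic tag names an independent event-time timer. Setting a tag
    that is already set replaces its previous firing time.
    """

    def __init__(self) -> None:
        self._timers: Dict[str, int] = {}

    def set(self, tag: str, timestamp: int) -> None:
        self._timers[tag] = timestamp

    def clear(self, tag: str) -> None:
        self._timers.pop(tag, None)

    def advance_watermark(self, watermark: int) -> List[Tuple[int, str]]:
        """Fire and remove every timer due at or before `watermark`.

        Returns (timestamp, tag) pairs in timestamp order (ties by tag);
        this firing rule is a simplification for illustration.
        """
        due = sorted(
            (ts, tag) for tag, ts in self._timers.items() if ts <= watermark)
        for _, tag in due:
            del self._timers[tag]
        return due


timers = ToyTimerMap()
timers.set("click", 10)
timers.set("purchase", 5)
timers.set("click", 3)  # re-setting "click" overrides the earlier time 10
print(timers.advance_watermark(7))  # [(3, 'click'), (5, 'purchase')]
```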


In Python, I'm trying to figure out where we should differentiate a dynamic
timer from the conventional static timer: at the timer spec declaration, at
the process parameter annotation, or both.

Approach 1: if we differentiate them only at the timer spec declaration, it
looks like:

    import apache_beam as beam
    from apache_beam.transforms import userstate

    # New in this proposal: userstate.TimerFamilySpec, beam.DoFn.DynamicTagParam.
    class TimerDoFn(beam.DoFn):
      timer_spec = userstate.TimerSpec(
          'static_timer', userstate.TimeDomain.WATERMARK)
      timer_family_spec = userstate.TimerFamilySpec(
          'dynamic_timer', userstate.TimeDomain.WATERMARK)

      def process(self,
                  element,
                  timer=beam.DoFn.TimerParam(timer_spec),
                  timer_map=beam.DoFn.TimerParam(timer_family_spec)):
        ...

      @userstate.on_timer(timer_spec)
      def process_timer(
          self,
          ts=beam.DoFn.TimestampParam,
          timer=beam.DoFn.TimerParam(timer_spec)):
        ...

      @userstate.on_timer(timer_family_spec)
      def process_timer_map(
          self,
          ts=beam.DoFn.TimestampParam,
          dynamic_tag=beam.DoFn.DynamicTagParam,
          timer_map=beam.DoFn.TimerParam(timer_family_spec)):
        ...

Approach 2: if we differentiate them only at the parameter annotation, it
looks like:

    import apache_beam as beam
    from apache_beam.transforms import userstate

    # New in this proposal: beam.DoFn.TimerFamilyParam.
    class TimerDoFn(beam.DoFn):
      timer_spec = userstate.TimerSpec(
          'static_timer', userstate.TimeDomain.WATERMARK)
      timer_family_spec = userstate.TimerSpec(
          'dynamic_timer', userstate.TimeDomain.WATERMARK)

      def process(self,
                  element,
                  timer=beam.DoFn.TimerParam(timer_spec),
                  timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
        ...

      @userstate.on_timer(timer_spec)
      def process_timer(
          self,
          ts=beam.DoFn.TimestampParam,
          timer=beam.DoFn.TimerParam(timer_spec)):
        ...

      # Needs a distinct name; a second process_timer would replace the first.
      @userstate.on_timer(timer_family_spec)
      def process_timer_map(
          self,
          ts=beam.DoFn.TimestampParam,
          timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
        ...


Approach 3: we could differentiate them in both places, but since a static
timer is just a special case of a dynamic timer, this would introduce a
certain amount of code duplication.
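One way to see the duplication in Approach 3: a static timer can be modeled as a timer family pinned to a single fixed tag. The toy Python model below uses hypothetical `ToyTimerFamily` and `ToyStaticTimer` classes (not Beam APIs, integer timestamps assumed) and implements the static timer entirely as a thin wrapper over the family.

```python
from typing import Dict, List, Tuple


class ToyTimerFamily:
    """Hypothetical timer family (not a Beam API): one firing time per tag."""

    def __init__(self) -> None:
        self._timers: Dict[str, int] = {}

    def set(self, tag: str, timestamp: int) -> None:
        self._timers[tag] = timestamp  # re-setting a tag overrides it

    def fire_until(self, watermark: int) -> List[Tuple[int, str]]:
        # Fire and remove timers due at or before the watermark, in time order.
        due = sorted(
            (ts, tag) for tag, ts in self._timers.items() if ts <= watermark)
        for _, tag in due:
            del self._timers[tag]
        return due


class ToyStaticTimer:
    """A static timer as a family pinned to one fixed (hypothetical) tag."""

    _FIXED_TAG = ""

    def __init__(self) -> None:
        self._family = ToyTimerFamily()

    def set(self, timestamp: int) -> None:
        self._family.set(self._FIXED_TAG, timestamp)

    def fire_until(self, watermark: int) -> List[int]:
        # Drop the tag: callers of a static timer never see it.
        return [ts for ts, _ in self._family.fire_until(watermark)]


static = ToyStaticTimer()
static.set(10)
static.set(4)  # overrides 10, like re-setting a single timer
print(static.fire_until(5))  # [4]
```

With this layering, a separate static-timer spec and parameter type would mostly forward to the family implementation, which is where the duplication would come from.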

I'm trying to decide which one offers the best API readability and
usability.

Does anyone have any opinions?

Thanks,
Yichi
