Possibly a dumb question, but: if "the static timer is just a special case
of the dynamic timer," why do we need to use different classes at all?
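
To make the question concrete, here is a rough sketch of the kind of
unification I have in mind. It is plain Python, not the Beam API:
TimerFamily, StaticTimer, and the empty-string tag are all hypothetical
names picked for illustration, and timestamps are plain ints. The idea
is only that a static timer could be a thin view over a family that
holds a single fixed tag.

```python
# Hypothetical sketch, NOT the Beam API: every name here is made up.
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class TimerFamily:
    """A set of timers keyed by a dynamic tag."""
    name: str
    timers: Dict[str, int] = field(default_factory=dict)

    def set(self, tag: str, timestamp: int) -> None:
        # Setting an existing tag overwrites its timestamp.
        self.timers[tag] = timestamp

    def clear(self, tag: str) -> None:
        # Clearing an unknown tag is a no-op.
        self.timers.pop(tag, None)


class StaticTimer:
    """A static timer viewed as a family with one fixed tag."""
    _TAG = ""  # assumed sentinel tag for the single static timer

    def __init__(self, family: TimerFamily) -> None:
        self._family = family

    def set(self, timestamp: int) -> None:
        self._family.set(self._TAG, timestamp)

    def clear(self) -> None:
        self._family.clear(self._TAG)


# Dynamic use: one family, many tags.
family = TimerFamily("dynamic_timer")
family.set("action_a", 10)
family.set("action_b", 20)

# Static use: same machinery, one implicit tag.
static_family = TimerFamily("static_timer")
static = StaticTimer(static_family)
static.set(30)
```

If something like this works, the spec class could stay the same for
both cases and only the parameter wrapper would differ.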

On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang <[email protected]> wrote:

> Hi, Beam community,
>
> I'm trying to add dynamic timer
> <https://issues.apache.org/jira/browse/BEAM-6857> support to the Python
> SDK. In the Java SDK, a dynamic timer is specified by declaring a
> TimerSpec of timerMap type and annotating it with @TimerFamily, both on
> the field and on the process method parameter:
>
>   @TimerFamily("timers") private final TimerSpec timer =
>     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>
>   @ProcessElement public void process(
>       @Element KV<String, ValueT> element,
>       @Timestamp Instant elementTs,
>       @TimerFamily("timers") TimerMap timers) {
>      timers.set(element.getValue().getActionType(), elementTs);
>   }
>
>
> In Python, I'm trying to figure out how we should differentiate a dynamic
> timer from the conventional static timer: at the timer spec declaration,
> at the process parameter annotation, or both.
>
> Approach 1: if we differentiate them only at the timer spec declaration,
> it looks like:
>
>     class TimerDoFn(beam.DoFn):
>       timer_spec = userstate.TimerSpec(
>           'static_timer', userstate.TimeDomain.WATERMARK)
>       timer_family_spec = userstate.TimerFamilySpec(
>           'dynamic_timer', userstate.TimeDomain.WATERMARK)
>
>       def process(self,
>                   element,
>                   timer=beam.DoFn.TimerParam(timer_spec),
>                   timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>         ...
>
>       @userstate.on_timer(timer_spec)
>       def process_timer(
>           self,
>           ts=beam.DoFn.TimestampParam,
>           timer=beam.DoFn.TimerParam(timer_spec)):
>         ...
>
>       @userstate.on_timer(timer_family_spec)
>       def process_timer_map(
>           self,
>           ts=beam.DoFn.TimestampParam,
>           dynamic_tag=beam.DoFn.DynamicTagParam,
>           timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>         ...
>
> Approach 2: if we differentiate them only at the parameter annotation:
>
>     class TimerDoFn(beam.DoFn):
>       timer_spec = userstate.TimerSpec(
>           'static_timer', userstate.TimeDomain.WATERMARK)
>       timer_family_spec = userstate.TimerSpec(
>           'dynamic_timer', userstate.TimeDomain.WATERMARK)
>
>       def process(self,
>                   element,
>                   timer=beam.DoFn.TimerParam(timer_spec),
>                   timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>         ...
>
>       @userstate.on_timer(timer_spec)
>       def process_timer(
>           self,
>           ts=beam.DoFn.TimestampParam,
>           timer=beam.DoFn.TimerParam(timer_spec)):
>         ...
>
>       @userstate.on_timer(timer_family_spec)
>       def process_timer_map(
>           self,
>           ts=beam.DoFn.TimestampParam,
>           timer=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>         ...
>
>
> Approach 3: we can differentiate them in both places, but since the static
> timer is just a special case of the dynamic timer, this would introduce a
> certain amount of code duplication.
>
> I'm trying to decide which one offers the best API readability and
> usability.
>
> Anyone have any opinions?
>
> Thanks,
> Yichi
>
