On Wed, Dec 9, 2020 at 3:48 PM Kyle Weaver <[email protected]> wrote:

> Possibly a dumb question, but: if "the static timer is just a special case
> of the dynamic timer," why do we need to use different classes at all?
>

I agree. I would argue that there is little, if any, value to the user
in distinguishing between these two "types" of timers at all.
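
To make the "special case" point concrete, here is a minimal plain-Python
sketch. It is not Beam's API: UnifiedTimer, its methods, and the empty-string
default tag are all hypothetical names chosen for illustration. It treats
every timer as a tagged family and lets the untagged call stand in for the
static timer.

```python
# Hypothetical illustration only: none of these names exist in Apache Beam.
class UnifiedTimer:
    """A timer family keyed by tag; the default tag acts as the static timer."""

    DEFAULT_TAG = ""  # assumed convention: "no tag" means the static timer

    def __init__(self, name):
        self.name = name
        self._fire_times = {}  # maps tag -> firing timestamp

    def set(self, timestamp, tag=DEFAULT_TAG):
        # Setting an existing tag again overwrites its firing time.
        self._fire_times[tag] = timestamp

    def clear(self, tag=DEFAULT_TAG):
        # Clearing an unknown tag is a no-op.
        self._fire_times.pop(tag, None)

    def pending(self):
        # Return (tag, timestamp) pairs ordered by firing time.
        return sorted(self._fire_times.items(), key=lambda kv: kv[1])


timer = UnifiedTimer("timers")
timer.set(10)                # "static" usage: no tag supplied
timer.set(5, tag="cleanup")  # "dynamic" usage: caller supplies a tag
timer.set(20, tag="flush")
timer.clear(tag="flush")
```

With one class like this, a user who never passes a tag gets static-timer
behavior without having to learn a second spec or parameter type.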


> On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang <[email protected]> wrote:
>
>> Hi, Beam community,
>>
>> I'm trying to add dynamic timer
>> <https://issues.apache.org/jira/browse/BEAM-6857> support to the Python
>> SDK. In the Java SDK, a dynamic timer is specified by declaring a
>> TimerSpec of timerMap type and annotating both it and the corresponding
>> process method parameter with @TimerFamily:
>>
>>   @TimerFamily("timers") private final TimerSpec timer =
>>     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement public void process(
>>       @Element KV<String, ValueT> element,
>>       @Timestamp Instant elementTs,
>>       @TimerFamily("timers") TimerMap timers) {
>>      timers.set(element.getValue().getActionType(), elementTs);
>>   }
>>
>>
>> In Python, I'm trying to figure out how we should differentiate a dynamic
>> timer from the conventional static timer, whether at the timer spec
>> declaration, the process parameter annotation, or both:
>>
>> Approach 1: if we differentiate them only at the timer spec declaration,
>> it looks like:
>>
>>     class TimerDoFn(beam.DoFn):
>>       timer_spec = userstate.TimerSpec(
>>           'static_timer', userstate.TimeDomain.WATERMARK)
>>       timer_family_spec = userstate.TimerFamilySpec(
>>           'dynamic_timer', userstate.TimeDomain.WATERMARK)
>>
>>       def process(self,
>>                   element,
>>                   timer=beam.DoFn.TimerParam(timer_spec),
>>                   timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>>         ...
>>
>>       @userstate.on_timer(timer_spec)
>>       def process_timer(
>>           self,
>>           ts=beam.DoFn.TimestampParam,
>>           timer=beam.DoFn.TimerParam(timer_spec)):
>>         ...
>>
>>       @userstate.on_timer(timer_family_spec)
>>       def process_timer_map(
>>           self,
>>           ts=beam.DoFn.TimestampParam,
>>           dynamic_tag=beam.DoFn.DynamicTagParam,
>>           timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>>         ...
>>
>> Approach 2: if we differentiate them only at the parameter annotation:
>>
>>     class TimerDoFn(beam.DoFn):
>>       timer_spec = userstate.TimerSpec(
>>           'static_timer', userstate.TimeDomain.WATERMARK)
>>       timer_family_spec = userstate.TimerSpec(
>>           'dynamic_timer', userstate.TimeDomain.WATERMARK)
>>
>>       def process(self,
>>                   element,
>>                   timer=beam.DoFn.TimerParam(timer_spec),
>>                   timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>         ...
>>
>>       @userstate.on_timer(timer_spec)
>>       def process_timer(
>>           self,
>>           ts=beam.DoFn.TimestampParam,
>>           timer=beam.DoFn.TimerParam(timer_spec)):
>>         ...
>>
>>       @userstate.on_timer(timer_family_spec)
>>       def process_timer_family(
>>           self,
>>           ts=beam.DoFn.TimestampParam,
>>           timer=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>         ...
>>
>>
>> Approach 3: we can differentiate them in both places, but since the
>> static timer is just a special case of the dynamic timer, this will
>> introduce a certain amount of code duplication.
>>
>> I'm trying to decide which one offers the best API readability and
>> usability.
>>
>> Anyone have any opinions?
>>
>> Thanks,
>> Yichi
>>
>
