Fair enough, I think we can also just extend existing timer API to allow
setting a dynamic timer tag field:

timer.set(timestamp) -> timer.set(timestamp, dynamic_timer_tag=a_tag)
timer.clear() -> timer.clear(dynamic_timer_tag=a_tag)

and have the default value of dynamic_timer_tag to be empty (the special
case)

On Wed, Dec 9, 2020 at 5:12 PM Robert Bradshaw <[email protected]> wrote:

> On Wed, Dec 9, 2020 at 3:48 PM Kyle Weaver <[email protected]> wrote:
>
>> Possibly a dumb question, but: if "the static timer is just a special
>> case of the dynamic timer," why do we need to use different classes at all?
>>
>
> I agree, I would argue that there is little if any value to the user
> to distinguish between these two "types" of timers at all.
>
>
>> On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang <[email protected]> wrote:
>>
>>> Hi, Beam community,
>>>
>>> I'm trying to add the dynamic timer
>>> <https://issues.apache.org/jira/browse/BEAM-6857> support to the python
>>> sdk. In java sdk, a dynamic timer is specified through declaring a
>>> TimerSpec of timerMap type and annotating it with @TimerFamily in
>>> process method parameter:
>>>
>>>   @TimerFamily("timers") private final TimerSpec timer =
>>>     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>
>>>   @ProcessElement public void process(
>>>       @Element KV<String, ValueT> element,
>>>       @Timestamp Instant elementTs,
>>>       @TimerFamily("timers") TimerMap timers) {
>>>      timers.set(element.getValue().getActionType(), elementTs);
>>>   }
>>>
>>>
>>> In python, I'm trying to figure out how we should differentiate a
>>> dynamic timer to the conventional static timer, at the timer spec
>>> declaration or process parameters annotation, or both:
>>>
>>> Approach 1: if we differentiate them only at timer spec declaration it
>>> looks like:
>>>
>>>     class TimerDoFn(beam.DoFn):
>>>       timer_spec = userstate.TimerSpec('static_timer', 
>>> userstate.TimeDomain.WATERMARK)
>>>       timer_family_spec = userstate.TimerFamilySpec('dynamic_timer', 
>>> userstate.TimeDomain.WATERMARK)
>>>       def process(self,
>>>                   element,
>>>                   timer=beam.DoFn.TimerParam(timer_spec),
>>>                   timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>>>
>>>       @userstate.on_timer(timer_spec)
>>>       def process_timer(
>>>           self,
>>>           ts=beam.DoFn.TimestampParam,
>>>           timer=beam.DoFn.TimerParam(timer_spec)):
>>>
>>>       @userstate.on_timer(timer_family_spec)
>>>       def process_timer_map(
>>>           self,
>>>           ts=beam.DoFn.TimestampParam,
>>>           dynamic_tag=DoFn.DynamicTagParam,
>>>           timer_map=beam.DoFn.TimerParam(timer_family_spec))
>>>
>>> Approach 2: if only at parameter annotation:
>>>
>>>     class TimerDoFn(beam.DoFn):
>>>       timer_spec = userstate.TimerSpec('static_timer', 
>>> userstate.TimeDomain.WATERMARK)
>>>       timer_family_spec = userstate.TimerSpec('dynamic_timer', 
>>> userstate.TimeDomain.WATERMARK)
>>>       def process(self,
>>>                   element,
>>>                   timer=beam.DoFn.TimerParam(timer_spec),
>>>                   timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>>
>>>       @userstate.on_timer(timer_spec)
>>>       def process_timer(
>>>           self,
>>>           ts=beam.DoFn.TimestampParam,
>>>           timer=beam.DoFn.TimerParam(timer_spec)):
>>>
>>>       @userstate.on_timer(timer_family_spec)
>>>       def process_timer(
>>>           self,
>>>           ts=beam.DoFn.TimestampParam,
>>>           timer=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>>
>>>
>>> Approach 3: we can differentiate them at both places, but since the
>>> static timer is just a special case of the dynamic timer, it will introduce
>>> a certain amount of code duplication.
>>>
>>> I'm trying to decide which one offers the best API readability and
>>> usability.
>>>
>>> Anyone have any opinions?
>>>
>>> Thanks,
>>> Yichi
>>>
>>

Reply via email to