Fair enough, I think we can also just extend the existing timer API to allow setting a dynamic timer tag field:

    timer.set(timestamp) -> timer.set(timestamp, dynamic_timer_tag=a_tag)
    timer.clear() -> timer.clear(dynamic_timer_tag=a_tag)

and have the default value of dynamic_timer_tag be empty (the special case).

On Wed, Dec 9, 2020 at 5:12 PM Robert Bradshaw <[email protected]> wrote:

> On Wed, Dec 9, 2020 at 3:48 PM Kyle Weaver <[email protected]> wrote:
>
>> Possibly a dumb question, but: if "the static timer is just a special
>> case of the dynamic timer," why do we need to use different classes at all?
>>
>
> I agree, I would argue that there is little if any value to the user
> to distinguish between these two "types" of timers at all.
>
>
>> On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang <[email protected]> wrote:
>>
>>> Hi, Beam community,
>>>
>>> I'm trying to add the dynamic timer
>>> <https://issues.apache.org/jira/browse/BEAM-6857> support to the python
>>> sdk. In java sdk, a dynamic timer is specified through declaring a
>>> TimerSpec of timerMap type and annotating it with @TimerFamily in
>>> process method parameter:
>>>
>>> @TimerFamily("timers") private final TimerSpec timer =
>>>     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>
>>> @ProcessElement public void process(
>>>     @Element KV<String, ValueT> element,
>>>     @Timestamp Instant elementTs,
>>>     @TimerFamily("timers") TimerMap timers) {
>>>   timers.set(element.getValue().getActionType(), elementTs);
>>> }
>>>
>>>
>>> In python, I'm trying to figure out how we should differentiate a
>>> dynamic timer to the conventional static timer, at the timer spec
>>> declaration or process parameters annotation, or both:
>>>
>>> Approach 1: if we differentiate them only at timer spec declaration it
>>> looks like:
>>>
>>> class TimerDoFn(beam.DoFn):
>>>   timer_spec = userstate.TimerSpec('static_timer',
>>>       userstate.TimeDomain.WATERMARK)
>>>   timer_family_spec = userstate.TimerFamilySpec('dynamic_timer',
>>>       userstate.TimeDomain.WATERMARK)
>>>   def process(self,
>>>       element,
>>>       timer=beam.DoFn.TimerParam(timer_spec),
>>>       timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>>>
>>>   @userstate.on_timer(timer_spec)
>>>   def process_timer(
>>>       self,
>>>       ts=beam.DoFn.TimestampParam,
>>>       timer=beam.DoFn.TimerParam(timer_spec)):
>>>
>>>   @userstate.on_timer(timer_family_spec)
>>>   def process_timer_map(
>>>       self,
>>>       ts=beam.DoFn.TimestampParam,
>>>       dynamic_tag=DoFn.DynamicTagParam,
>>>       timer_map=beam.DoFn.TimerParam(timer_family_spec))
>>>
>>> Approach 2: if only at parameter annotation:
>>>
>>> class TimerDoFn(beam.DoFn):
>>>   timer_spec = userstate.TimerSpec('static_timer',
>>>       userstate.TimeDomain.WATERMARK)
>>>   timer_family_spec = userstate.TimerSpec('dynamic_timer',
>>>       userstate.TimeDomain.WATERMARK)
>>>   def process(self,
>>>       element,
>>>       timer=beam.DoFn.TimerParam(timer_spec),
>>>       timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>>
>>>   @userstate.on_timer(timer_spec)
>>>   def process_timer(
>>>       self,
>>>       ts=beam.DoFn.TimestampParam,
>>>       timer=beam.DoFn.TimerParam(timer_spec)):
>>>
>>>   @userstate.on_timer(timer_family_spec)
>>>   def process_timer(
>>>       self,
>>>       ts=beam.DoFn.TimestampParam,
>>>       timer=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>>
>>>
>>> Approach 3: we can differentiate them at both places, but since the
>>> static timer is just a special case of the dynamic timer, it will introduce
>>> a certain amount of code duplication.
>>>
>>> I'm trying to decide which one offers the best API readability and
>>> usability.
>>>
>>> Anyone have any opinions?
>>>
>>> Thanks,
>>> Yichi
>>>
>>
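
P.S. As a rough illustration of the proposal at the top of this message, here is a minimal, self-contained sketch of a single timer class whose set() and clear() take an optional dynamic_timer_tag that defaults to the empty string. SketchTimer and pending() are hypothetical names made up for this example; this is not the Beam implementation, only a sketch of the intended call pattern, and it assumes the "empty" default is represented as ''.

```python
class SketchTimer:
    """Hypothetical stand-in for a runtime timer; not the Beam implementation."""

    def __init__(self):
        # Maps dynamic_timer_tag -> firing timestamp.
        # The empty tag '' plays the role of the conventional static timer.
        self._pending = {}

    def set(self, timestamp, dynamic_timer_tag=''):
        # Setting the same tag again overwrites the previous timestamp.
        self._pending[dynamic_timer_tag] = timestamp

    def clear(self, dynamic_timer_tag=''):
        # Clearing a tag that was never set is a no-op.
        self._pending.pop(dynamic_timer_tag, None)

    def pending(self):
        # Return a copy so callers cannot mutate internal state.
        return dict(self._pending)


timer = SketchTimer()
timer.set(10)                                # static usage: default empty tag
timer.set(20, dynamic_timer_tag='action_a')  # dynamic usage: explicit tag
timer.clear()                                # clears only the empty-tag timer
```

With this shape, existing callers of set(timestamp) and clear() keep working unchanged, and the static timer is simply the entry stored under the empty tag.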
