On Wed, Dec 9, 2020 at 3:48 PM Kyle Weaver <[email protected]> wrote:
> Possibly a dumb question, but: if "the static timer is just a special case
> of the dynamic timer," why do we need to use different classes at all?
>

I agree. I would argue that there is little, if any, value to the user in
distinguishing between these two "types" of timers at all.

> On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang <[email protected]> wrote:
>
>> Hi, Beam community,
>>
>> I'm trying to add dynamic timer
>> <https://issues.apache.org/jira/browse/BEAM-6857> support to the Python
>> SDK. In the Java SDK, a dynamic timer is specified by declaring a
>> TimerSpec of timerMap type and annotating both the spec field and the
>> process method parameter with @TimerFamily:
>>
>> @TimerFamily("timers")
>> private final TimerSpec timer = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>
>> @ProcessElement
>> public void process(
>>     @Element KV<String, ValueT> element,
>>     @Timestamp Instant elementTs,
>>     @TimerFamily("timers") TimerMap timers) {
>>   timers.set(element.getValue().getActionType(), elementTs);
>> }
>>
>>
>> In Python, I'm trying to figure out how we should differentiate a dynamic
>> timer from the conventional static timer: at the timer spec declaration,
>> at the process parameter annotation, or at both.
>>
>> Approach 1: if we differentiate them only at the timer spec declaration,
>> it looks like:
>>
>> class TimerDoFn(beam.DoFn):
>>   timer_spec = userstate.TimerSpec('static_timer',
>>                                    userstate.TimeDomain.WATERMARK)
>>   timer_family_spec = userstate.TimerFamilySpec('dynamic_timer',
>>                                                 userstate.TimeDomain.WATERMARK)
>>
>>   def process(self,
>>               element,
>>               timer=beam.DoFn.TimerParam(timer_spec),
>>               timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>>     ...
>>
>>   @userstate.on_timer(timer_spec)
>>   def process_timer(
>>       self,
>>       ts=beam.DoFn.TimestampParam,
>>       timer=beam.DoFn.TimerParam(timer_spec)):
>>     ...
>>
>>   @userstate.on_timer(timer_family_spec)
>>   def process_timer_map(
>>       self,
>>       ts=beam.DoFn.TimestampParam,
>>       dynamic_tag=beam.DoFn.DynamicTagParam,
>>       timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>>     ...
>>
>>
>> Approach 2: if we differentiate them only at the parameter annotation:
>>
>> class TimerDoFn(beam.DoFn):
>>   timer_spec = userstate.TimerSpec('static_timer',
>>                                    userstate.TimeDomain.WATERMARK)
>>   timer_family_spec = userstate.TimerSpec('dynamic_timer',
>>                                           userstate.TimeDomain.WATERMARK)
>>
>>   def process(self,
>>               element,
>>               timer=beam.DoFn.TimerParam(timer_spec),
>>               timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>     ...
>>
>>   @userstate.on_timer(timer_spec)
>>   def process_timer(
>>       self,
>>       ts=beam.DoFn.TimestampParam,
>>       timer=beam.DoFn.TimerParam(timer_spec)):
>>     ...
>>
>>   @userstate.on_timer(timer_family_spec)
>>   def process_timer_map(
>>       self,
>>       ts=beam.DoFn.TimestampParam,
>>       timer=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>>     ...
>>
>>
>> Approach 3: differentiate them at both places. However, since the static
>> timer is just a special case of the dynamic timer, this would introduce a
>> certain amount of code duplication.
>>
>> I'm trying to decide which one offers the best API readability and
>> usability.
>>
>> Does anyone have any opinions?
>>
>> Thanks,
>> Yichi
>>
>
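To make the "static timer is just a special case of the dynamic timer" point concrete, here is a toy, in-memory model in plain Python. It is not the Beam SDK: `ToyTimerFamily`, `ToyStaticTimer`, `fire_up_to`, and the fixed empty-string tag are all hypothetical names chosen for illustration. A timer family keeps one pending timestamp per dynamic tag, and a static timer is simply a family that always uses a single fixed tag. It does not model windows, keys, or Beam's actual firing rules.

```python
# Toy, in-memory model for illustration only; this is NOT the Apache Beam API.
# All class and method names below are hypothetical.
from typing import Dict, List, Tuple


class ToyTimerFamily:
    """A "dynamic" timer family: at most one pending timestamp per dynamic tag."""

    def __init__(self, family_id: str) -> None:
        self.family_id = family_id
        self._pending: Dict[str, int] = {}

    def set(self, tag: str, timestamp: int) -> None:
        # In this toy model, setting an existing tag replaces its timestamp.
        self._pending[tag] = timestamp

    def clear(self, tag: str) -> None:
        # Clearing an unknown tag is a no-op.
        self._pending.pop(tag, None)

    def fire_up_to(self, watermark: int) -> List[Tuple[int, str]]:
        """Remove and return (timestamp, tag) pairs with timestamp <= watermark,
        ordered by timestamp (ties broken by tag). Toy rule, not Beam's."""
        due = sorted(
            (ts, tag) for tag, ts in self._pending.items() if ts <= watermark
        )
        for _, tag in due:
            del self._pending[tag]
        return due


class ToyStaticTimer:
    """A "static" timer modeled as a family that always uses one fixed tag."""

    _FIXED_TAG = ""  # hypothetical: the single tag every static timer uses

    def __init__(self, timer_id: str) -> None:
        self._family = ToyTimerFamily(timer_id)

    def set(self, timestamp: int) -> None:
        self._family.set(self._FIXED_TAG, timestamp)

    def clear(self) -> None:
        self._family.clear(self._FIXED_TAG)

    def fire_up_to(self, watermark: int) -> List[int]:
        # Drop the (constant) tag; a static timer's caller never sees it.
        return [ts for ts, _ in self._family.fire_up_to(watermark)]


if __name__ == "__main__":
    timers = ToyTimerFamily("timers")
    timers.set("action_a", 10)
    timers.set("action_b", 5)
    timers.set("action_a", 20)  # replaces the earlier timestamp for "action_a"
    print(timers.fire_up_to(15))  # [(5, 'action_b')]
    print(timers.fire_up_to(25))  # [(20, 'action_a')]

    static = ToyStaticTimer("static_timer")
    static.set(7)
    print(static.fire_up_to(10))  # [7]
```

Because `ToyStaticTimer` only forwards to `ToyTimerFamily` with a constant tag, a single spec type plus an optional tag is enough to express both kinds of timer in this toy model. Whether that maps cleanly onto the Python SDK's `TimerSpec`/`TimerParam` design is exactly what the thread is debating.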
