boyuanzz commented on a change in pull request #13421:
URL: https://github.com/apache/beam/pull/13421#discussion_r538826499
##########
File path: sdks/python/apache_beam/runners/direct/direct_userstate.py
##########
@@ -225,14 +225,28 @@ def __init__(self, step_context, dofn, key_coder):
     self.cached_states = {}
     self.cached_timers = {}
-
-  def get_timer(self, timer_spec, key, window, timestamp, pane):
+    self.cached_timer_families = {}
+
+  def get_timer(
+      self,
+      timer_spec: userstate.TimerSpec,
+      key,
+      window,
+      timestamp,
+      pane,
+      is_timer_family=False
Review comment:
I don't think it will result in much code duplication: you would basically
split your `if` branch into two functions and add one utility function for
creating `cached_key`.
I lean toward better readability over avoiding a little code duplication.
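A rough sketch of the shape I have in mind, using stand-in classes rather than the real Beam types. Only `get_timer`, `cached_timers`, and `cached_timer_families` come from this PR; `StateContextSketch`, `_StubTimer`, `_make_cached_key`, and `get_timer_family` are hypothetical names, and the key layout is an assumption:

```python
class _StubTimer:
    """Placeholder for the runtime timer object the runner would build."""
    def __init__(self, timestamp, pane):
        self.timestamp = timestamp
        self.pane = pane


class StateContextSketch:
    """Hypothetical stand-in for the state context touched by this diff."""
    def __init__(self):
        self.cached_timers = {}
        self.cached_timer_families = {}

    @staticmethod
    def _make_cached_key(timer_spec, key, window):
        # Assumed key layout; the single place that defines a cache key.
        return (timer_spec, key, window)

    def get_timer(self, timer_spec, key, window, timestamp, pane):
        # Regular timers use their own cache, with no is_timer_family flag.
        cached_key = self._make_cached_key(timer_spec, key, window)
        if cached_key not in self.cached_timers:
            self.cached_timers[cached_key] = _StubTimer(timestamp, pane)
        return self.cached_timers[cached_key]

    def get_timer_family(self, timer_spec, key, window, timestamp, pane):
        # Timer families get a separate method and a separate cache.
        cached_key = self._make_cached_key(timer_spec, key, window)
        if cached_key not in self.cached_timer_families:
            self.cached_timer_families[cached_key] = _StubTimer(timestamp, pane)
        return self.cached_timer_families[cached_key]
```

The two public methods repeat a few lines, but each call site states which kind of timer it wants instead of passing a boolean.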
##########
File path: sdks/python/apache_beam/transforms/userstate.py
##########
@@ -184,29 +186,6 @@ def to_runner_api(self, context, key_coder, window_coder):
coders._TimerCoder(key_coder, window_coder)))
-# TODO(BEAM-9602): Provide support for dynamic timer.
-class TimerFamilySpec(object):
Review comment:
I'm curious why you chose the `TimerSpec` ->
`TimerParam`/`TimerFamilyParam` approach over `TimerSpec`/`TimerFamilySpec` ->
`TimerParam`. Overall, the API user uses `TimerSpec` to register the `on_timer`
callback, right?