jrmccluskey commented on code in PR #36596:
URL: https://github.com/apache/beam/pull/36596#discussion_r2473272328


##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -238,9 +237,10 @@ def schedule_item(self, element, ignore_buffer=False, *args, **kwargs):
       **kwargs: keyword arguments that the wrapped dofn requires.
     """
     done = False
-    sleep_time = 1
+    sleep_time = 0.01
     total_sleep = 0
-    while not done:
+    timeout = 1

Review Comment:
   The timeout duration could be made configurable in `__init__()`.
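A minimal sketch of what a configurable timeout might look like, assuming the loop simply polls until the item can be scheduled. `SchedulerSketch`, `try_schedule`, and the `timeout`/`sleep_time` constructor parameters are hypothetical; what the real `schedule_item` does once the timeout is reached is not shown in the diff, so here it just returns `False`.

```python
from time import sleep


class SchedulerSketch:
  """Hypothetical stand-in for the polling loop in AsyncWrapper.schedule_item."""

  def __init__(self, timeout=1.0, sleep_time=0.01):
    # Both values come from the constructor instead of being hard-coded
    # inside schedule_item().
    self._timeout = timeout
    self._sleep_time = sleep_time

  def schedule_item(self, try_schedule):
    """Polls try_schedule() until it returns True or the timeout elapses.

    Like the original loop, this tracks nominal sleep time (total_sleep),
    not wall-clock time. Returns True if scheduled, False on timeout.
    """
    total_sleep = 0.0
    while total_sleep < self._timeout:
      if try_schedule():
        return True
      total_sleep += self._sleep_time
      sleep(self._sleep_time)
    return False
```

Usage: `SchedulerSketch(timeout=5.0).schedule_item(fn)` lets a caller trade latency for fewer dropped or delayed items without editing the method body.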



##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -256,10 +256,12 @@ def schedule_item(self, element, ignore_buffer=False, *args, **kwargs):
         total_sleep += sleep_time
         sleep(sleep_time)
 
-  def next_time_to_fire(self):
+  def next_time_to_fire(self, key):
+    random.seed(key)
     return (
         floor((time() + self._timer_frequency) / self._timer_frequency) *
-        self._timer_frequency)
+        self._timer_frequency) + (
+            random.random() * self._timer_frequency)

Review Comment:
   I feel like doing all of the work to find a round increment of 
`_timer_frequency` is wasted compute once you add the extra fuzziness of 
`random.random() * self._timer_frequency`, since the result is no longer on a 
round increment afterwards.
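A sketch comparing the two computations, written as free functions with a hypothetical `now` parameter so they can be tested. It uses a local `random.Random(key)`, which yields the same per-key offset as `random.seed(key)` followed by `random.random()` but without reseeding the module-level generator shared by the rest of the process. The function names are illustrative only.

```python
import random
from math import floor
from time import time


def next_time_to_fire_rounded(key, timer_frequency, now=None):
  """Variant from the diff: next multiple of timer_frequency, plus a
  deterministic per-key offset in [0, timer_frequency)."""
  now = time() if now is None else now
  # Local generator: same offset as random.seed(key); random.random(),
  # but the global random state is left untouched.
  offset = random.Random(key).random() * timer_frequency
  return floor(
      (now + timer_frequency) / timer_frequency) * timer_frequency + offset


def next_time_to_fire_simple(key, timer_frequency, now=None):
  """Hypothetical simplification: skip the rounding entirely."""
  now = time() if now is None else now
  offset = random.Random(key).random() * timer_frequency
  return now + timer_frequency + offset
```

The trade-off: with the rounding, a given key always fires at the same phase within each `timer_frequency` window; without it, the phase depends on when the timer was set, though the jitter still spreads keys out.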



##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -55,9 +56,8 @@ class AsyncWrapper(beam.DoFn):
   TIMER_SET = ReadModifyWriteStateSpec('timer_set', coders.BooleanCoder())
   TO_PROCESS = BagStateSpec(
       'to_process',
-      coders.TupleCoder([coders.StrUtf8Coder(), coders.StrUtf8Coder()]),
-  )
-  _timer_frequency = 20
+      coders.TupleCoder(

Review Comment:
   This is a backwards-incompatible change, since you're swapping to a 
different coder.
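A stdlib-only analogy (not Beam's actual coder wire format) for why swapping the `TO_PROCESS` coder is incompatible: bytes already persisted in state under the old encoding cannot be decoded by code that reads that state with a different encoding, presumably the situation when an existing pipeline is updated. `encode_old`, `decode_old`, and `decode_new` are hypothetical.

```python
import pickle
import struct


def encode_old(pair):
  """Hypothetical 'old' format: each string as a 4-byte big-endian length
  followed by its UTF-8 bytes (loosely like a tuple of two string coders)."""
  out = b""
  for s in pair:
    data = s.encode("utf-8")
    out += struct.pack(">I", len(data)) + data
  return out


def decode_old(payload):
  """Reads the 'old' format back into a tuple of strings."""
  items, pos = [], 0
  while pos < len(payload):
    (n,) = struct.unpack_from(">I", payload, pos)
    pos += 4
    items.append(payload[pos:pos + n].decode("utf-8"))
    pos += n
  return tuple(items)


def decode_new(payload):
  """Hypothetical 'new' format: a pickled tuple."""
  return pickle.loads(payload)


# State written before the change...
persisted = encode_old(("key", "element"))
# ...cannot be read after the change.
try:
  decode_new(persisted)
  compatible = True
except Exception:
  compatible = False
```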



##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -103,7 +103,7 @@ def __init__(
     self._uuid = uuid.uuid4().hex
     self._parallelism = parallelism
     self._max_wait_time = max_wait_time
-    self._timer_frequency = 20
+    self._timer_frequency = callback_frequency

Review Comment:
   As best I can tell, `self._timer_frequency` is used but 
`self.timer_frequency_` is not. Is there any reason to have both? The same goes 
for all of these duplicated fields.
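A sketch of storing each constructor argument exactly once. `parallelism`, `max_wait_time`, and `callback_frequency` appear in the diff; the class name and default values are hypothetical. If a public name is genuinely needed, a read-only property avoids a second copy that can drift out of sync.

```python
class AsyncWrapperFieldsSketch:
  """Hypothetical sketch: one attribute per setting, no duplicated fields."""

  def __init__(self, parallelism=1, callback_frequency=5, max_wait_time=0.5):
    # Illustrative defaults only; not taken from AsyncWrapper.
    self._parallelism = parallelism
    self._max_wait_time = max_wait_time
    self._timer_frequency = callback_frequency

  @property
  def timer_frequency(self):
    # Public read-only view of the single stored value.
    return self._timer_frequency
```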



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
