This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 269706a585c Async Post Load Test Fixes (#36596)
269706a585c is described below

commit 269706a585c85cae34e6d00c960090cc638ce821
Author: Dustin Rhodes <[email protected]>
AuthorDate: Tue Nov 4 09:36:34 2025 -0800

    Async Post Load Test Fixes (#36596)
    
    * Update async_dofn.py
    
    Fixes the following problems found in load testing:
    
    1) We should not do any blocking operations under a lock in
       commit_finished_items.  Add items outside the lock.
    2) Timers should be skewed so they don't all fire at once.
    
    * Formatting fixes
    
    * extra tab
    
    * import random and make randomness deterministic per key.
    
    * Fix unit test failure caused by passing timeout to a function when the
      function does not define timeout.
    
    * Fix unit tests now that we don't block
    
    * formatting
    
    * Re-add sleep so message submission doesn't race with processing in the
      unit test
    
    * Address Comments
    
    * fix formatting and typo
---
 sdks/python/apache_beam/transforms/async_dofn.py   | 44 +++++++++++++---------
 .../apache_beam/transforms/async_dofn_test.py      | 39 +++++++++----------
 2 files changed, 43 insertions(+), 40 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/async_dofn.py 
b/sdks/python/apache_beam/transforms/async_dofn.py
index 6dc43dbf8da..d2fa90c8508 100644
--- a/sdks/python/apache_beam/transforms/async_dofn.py
+++ b/sdks/python/apache_beam/transforms/async_dofn.py
@@ -18,6 +18,7 @@
 from __future__ import absolute_import
 
 import logging
+import random
 import uuid
 from concurrent.futures import ThreadPoolExecutor
 from math import floor
@@ -55,9 +56,8 @@ class AsyncWrapper(beam.DoFn):
   TIMER_SET = ReadModifyWriteStateSpec('timer_set', coders.BooleanCoder())
   TO_PROCESS = BagStateSpec(
       'to_process',
-      coders.TupleCoder([coders.StrUtf8Coder(), coders.StrUtf8Coder()]),
-  )
-  _timer_frequency = 20
+      coders.TupleCoder(
+          [coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder()]))
   # The below items are one per dofn (not instance) so are maps of UUID to
   # value.
   _processing_elements = {}
@@ -75,7 +75,8 @@ class AsyncWrapper(beam.DoFn):
       parallelism=1,
       callback_frequency=5,
       max_items_to_buffer=None,
-      max_wait_time=120,
+      timeout=1,
+      max_wait_time=0.5,
   ):
     """Wraps the sync_fn to create an asynchronous version.
 
@@ -96,14 +97,17 @@ class AsyncWrapper(beam.DoFn):
       max_items_to_buffer: We should ideally buffer enough to always be busy 
but
         not so much that the worker ooms.  By default will be 2x the 
parallelism
         which should be good for most pipelines.
-      max_wait_time: The maximum amount of time an item should wait to be added
-        to the buffer.  Used for testing to ensure timeouts are met.
+      timeout: The maximum amount of time an item should try to be scheduled
+        locally before it goes in the queue of waiting work.
+      max_wait_time: The maximum amount of sleep time while attempting to
+        schedule an item.  Used in testing to ensure timeouts are met.
     """
     self._sync_fn = sync_fn
     self._uuid = uuid.uuid4().hex
     self._parallelism = parallelism
+    self._timeout = timeout
     self._max_wait_time = max_wait_time
-    self._timer_frequency = 20
+    self._timer_frequency = callback_frequency
     if max_items_to_buffer is None:
       self._max_items_to_buffer = max(parallelism * 2, 10)
     else:
@@ -112,9 +116,6 @@ class AsyncWrapper(beam.DoFn):
     AsyncWrapper._processing_elements[self._uuid] = {}
     AsyncWrapper._items_in_buffer[self._uuid] = 0
     self.max_wait_time = max_wait_time
-    self.timer_frequency_ = callback_frequency
-    self.parallelism_ = parallelism
-    self._next_time_to_fire = Timestamp.now() + Duration(seconds=5)
     self._shared_handle = Shared()
 
   @staticmethod
@@ -238,9 +239,9 @@ class AsyncWrapper(beam.DoFn):
       **kwargs: keyword arguments that the wrapped dofn requires.
     """
     done = False
-    sleep_time = 1
+    sleep_time = 0.01
     total_sleep = 0
-    while not done:
+    while not done and total_sleep < self._timeout:
       done = self.schedule_if_room(element, ignore_buffer, *args, **kwargs)
       if not done:
         sleep_time = min(self.max_wait_time, sleep_time * 2)
@@ -256,10 +257,12 @@ class AsyncWrapper(beam.DoFn):
         total_sleep += sleep_time
         sleep(sleep_time)
 
-  def next_time_to_fire(self):
+  def next_time_to_fire(self, key):
+    random.seed(key)
     return (
         floor((time() + self._timer_frequency) / self._timer_frequency) *
-        self._timer_frequency)
+        self._timer_frequency) + (
+            random.random() * self._timer_frequency)
 
   def accepting_items(self):
     with AsyncWrapper._lock:
@@ -301,7 +304,7 @@ class AsyncWrapper(beam.DoFn):
     # Set a timer to fire on the next round increment of timer_frequency_. Note
     # we do this so that each messages timer doesn't get overwritten by the
     # next.
-    time_to_fire = self.next_time_to_fire()
+    time_to_fire = self.next_time_to_fire(element[0])
     timer.set(time_to_fire)
 
     # Don't output any elements.  This will be done in commit_finished_items.
@@ -346,6 +349,7 @@ class AsyncWrapper(beam.DoFn):
     # from local state and cancel their futures.
     to_remove = []
     key = None
+    to_reschedule = []
     if to_process_local:
       key = str(to_process_local[0][0])
     else:
@@ -387,9 +391,13 @@ class AsyncWrapper(beam.DoFn):
               'item %s found in processing state but not local state,'
               ' scheduling now',
               x)
-          self.schedule_item(x, ignore_buffer=True)
+          to_reschedule.append(x)
           items_rescheduled += 1
 
+    # Reschedule the items not under a lock
+    for x in to_reschedule:
+      self.schedule_item(x, ignore_buffer=False)
+
     # Update processing state to remove elements we've finished
     to_process.clear()
     for x in to_process_local:
@@ -408,8 +416,8 @@ class AsyncWrapper(beam.DoFn):
     # If there are items not yet finished then set a timer to fire in the
     # future.
     self._next_time_to_fire = Timestamp.now() + Duration(seconds=5)
-    if items_not_yet_finished > 0:
-      time_to_fire = self.next_time_to_fire()
+    if items_in_processing_state > 0:
+      time_to_fire = self.next_time_to_fire(key)
       timer.set(time_to_fire)
 
     # Each result is a list. We want to combine them into a single
diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py 
b/sdks/python/apache_beam/transforms/async_dofn_test.py
index ecc730a66f9..7577e215d1c 100644
--- a/sdks/python/apache_beam/transforms/async_dofn_test.py
+++ b/sdks/python/apache_beam/transforms/async_dofn_test.py
@@ -343,10 +343,15 @@ class AsyncTest(unittest.TestCase):
     self.assertEqual(async_dofn._max_items_to_buffer, 5)
     self.check_items_in_buffer(async_dofn, 5)
 
-    # After 55 seconds all items should be finished (including those which were
-    # waiting on the buffer).
+    # Wait for all buffered items to finish.
     self.wait_for_empty(async_dofn, 100)
+    # This will commit buffered items and add new items which didn't fit in the
+    # buffer.
     result = async_dofn.commit_finished_items(fake_bag_state, fake_timer)
+
+    # Wait for the new buffered items to finish.
+    self.wait_for_empty(async_dofn, 100)
+    result.extend(async_dofn.commit_finished_items(fake_bag_state, fake_timer))
     self.check_output(result, expected_output)
     self.check_items_in_buffer(async_dofn, 0)
 
@@ -414,33 +419,23 @@ class AsyncTest(unittest.TestCase):
     # Run for a while. Should be enough to start all items but not finish them
     # all.
     time.sleep(random.randint(30, 50))
-    # Commit some stuff
-    pre_crash_results = []
-    for i in range(0, 10):
-      pre_crash_results.append(
-          async_dofn.commit_finished_items(
-              bag_states['key' + str(i)], timers['key' + str(i)]))
 
-    # Wait for all items to at least make it into the buffer.
     done = False
+    results = [[] for _ in range(0, 10)]
     while not done:
-      time.sleep(10)
       done = True
-      for future in futures:
-        if not future.done():
+      for i in range(0, 10):
+        results[i].extend(
+            async_dofn.commit_finished_items(
+                bag_states['key' + str(i)], timers['key' + str(i)]))
+        if not bag_states['key' + str(i)].items:
+          self.check_output(results[i], expected_outputs['key' + str(i)])
+        else:
           done = False
-          break
-
-    # Wait for all items to finish.
-    self.wait_for_empty(async_dofn)
+      time.sleep(random.randint(10, 30))
 
     for i in range(0, 10):
-      result = async_dofn.commit_finished_items(
-          bag_states['key' + str(i)], timers['key' + str(i)])
-      logging.info('pre_crash_results %s', pre_crash_results[i])
-      logging.info('result %s', result)
-      self.check_output(
-          pre_crash_results[i] + result, expected_outputs['key' + str(i)])
+      self.check_output(results[i], expected_outputs['key' + str(i)])
       self.assertEqual(bag_states['key' + str(i)].items, [])
 
 

Reply via email to