This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f1bbb637e31 Fix deadlock in AsyncWrapper reset_state() (#38427)
f1bbb637e31 is described below

commit f1bbb637e3181bbbb5f3b815a1327fe1c2425bb7
Author: Shunping Huang <[email protected]>
AuthorDate: Mon May 11 13:16:25 2026 -0400

    Fix deadlock in AsyncWrapper reset_state() (#38427)
    
    * Add a test to reproduce hanging.
    
    * Fix deadlock between shutdown in main thread and done callback in worker threads.
    
    * Address review comments.
    
    * Fix format
    
    * Modify the test to cover reset_state() hanging in asyncio mode.
    
    * Fix the deadlock when asyncio is used.
    
    * Fix formatting.
    
    * Increase timeout to reduce false-positives.
    
    * Revise test function names and some comments.
---
 sdks/python/apache_beam/transforms/async_dofn.py   | 29 ++++++++++++++----
 .../apache_beam/transforms/async_dofn_test.py      | 35 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/async_dofn.py b/sdks/python/apache_beam/transforms/async_dofn.py
index 28568bd893c..ad3d5bc6646 100644
--- a/sdks/python/apache_beam/transforms/async_dofn.py
+++ b/sdks/python/apache_beam/transforms/async_dofn.py
@@ -153,21 +153,39 @@ class AsyncWrapper(beam.DoFn):
 
   @staticmethod
   def reset_state():
+    event_loop_thread_to_join = None
     with AsyncWrapper._lock:
       if AsyncWrapper._event_loop:
         AsyncWrapper._event_loop.call_soon_threadsafe(
             AsyncWrapper._event_loop.stop)
       if AsyncWrapper._event_loop_thread:
-        AsyncWrapper._event_loop_thread.join()
+        event_loop_thread_to_join = AsyncWrapper._event_loop_thread
 
       AsyncWrapper._event_loop = None
       AsyncWrapper._event_loop_thread = None
       if AsyncWrapper._loop_started is not None:
         AsyncWrapper._loop_started.clear()
 
-      for pool in AsyncWrapper._pool.values():
-        pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
-            wait=True, cancel_futures=True)
+      pools = list(AsyncWrapper._pool.values())
+
+    # We must join the asyncio event loop thread outside of the lock block.
+    # If joined inside the lock, the waiting thread holds the lock while blocking,
+    # preventing active coroutines' done callbacks from acquiring the lock on the
+    # event loop thread, resulting in a deadlock.
+    if event_loop_thread_to_join:
+      event_loop_thread_to_join.join()
+
+    # We must acquire and shut down the thread pools outside of the lock block.
+    # If shutdown(wait=True) is called inside the lock, the caller blocks holding
+    # the lock, preventing active worker threads from acquiring the lock to run
+    # their done callbacks, resulting in a deadlock.
+    pools_to_shutdown = [
+        pool.acquire(AsyncWrapper.initialize_pool(1)) for pool in pools
+    ]
+
+    for pool in pools_to_shutdown:
+      pool.shutdown(wait=True, cancel_futures=True)
+
     with AsyncWrapper._lock:
       AsyncWrapper._pool = {}
       AsyncWrapper._processing_elements = {}
@@ -268,7 +286,8 @@ class AsyncWrapper(beam.DoFn):
 
   def decrement_items_in_buffer(self, future):
     with AsyncWrapper._lock:
-      AsyncWrapper._items_in_buffer[self._uuid] -= 1
+      if self._uuid in AsyncWrapper._items_in_buffer:
+        AsyncWrapper._items_in_buffer[self._uuid] -= 1
 
   def schedule_if_room(self, element, ignore_buffer=False, *args, **kwargs):
     """Schedules an item to be processed asynchronously if there is room.
diff --git a/sdks/python/apache_beam/transforms/async_dofn_test.py b/sdks/python/apache_beam/transforms/async_dofn_test.py
index 81c7b8e163f..39901d791fb 100644
--- a/sdks/python/apache_beam/transforms/async_dofn_test.py
+++ b/sdks/python/apache_beam/transforms/async_dofn_test.py
@@ -16,6 +16,7 @@
 #
 
 import logging
+import multiprocessing
 import random
 import time
 import unittest
@@ -487,6 +488,40 @@ class AsyncTest(unittest.TestCase):
       self.check_output(results[i], expected_outputs['key' + str(i)])
       self.assertEqual(bag_states['key' + str(i)].items, [])
 
+  @staticmethod
+  def _run_reset_state_concurrent_teardown(use_asyncio):
+    dofn = BasicDofn(sleep_time=0.5)
+    async_dofn = async_lib.AsyncWrapper(dofn, use_asyncio=use_asyncio)
+    async_dofn.setup()
+    fake_bag_state = FakeBagState([])
+    fake_timer = FakeTimer(0)
+
+    # Start processing an item. This starts a worker thread/coroutine sleeping for 0.5s.
+    async_dofn.process(('key1', 1), to_process=fake_bag_state, 
timer=fake_timer)
+    time.sleep(0.05)
+
+    # Verify that calling reset_state() while background tasks are actively running
+    # completes cleanly without causing lock-ordering deadlocks.
+    async_lib.AsyncWrapper.reset_state()
+
+  def test_reset_state_concurrent_teardown(self):
+    # Verify concurrent teardown safety in a separate process to prevent any potential
+    # regressions from freezing the main pytest process at exit.
+    p = multiprocessing.Process(
+        target=AsyncTest._run_reset_state_concurrent_teardown,
+        args=(self.use_asyncio, ))
+    p.start()
+    p.join(timeout=10.0)
+
+    if p.is_alive():
+      p.terminate()
+      p.join()
+      self.fail(
+          "reset_state() deadlocked/hung waiting for active threads/tasks to 
finish"
+      )
+    else:
+      self.assertEqual(p.exitcode, 0)
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to