shunping opened a new pull request, #38427:
URL: https://github.com/apache/beam/pull/38427
During test execution or pipeline teardown, calling `reset_state` on `AsyncWrapper` intermittently hangs the pipeline indefinitely, causing test runner timeouts (https://github.com/apache/beam/actions/runs/25590690572/job/75127752557?pr=38425). The traceback from the timeout is shown below:

```
________________________ AsyncTest_1.test_duplicates __________________________
[gw0] darwin -- Python 3.13.13 /Users/runner/work/beam/beam/sdks/python/target/.tox/py313-macos/bin/python

self = <apache_beam.transforms.async_dofn_test.AsyncTest_1 testMethod=test_duplicates>

    def setUp(self):
      super().setUp()
>     async_lib.AsyncWrapper.reset_state()

apache_beam/transforms/async_dofn_test.py:102:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox/py313-macos/lib/python3.13/site-packages/apache_beam/transforms/async_dofn.py:169: in reset_state
    pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/thread.py:239: in shutdown
    t.join()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Thread(ThreadPoolExecutor-180_0, stopped 14025928704)>, timeout = None

    def join(self, timeout=None):
        """Wait until the thread terminates.

        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.

        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        is_alive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.

        When the timeout argument is not present or None, the operation will
        block until the thread terminates.

        A thread can be join()ed many times.

        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.

        """
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self._started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")

        # the behavior of a negative timeout isn't documented, but
    self._bootstrap_inner()
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 1044, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 995, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/managers.py", line 179, in serve_forever
    self.stop_event.wait(1)
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 660, in wait
    signaled = self._cond.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 363, in wait
    gotit = waiter.acquire(True, timeout)
```

The deadlock occurs in the following order:

- The main thread executing `reset_state` acquires `AsyncWrapper._lock` and calls `pool.shutdown(wait=True)`. While still holding the lock, it blocks until the active worker threads finish.
- When an active worker thread finishes processing an item, the underlying future completes and invokes its done callbacks on that same worker thread. The registered callback, `decrement_items_in_buffer`, tries to acquire `AsyncWrapper._lock` to decrement the buffer count.
- The worker therefore blocks on a lock held by the main thread, while the main thread waits for that same worker to exit. Neither thread can make progress, so the pipeline hangs.

To fix this, we move the shutdown call outside the lock scope, so the main thread no longer holds the lock while it waits for the worker threads to complete.
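For illustration, here is a minimal, hypothetical model of the lock/callback interaction described above. It is not Beam's `AsyncWrapper`: `MiniAsyncWrapper`, `submit`, `_items_in_buffer`, and `callback_threads` are made-up names, and the sketch assumes the pool and the buffer counter are guarded by a single non-reentrant `threading.Lock`. It shows the fixed ordering: detach the pool while holding the lock, then call `shutdown(wait=True)` only after the lock is released.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class MiniAsyncWrapper:
  """Hypothetical stand-in that mirrors only the locking shape of the bug.

  NOT Beam's AsyncWrapper: one class-level lock guards both the pool and a
  buffer counter that each future's done callback decrements.
  """
  _lock = threading.Lock()
  _pool = None
  _items_in_buffer = 0
  callback_threads = []

  @classmethod
  def submit(cls, fn, *args):
    with cls._lock:
      if cls._pool is None:
        cls._pool = ThreadPoolExecutor(max_workers=2)
      cls._items_in_buffer += 1
      future = cls._pool.submit(fn, *args)
    # Registered outside the lock: if the future is already done, the
    # callback runs immediately in *this* thread and needs the lock itself.
    future.add_done_callback(cls._decrement_items_in_buffer)
    return future

  @classmethod
  def _decrement_items_in_buffer(cls, _future):
    # Normally invoked on the worker thread that completed the future.
    with cls._lock:
      cls._items_in_buffer -= 1
      cls.callback_threads.append(threading.current_thread().name)

  @classmethod
  def reset_state(cls):
    # Buggy shape (hangs if a task is still running):
    #   with cls._lock:
    #     cls._pool.shutdown(wait=True)  # worker's callback blocks on _lock
    #
    # Fixed shape: detach the pool while holding the lock, then wait for
    # the workers only after the lock has been released.
    with cls._lock:
      pool, cls._pool = cls._pool, None
    if pool is not None:
      pool.shutdown(wait=True)


# Usage: four tasks block on an event, so they are still running when
# reset_state() starts waiting in shutdown().
release = threading.Event()
for _ in range(4):
  MiniAsyncWrapper.submit(release.wait)
# Release the tasks ~0.2 s from now, i.e. while reset_state() is (very
# likely) already waiting; their callbacks then fire on the worker threads.
threading.Timer(0.2, release.set).start()
MiniAsyncWrapper.reset_state()
print(MiniAsyncWrapper._items_in_buffer)  # → 0
```

Swapping the commented-out buggy form back into `reset_state` makes this script hang: the first worker to finish blocks in `_decrement_items_in_buffer` on `_lock`, which the main thread holds while it joins that same worker inside `shutdown()`.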
-- This is an automated message from the Apache Git Service.
