shunping opened a new pull request, #38427:
URL: https://github.com/apache/beam/pull/38427

   During test execution or pipeline teardown, calling `reset_state` on 
`AsyncWrapper` intermittently hangs the pipeline indefinitely, leading to test 
runner timeouts 
(https://github.com/apache/beam/actions/runs/25590690572/job/75127752557?pr=38425).
   
   The traceback from the timed-out run is shown below:
   
   ```
   ________________________ AsyncTest_1.test_duplicates __________________________
   [gw0] darwin -- Python 3.13.13 /Users/runner/work/beam/beam/sdks/python/target/.tox/py313-macos/bin/python
   
   self = <apache_beam.transforms.async_dofn_test.AsyncTest_1 testMethod=test_duplicates>
   
       def setUp(self):
         super().setUp()
   >     async_lib.AsyncWrapper.reset_state()
   
   apache_beam/transforms/async_dofn_test.py:102: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   target/.tox/py313-macos/lib/python3.13/site-packages/apache_beam/transforms/async_dofn.py:169: in reset_state
       pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
   
   /Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/thread.py:239: in shutdown
       t.join()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <Thread(ThreadPoolExecutor-180_0, stopped 14025928704)>, timeout = None
   
       def join(self, timeout=None):
           """Wait until the thread terminates.
       
           This blocks the calling thread until the thread whose join() method is
           called terminates -- either normally or through an unhandled exception
           or until the optional timeout occurs.
       
           When the timeout argument is present and not None, it should be a
           floating-point number specifying a timeout for the operation in seconds
           (or fractions thereof). As join() always returns None, you must call
           is_alive() after join() to decide whether a timeout happened -- if the
           thread is still alive, the join() call timed out.
       
           When the timeout argument is not present or None, the operation will
           block until the thread terminates.
       
           A thread can be join()ed many times.
       
           join() raises a RuntimeError if an attempt is made to join the current
           thread as that would cause a deadlock. It is also an error to join() a
           thread before it has been started and attempts to do so raises the same
           exception.
       
           """
           if not self._initialized:
               raise RuntimeError("Thread.__init__() not called")
           if not self._started.is_set():
               raise RuntimeError("cannot join thread before it is started")
           if self is current_thread():
               raise RuntimeError("cannot join current thread")
       
           # the behavior of a negative timeout isn't documented, but
       self._bootstrap_inner()
     File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 1044, in _bootstrap_inner
       self.run()
     File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 995, in run
       self._target(*self._args, **self._kwargs)
     File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/managers.py", line 179, in serve_forever
       self.stop_event.wait(1)
     File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 660, in wait
       signaled = self._cond.wait(timeout)
     File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 363, in wait
       gotit = waiter.acquire(True, timeout)
   ```
   
   The deadlock occurs in the following order:
   - The main thread executing `reset_state` acquires `AsyncWrapper._lock` and 
calls `pool.shutdown(wait=True)`. It remains blocked waiting for active worker 
threads to finish while holding the lock.
   - When an active worker thread finishes processing an item, the underlying future completes and invokes its done callbacks on that same worker thread. The registered callback, `decrement_items_in_buffer`, tries to acquire `AsyncWrapper._lock` to decrement the buffer count, so it blocks behind the main thread. The worker cannot exit until the callback returns, and the main thread cannot release the lock until the worker exits, so neither side makes progress.
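
The ordering above can be observed without hanging the process by giving the callback a bounded wait on the lock. The following is only a minimal sketch, not the code in `async_dofn.py`: the function name `callback_blocked_during_shutdown`, the `go` event, and the `worker` thread-name prefix are hypothetical and exist purely for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def callback_blocked_during_shutdown(timeout_s=0.5):
  """Hypothetical demo: hold a lock across shutdown(wait=True)."""
  lock = threading.Lock()
  pool = ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker")
  go = threading.Event()
  outcome = {}

  def callback(_future):
    # Invoked on the worker thread once the future completes.
    # A bounded wait is used here so the demo terminates; an
    # unbounded `with lock:` would wait forever instead.
    got = lock.acquire(timeout=timeout_s)
    outcome["acquired"] = got
    outcome["thread"] = threading.current_thread().name
    if got:
      lock.release()

  # The task blocks until the main thread already holds the lock.
  future = pool.submit(go.wait)
  future.add_done_callback(callback)

  with lock:
    go.set()
    # The main thread joins the worker while still holding the lock.
    pool.shutdown(wait=True)
  return outcome
```

In this sketch the callback gives up after `timeout_s` and reports that it never obtained the lock. With an unbounded acquire in its place, the worker would never return and `shutdown(wait=True)` would never finish.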
   
   To fix it, we move the `shutdown` call outside of the lock scope, so the main thread no longer holds `AsyncWrapper._lock` while it waits for worker threads to complete.
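
As a rough illustration of that change, the sketch below swaps the pool under the lock and waits for the old pool only after releasing it. `PoolOwner` and `schedule` are hypothetical stand-ins; the real `AsyncWrapper` manages its pool differently, and the actual diff in this PR may look different.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class PoolOwner:
  """Hypothetical stand-in for the class-level state on AsyncWrapper."""
  _lock = threading.Lock()
  _pool = ThreadPoolExecutor(max_workers=1)
  _items_in_buffer = 0

  @classmethod
  def decrement_items_in_buffer(cls, _future):
    # Done callback: runs on the worker thread and needs the lock.
    with cls._lock:
      cls._items_in_buffer -= 1

  @classmethod
  def schedule(cls, fn, *args):
    with cls._lock:
      cls._items_in_buffer += 1
      future = cls._pool.submit(fn, *args)
    future.add_done_callback(cls.decrement_items_in_buffer)
    return future

  @classmethod
  def reset_state(cls):
    # Swap the pool under the lock, but do not wait while holding it.
    with cls._lock:
      old_pool = cls._pool
      cls._pool = ThreadPoolExecutor(max_workers=1)
    # The lock is free here, so pending callbacks can acquire it and
    # the join inside shutdown() is able to return.
    old_pool.shutdown(wait=True)
```

Because the worker is joined only after the lock has been released, every pending callback can run to completion before `reset_state` returns.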
   
   

