gemini-code-assist[bot] commented on code in PR #38427:
URL: https://github.com/apache/beam/pull/38427#discussion_r3212475848


##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -165,9 +165,14 @@ def reset_state():
       if AsyncWrapper._loop_started is not None:
         AsyncWrapper._loop_started.clear()
 
-      for pool in AsyncWrapper._pool.values():
-        pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
-            wait=True, cancel_futures=True)
+      pools_to_shutdown = [
+          pool.acquire(AsyncWrapper.initialize_pool(1))
+          for pool in AsyncWrapper._pool.values()
+      ]

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `pool.acquire()` call is still executed within the `AsyncWrapper._lock` 
context. If the resource pool is at capacity (which is common when a worker 
thread is active, as the pool size is typically 1), `acquire()` will block 
until the resource is returned. However, the worker thread requires 
`AsyncWrapper._lock` to complete its work and release the resource (via 
`decrement_items_in_buffer`), leading to a deadlock. To fully resolve this, the 
acquisition of pool resources should also be moved outside the lock scope.
   
   ```suggestion
         pools = list(AsyncWrapper._pool.values())
   
       pools_to_shutdown = [
           pool.acquire(AsyncWrapper.initialize_pool(1)) for pool in pools
       ]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to