uranusjr commented on code in PR #68377:
URL: https://github.com/apache/airflow/pull/68377#discussion_r3458588696


##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -208,42 +245,80 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
     _thread_lock: threading.Lock = attrs.field(factory=threading.Lock, repr=False)
     # Async lock for async operations
     _async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+    # Thread ID of the event loop thread, recorded on the first asend() call.
+    # Used by send() to detect an imminent deadlock.
+    _loop_thread_id: int | None = attrs.field(default=None, repr=False, init=False)
 
     def _make_frame(self, msg: SendMsgType) -> _RequestFrame:
         carrier: dict[str, str] = {}
         _trace_propagator.inject(carrier)
         return _RequestFrame(id=next(self.id_counter), body=msg.model_dump(), context_carrier=carrier or None)
 
+    # A deadlock can only occur when the event loop is *currently running* in
+    # this thread.  After AsyncOperator.execute() / loop.run_until_complete()
+    # returns, the loop is no longer running; subsequent sync send() calls are
+    # safe.  asyncio.get_running_loop() raises RuntimeError when there is no
+    # active loop in the calling thread, so we use it to gate the check.
+    @property
+    def _is_on_loop_thread(self) -> bool:
+        if threading.get_ident() == self._loop_thread_id:
+            with suppress(RuntimeError):
+                return bool(asyncio.get_running_loop())
+        return False
+
     def send(self, msg: SendMsgType) -> ReceiveMsgType | None:
         """Send a request to the parent and block until the response is received."""
+        # Two-level detection for sync send() called from the event loop thread.
+        # Raises DeadlockImminentError (BaseException subclass) so it escapes
+        # contextlib.suppress(Exception) in mask_secret() and other helpers.
+        #
+        # Level 1 — wrong pattern (broader): send() is on the event loop thread at
+        #   all, detected via the stored loop thread id from the last asend() call.
+        # Level 2 — imminent deadlock (precise): _thread_lock is also currently held
+        #   by a thread-pool worker from a concurrent asend() round-trip.
+        #
+        # Typical cause: BaseHook.get_hook() or BaseHook.get_connection() called
+        # from inside an async task / aexecute().
+        # Fix: use 'await BaseHook.aget_hook()' or 'await BaseHook.aget_connection()'.
         frame_bytes = self._make_frame(msg).as_bytes()
 
         # We must make sure sockets aren't intermixed between sync and async calls,
         # thus we need a dual locking mechanism to ensure that.
-        with self._thread_lock:
+        # When called from the event loop thread, use non-blocking acquire to detect
+        # an imminent deadlock: an asend() coroutine currently holds _thread_lock and
+        # is waiting for the event loop to complete its I/O — a true deadlock.
+        if not self._thread_lock.acquire(blocking=not self._is_on_loop_thread):
+            raise DeadlockImminentError(msg, deadlock_imminent=True)
+        try:
             self.socket.sendall(frame_bytes)
             if isinstance(msg, ResendLoggingFD):
-                if recv_fds is None:
-                    return None
                 # We need special handling here! The server can't send us the fd number, as the number on the
                 # supervisor will be different to in this process, so we have to mutate the message ourselves here.
+                if recv_fds is None:
+                    return None

Review Comment:
   This does not need to change. (I think it makes more sense to keep the `recv_fds is None` check above the comment, since that check does not count as special handling.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to