uranusjr commented on code in PR #68377:
URL: https://github.com/apache/airflow/pull/68377#discussion_r3458588696
##########
task-sdk/src/airflow/sdk/execution_time/comms.py:
##########
@@ -208,42 +245,80 @@ class CommsDecoder(Generic[ReceiveMsgType, SendMsgType]):
_thread_lock: threading.Lock = attrs.field(factory=threading.Lock,
repr=False)
# Async lock for async operations
_async_lock: asyncio.Lock = attrs.field(factory=asyncio.Lock, repr=False)
+ # Thread ID of the event loop thread, recorded on the first asend() call.
+ # Used by send() to detect an imminent deadlock.
+ _loop_thread_id: int | None = attrs.field(default=None, repr=False,
init=False)
def _make_frame(self, msg: SendMsgType) -> _RequestFrame:
carrier: dict[str, str] = {}
_trace_propagator.inject(carrier)
return _RequestFrame(id=next(self.id_counter), body=msg.model_dump(),
context_carrier=carrier or None)
+ # A deadlock can only occur when the event loop is *currently running* in
+ # this thread. After AsyncOperator.execute() / loop.run_until_complete()
+ # returns, the loop is no longer running; subsequent sync send() calls are
+ # safe. asyncio.get_running_loop() raises RuntimeError when there is no
+ # active loop in the calling thread, so we use it to gate the check.
+ @property
+ def _is_on_loop_thread(self) -> bool:
+ if threading.get_ident() == self._loop_thread_id:
+ with suppress(RuntimeError):
+ return bool(asyncio.get_running_loop())
+ return False
+
def send(self, msg: SendMsgType) -> ReceiveMsgType | None:
"""Send a request to the parent and block until the response is
received."""
+ # Two-level detection for sync send() called from the event loop
thread.
+ # Raises DeadlockImminentError (BaseException subclass) so it escapes
+ # contextlib.suppress(Exception) in mask_secret() and other helpers.
+ #
+ # Level 1 — wrong pattern (broader): send() is on the event loop
thread at
+ # all, detected via the stored loop thread id from the last asend()
call.
+ # Level 2 — imminent deadlock (precise): _thread_lock is also
currently held
+ # by a thread-pool worker from a concurrent asend() round-trip.
+ #
+ # Typical cause: BaseHook.get_hook() or BaseHook.get_connection()
called
+ # from inside an async task / aexecute().
+ # Fix: use 'await BaseHook.aget_hook()' or 'await
BaseHook.aget_connection()'.
frame_bytes = self._make_frame(msg).as_bytes()
# We must make sure sockets aren't intermixed between sync and async
calls,
# thus we need a dual locking mechanism to ensure that.
- with self._thread_lock:
+ # When called from the event loop thread, use non-blocking acquire to
detect
+ # an imminent deadlock: an asend() coroutine currently holds
_thread_lock and
+ # is waiting for the event loop to complete its I/O — a true deadlock.
+ if not self._thread_lock.acquire(blocking=not self._is_on_loop_thread):
+ raise DeadlockImminentError(msg, deadlock_imminent=True)
+ try:
self.socket.sendall(frame_bytes)
if isinstance(msg, ResendLoggingFD):
- if recv_fds is None:
- return None
# We need special handling here! The server can't send us the
fd number, as the number on the
# supervisor will be different to in this process, so we have
to mutate the message ourselves here.
+ if recv_fds is None:
+ return None
Review Comment:
This does not need to change. I think it makes more sense to keep the
`recv_fds is None` early return above the comment, since it is not part of
the special handling the comment describes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]