my2038 commented on issue #67380:
URL: https://github.com/apache/airflow/issues/67380#issuecomment-4536486967
Update — upstream fix reviewed; revised proposed fix for 3.1.0–3.2.1
We have reviewed the fix in the upstream main branch. The dedicated
_reader_loop task + asyncio.Future per request is the correct long-term
solution — no ordering constraint, fully concurrent asend() calls, and clean
separation between writing and reading. We recommend that approach for the
upstream release.
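As a rough illustration of the reader-task pattern described above (not the upstream code), the sketch below uses a hypothetical `FutureDispatchDecoder` with an in-memory `asyncio.Queue` standing in for the socket. Every name in it apart from `asend`, `_reader_loop` and `id_counter` is an assumption made for the example.

```python
import asyncio
from itertools import count


class FutureDispatchDecoder:
    """Toy illustration only; NOT the upstream Airflow implementation."""

    def __init__(self):
        self.id_counter = count(1)
        self._futures = {}            # frame id -> Future awaiting that reply
        self._wire = asyncio.Queue()  # hypothetical stand-in for the socket
        self._reader = None

    async def _reader_loop(self):
        # The only coroutine that ever reads from the "socket".
        while True:
            frame_id, body = await self._wire.get()
            fut = self._futures.pop(frame_id, None)
            if fut is not None and not fut.done():
                fut.set_result(body)

    async def asend(self, msg):
        if self._reader is None:
            self._reader = asyncio.create_task(self._reader_loop())
        frame_id = next(self.id_counter)
        fut = asyncio.get_running_loop().create_future()
        self._futures[frame_id] = fut
        # Fake peer: the reply is queued immediately.
        self._wire.put_nowait((frame_id, f"reply to {msg}"))
        return await fut

    async def aclose(self):
        if self._reader is not None:
            self._reader.cancel()
            try:
                await self._reader
            except asyncio.CancelledError:
                pass


async def demo_reader_loop():
    dec = FutureDispatchDecoder()
    try:
        return await asyncio.gather(*(dec.asend(f"m{i}") for i in range(3)))
    finally:
        await dec.aclose()
```

Because replies are matched to callers by frame id rather than by arrival order, no caller has to wait for another caller's turn at the socket.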
For deployments still on 3.1.0–3.2.1, the bug is present and structurally
identical across all four versions. We are running an Airflow plugin as an
interim fix. The approach below is what the plugin implements — it is simpler
than the upstream solution but correct for the constrained case where a
dedicated reader task cannot be backported cleanly.
Why the originally proposed fix (move counter inside lock) deadlocks

    # WRONG: deadlocks under greenback re-entrancy
    async def asend(self, msg):
        async with self._async_lock:
            frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
            self._async_writer.write(frame.as_bytes())
            return await self._aget_response(frame.id)  # ← injection point, inside lock

asyncio.Lock is not reentrant. When greenback injects a second asend call
while the first is suspended at await self._aget_response(frame.id) inside the
lock, the injected call tries to acquire a lock already held by the same
asyncio Task and suspends forever waiting for itself.
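The non-reentrancy of `asyncio.Lock` can be checked in isolation. The sketch below does not involve greenback at all; it only shows that a task already holding the lock cannot acquire it a second time. The helper name `reacquire_times_out` and the timeout value are assumptions chosen for the example.

```python
import asyncio


async def reacquire_times_out(timeout: float = 0.05) -> bool:
    """Return True if a second acquire by the lock holder never succeeds."""
    lock = asyncio.Lock()
    async with lock:
        try:
            # Same task, same lock: this waits on itself and can only time out.
            await asyncio.wait_for(lock.acquire(), timeout)
        except asyncio.TimeoutError:
            return True
        # Only reached if the lock were reentrant.
        lock.release()
        return False
```

Running `asyncio.run(reacquire_times_out())` returns `True`. Without the timeout, the inner `acquire()` would hang forever, which is the deadlock described above.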
Interim fix for 3.1.0–3.2.1: per-instance ordered queue
TriggerCommsDecoder is an attrs class, so _pending_ids is declared as a
proper field. A new additive method _aget_queued_response gates socket reads
without modifying _aget_response.

    import asyncio
    from collections import deque

    import attrs

    @attrs.define(kw_only=True)
    class TriggerCommsDecoder(CommsDecoder[...]):
        # ... existing fields ...
        # ADD: deque of (frame_id, Event)
        _pending_ids: deque = attrs.field(factory=deque, repr=False)

        async def asend(self, msg):
            frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
            # -- synchronous block: no await, no injection point --
            evt = asyncio.Event()
            self._pending_ids.append((frame.id, evt))
            self._async_writer.write(frame.as_bytes())
            # ------------------------------------------------------
            return await self._aget_queued_response(frame.id, evt)

        async def _aget_queued_response(self, expected_id: int, evt: asyncio.Event):
            # Wait until we are at the front of the queue.
            while self._pending_ids[0][0] != expected_id:
                await evt.wait()
            # Sole reader; original _aget_response unmodified.
            result = await self._aget_response(expected_id)
            # Vacate front and signal next waiter.
            self._pending_ids.popleft()
            if self._pending_ids:
                _, next_evt = self._pending_ids[0]
                next_evt.set()
            return result

Why this is correct for 3.1.0–3.2.1: next(self.id_counter), deque.append,
and _async_writer.write are all synchronous — no await between them, so
greenback cannot inject between counter increment and enqueue. Enqueue order ==
counter order == socket arrival order. If greenback injects a second asend, the
injected call appends its tuple and suspends at evt.wait() — it never touches
the socket reader. The completing caller signals the next waiter's Event; that
caller's response is already buffered and reads immediately.
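The ordering argument above can be exercised without Airflow. The following is a minimal simulation under stated assumptions: `ToyDecoder` is a hypothetical stand-in for `TriggerCommsDecoder`, an `asyncio.Queue` plays the socket, the fake peer replies in request order, and `asyncio.sleep(0)` simulates read latency so that concurrent callers really do queue up behind one another.

```python
import asyncio
from collections import deque
from itertools import count


class ToyDecoder:
    """Hypothetical stand-in for TriggerCommsDecoder; not Airflow code."""

    def __init__(self):
        self.id_counter = count(1)
        self._pending_ids = deque()   # (frame_id, Event), in send order
        self._wire = asyncio.Queue()  # stand-in for the socket

    async def _aget_response(self, expected_id):
        await asyncio.sleep(0)        # simulated I/O latency
        frame_id, body = await self._wire.get()
        assert frame_id == expected_id, "read someone else's frame"
        return body

    async def asend(self, msg):
        frame_id = next(self.id_counter)
        # Synchronous block: no await between counter, enqueue and write.
        evt = asyncio.Event()
        self._pending_ids.append((frame_id, evt))
        self._wire.put_nowait((frame_id, f"reply to {msg}"))  # fake peer
        return await self._aget_queued_response(frame_id, evt)

    async def _aget_queued_response(self, expected_id, evt):
        # Wait until this request is at the front of the queue.
        while self._pending_ids[0][0] != expected_id:
            await evt.wait()
        result = await self._aget_response(expected_id)
        # Vacate the front and wake the next waiter, if any.
        self._pending_ids.popleft()
        if self._pending_ids:
            self._pending_ids[0][1].set()
        return result


async def demo_ordered_queue():
    dec = ToyDecoder()
    return await asyncio.gather(*(dec.asend(f"m{i}") for i in range(3)))
```

`asyncio.run(demo_ordered_queue())` returns each caller's own reply, in call order. The simulation uses ordinary concurrent tasks rather than greenback injection, so it checks only the queue discipline, not the re-entrancy path itself.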
Note on PR #64882 / #65285 (3.2.1): that PR added _thread_lock to the
synchronous send() and _read_frame() for OS-thread safety. It did not change
asend and does not address this greenback path. The bug is present in 3.2.1.