my2038 commented on issue #67380:
URL: https://github.com/apache/airflow/issues/67380#issuecomment-4536486967

   Update — upstream fix reviewed; revised proposed fix for 3.1.0–3.2.1
   We have reviewed the fix in the upstream main branch. The dedicated 
_reader_loop task + asyncio.Future per request is the correct long-term 
solution — no ordering constraint, fully concurrent asend() calls, and clean 
separation between writing and reading. We recommend that approach for the 
upstream release.
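
   For readers who have not looked at the upstream change, the general shape 
of a "one reader task plus one Future per request" design can be sketched as 
follows. This is not the upstream code: FutureDispatcher, reader_loop, 
reversing_peer and the two in-memory asyncio.Queue objects standing in for the 
socket are all hypothetical names chosen for illustration.

```python
import asyncio
import itertools


class FutureDispatcher:
    """Hypothetical sketch: a single reader task resolves one Future per request."""

    def __init__(self, incoming: asyncio.Queue, outgoing: asyncio.Queue) -> None:
        self._incoming = incoming   # stand-in for the read side of the socket
        self._outgoing = outgoing   # stand-in for the write side of the socket
        self._ids = itertools.count()
        self._waiters = {}          # request id -> Future awaiting the response

    async def reader_loop(self) -> None:
        # Only this task ever reads; callers never touch the transport.
        while True:
            req_id, body = await self._incoming.get()
            fut = self._waiters.pop(req_id, None)
            if fut is not None and not fut.done():
                fut.set_result(body)

    async def asend(self, body: str) -> str:
        req_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._waiters[req_id] = fut
        self._outgoing.put_nowait((req_id, body))
        return await fut            # resolved by reader_loop, in any order


async def reversing_peer(requests: asyncio.Queue, responses: asyncio.Queue, batch: int) -> None:
    # Fake peer that answers a batch of requests in reverse order.
    got = [await requests.get() for _ in range(batch)]
    for req_id, body in reversed(got):
        responses.put_nowait((req_id, body.upper()))


async def main() -> list:
    requests, responses = asyncio.Queue(), asyncio.Queue()
    client = FutureDispatcher(incoming=responses, outgoing=requests)
    reader = asyncio.create_task(client.reader_loop())
    peer = asyncio.create_task(reversing_peer(requests, responses, 3))
    try:
        return await asyncio.gather(*(client.asend(w) for w in ("a", "b", "c")))
    finally:
        reader.cancel()
        peer.cancel()


replies = asyncio.run(main())
print(replies)
```

   Because each caller waits on its own Future, the responses can arrive in 
any order and every caller still receives the reply that matches its request id.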
   For deployments still on 3.1.0–3.2.1, the bug is present and structurally 
identical across all four versions. We are running an Airflow plugin as an 
interim fix. The approach below is what the plugin implements — it is simpler 
than the upstream solution but correct for the constrained case where a 
dedicated reader task cannot be backported cleanly.
   
   Why the originally proposed fix (move counter inside lock) deadlocks
   ```python
   # WRONG: deadlocks under greenback re-entrancy
   async def asend(self, msg):
       async with self._async_lock:
           frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
           self._async_writer.write(frame.as_bytes())
           return await self._aget_response(frame.id)  # ← injection point, inside lock
   ```
   asyncio.Lock is not reentrant. When greenback injects a second asend call 
while the first is suspended at await self._aget_response(frame.id) inside the 
lock, the injected call tries to acquire a lock already held by the same 
asyncio Task and suspends forever waiting for itself.
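
   The non-reentrancy itself can be reproduced with the standard library 
alone. The sketch below does not involve greenback or Airflow; it only shows 
that a second acquire() on an asyncio.Lock already held by the same task never 
completes. The helper name reacquire_same_lock and the timeout value are 
illustrative.

```python
import asyncio


async def reacquire_same_lock(timeout: float = 0.1) -> str:
    """Try to take an asyncio.Lock that the current task already holds."""
    lock = asyncio.Lock()
    async with lock:
        try:
            # asyncio.Lock does not track an owner, so this second acquire
            # waits for a release that can only happen after it returns.
            await asyncio.wait_for(lock.acquire(), timeout)
        except asyncio.TimeoutError:
            return "timed out"
        lock.release()
        return "acquired"


result = asyncio.run(reacquire_same_lock())
print(result)
```

   Without the wait_for timeout the coroutine would simply hang, which is the 
behaviour described above.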
   
   Interim fix for 3.1.0–3.2.1: per-instance ordered queue
   TriggerCommsDecoder is an attrs class, so _pending_ids is declared as a 
proper field. A new additive method _aget_queued_response gates socket reads 
without modifying _aget_response.
   ```python
   import asyncio
   from collections import deque

   import attrs

   @attrs.define(kw_only=True)
   class TriggerCommsDecoder(CommsDecoder[...]):
       # ... existing fields ...
       # ADD: deque of (frame_id, Event), in send order
       _pending_ids: deque = attrs.field(factory=deque, repr=False)

       async def asend(self, msg):
           frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
           # ── synchronous block: no await, no injection point ──
           evt = asyncio.Event()
           self._pending_ids.append((frame.id, evt))
           self._async_writer.write(frame.as_bytes())
           # ─────────────────────────────────────────────────────
           return await self._aget_queued_response(frame.id, evt)

       async def _aget_queued_response(self, expected_id: int, evt: asyncio.Event):
           # Wait until we are at the front of the queue.
           while self._pending_ids[0][0] != expected_id:
               await evt.wait()

           # Sole reader: the original _aget_response is unmodified.
           result = await self._aget_response(expected_id)

           # Vacate the front and signal the next waiter.
           self._pending_ids.popleft()
           if self._pending_ids:
               _, next_evt = self._pending_ids[0]
               next_evt.set()

           return result
   ```
   Why this is correct for 3.1.0–3.2.1: next(self.id_counter), deque.append, 
and _async_writer.write are all synchronous, with no await between them, so 
greenback cannot inject between counter increment and enqueue. Enqueue order == 
counter order == socket arrival order. If greenback injects a second asend, the 
injected call appends its tuple and suspends at evt.wait(); it never touches 
the socket reader. The completing caller signals the next waiter's Event, and 
that waiter then reads its own response, which returns immediately if the 
response is already buffered.
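
   The gating can be exercised outside Airflow with a toy model. In the 
sketch below, OrderedReader, _read_one and the in-memory asyncio.Queue used as 
the "socket" are hypothetical stand-ins, and the peer is assumed to answer in 
request order. The try/finally is an addition in this sketch so that a failed 
read still releases the next waiter; the interim fix as posted does not 
include it.

```python
import asyncio
from collections import deque


class OrderedReader:
    """Toy stand-in for the decoder; responses come back in request order."""

    def __init__(self) -> None:
        self._next_id = 0
        self._pending = deque()        # (request_id, Event), in send order
        self._wire = asyncio.Queue()   # hypothetical in-memory "socket"
        self.read_order = []

    async def _read_one(self, expected_id: int) -> str:
        await asyncio.sleep(0)         # yield so other callers can interleave
        got_id = await self._wire.get()
        assert got_id == expected_id, "read another caller's response"
        self.read_order.append(got_id)
        return f"response-{got_id}"

    async def asend(self) -> str:
        # Synchronous block: allocate the id, enqueue, "write".
        req_id = self._next_id
        self._next_id += 1
        evt = asyncio.Event()
        self._pending.append((req_id, evt))
        self._wire.put_nowait(req_id)  # the fake peer echoes ids in order
        # Gate: only the caller at the front of the queue may read.
        while self._pending[0][0] != req_id:
            await evt.wait()
        try:
            return await self._read_one(req_id)
        finally:
            self._pending.popleft()
            if self._pending:
                self._pending[0][1].set()


async def main():
    reader = OrderedReader()
    results = await asyncio.gather(*(reader.asend() for _ in range(5)))
    return results, reader.read_order


results, read_order = asyncio.run(main())
print(results, read_order)
```

   Five callers are started together, yet only the caller at the front ever 
reads, so each one receives the response carrying its own id.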
   Note on PR #64882 / #65285 (3.2.1): that PR added _thread_lock to the 
synchronous send() and _read_frame() for OS-thread safety. It did not change 
asend and does not address this greenback path. The bug is present in 3.2.1.

