hkc-8010 opened a new issue, #64620:
URL: https://github.com/apache/airflow/issues/64620

   ## Apache Airflow Provider(s)
   
   core (triggerer)
   
   ## Versions of Apache Airflow Providers
   
   Observed in Apache Airflow 3.1.8 (Astronomer Runtime 3.1-14).
   
   ## Apache Airflow version
   
   3.1.8
   
   ## Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ## Deployment
   
   Astronomer (managed)
   
   ## Deployment details
   
   Managed Astronomer deployment with triggerer enabled, using deferrable 
operators including `CloudDataTransferServiceS3ToGCSOperator` (Google Cloud 
Storage Transfer Service) and BigQuery deferrable operators. Single triggerer 
replica, `DEFAULT_CAPACITY=1000`, 0.5 vCPU / 1.92 GiB memory.
   
   ## What happened
   
   The triggerer's `TriggerRunner` subprocess crashes with `RuntimeError: 
Response read out of order! Got frame.id=N, expect_id=N+1` raised from 
`TriggerCommsDecoder._aget_response`. When this exception propagates up through 
`TriggerRunner.arun()` → `sync_state_to_supervisor()`, it kills the entire 
`TriggerRunner` subprocess rather than just the individual failing trigger. 
After this fatal crash, the triggerer pod remains alive (its HTTP server 
continues serving requests) but the `TriggerRunner` subprocess does not 
restart, leaving all deferred tasks stuck indefinitely.
   
   The error was observed repeatedly — at least a dozen times over an 11-hour 
window — across multiple `TriggerRunner` subprocess restarts. The final fatal 
crash propagated to `arun()` itself and the subprocess did not recover:
   
   ```
   2026-04-02T05:43:00.310571Z [error] Trigger runner failed 
[airflow.jobs.triggerer_job_runner]
   RuntimeError: Response read out of order! Got frame.id=18314, expect_id=18315
     File triggerer_job_runner.py:880, in arun
     File triggerer_job_runner.py:1090, in sync_state_to_supervisor
     File triggerer_job_runner.py:1101, in asend
     File triggerer_job_runner.py:801, in asend
     File triggerer_job_runner.py:791, in _aget_response
   ```
   
   An earlier instance at 02:37:35 UTC shows the full call chain, originating 
from a BigQuery trigger calling a synchronous method from within an async 
context:
   
   ```
   RuntimeError: Response read out of order! Got frame.id=25902, expect_id=25903
     task: <Task finished 
name='kingpd-dimensions/manual__2026-04-01T00:00:00+00:00/d_kingpd_flavour_f_act_sum.insert-step-1/-1/1
 (ID 158353)'
           exception=RuntimeError('Response read out of order! Got 
frame.id=25902, expect_id=25903')>
   
     Full traceback:
       greenback/_impl.py:116   greenback_shim
       greenback/_impl.py:201   _greenback_shim
       greenback/_impl.py:81    trampoline
       outcome/_impl.py:185     send
       triggerer_job_runner.py:1152  run_trigger
       providers/google/cloud/triggers/bigquery.py:199   run
       providers/google/cloud/triggers/bigquery.py:157   safe_to_cancel
       providers/google/cloud/triggers/bigquery.py:131   get_task_state
       asgiref/sync.py:439      __call__           ← sync_to_async wrapping a 
sync fn
       greenback/_impl.py:210   _greenback_shim
       concurrent/futures/thread.py:59  run        ← runs in thread pool
       asgiref/sync.py:491      thread_handler
       sdk/execution_time/task_runner.py:514  get_task_states
       triggerer_job_runner.py:772   send           ← TriggerCommsDecoder.send()
       asgiref/sync.py:262      __call__           ← async_to_sync()
       concurrent/futures/_base.py:449  result
       concurrent/futures/_base.py:401  __get_result
       asgiref/sync.py:300      main_wrap
       triggerer_job_runner.py:801   asend
       triggerer_job_runner.py:791   _aget_response ← frame.id mismatch raised 
here
   ```
   
   After the fatal 05:43 crash, the triggerer pod served only HTTP `404 Not 
Found` responses for trigger log requests — confirming no triggers were being 
executed — for the remainder of the observation window (07:30–07:53+ UTC).
   
   ## Root Cause Analysis
   
   The bug is a **thread-safety violation in `TriggerCommsDecoder.asend()`**.
   
   ```python
   # triggerer_job_runner.py
   async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
       frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
       bytes = frame.as_bytes()
   
       async with self._async_lock:          # ← asyncio.Lock: only safe within 
ONE event loop
           self._async_writer.write(bytes)
           return await self._aget_response(frame.id)
   ```
   
   `TriggerCommsDecoder.send()` calls `async_to_sync(self.asend)(msg)`. When 
`async_to_sync` is called from a thread (e.g., from `asgiref.sync_to_async` 
running a synchronous method of a trigger), it spins up a **new asyncio event 
loop in that thread**. The `asyncio.Lock` (`self._async_lock`) is bound to a 
single event loop and provides **no mutual exclusion across threads** — it only 
serializes coroutines within the same event loop.
   
   When two concurrent callers invoke `TriggerCommsDecoder.send()`:
   1. **Caller A**: `sync_state_to_supervisor()` from the main 
`TriggerRunner.arun()` event loop
   2. **Caller B**: a BigQuery trigger's `get_task_state()` via `sync_to_async` 
→ thread → `async_to_sync(asend)` on a separate event loop
   
   Both callers write their request frames and then await the response for 
their own frame ID. Because the writes and reads are not mutually exclusive 
across threads, Caller A reads the response intended for Caller B (frame IDs 
arrive out of order), raising `RuntimeError: Response read out of order! Got 
frame.id=N, expect_id=N+1`.
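   The crossed read can be forced deterministically in a toy model: a `queue.Queue` stands in for the shared response pipe (the supervisor simply echoes each frame id back), and `threading.Event`s force the unlucky schedule. All names are illustrative:

   ```python
   import itertools
   import queue
   import threading

   id_counter = itertools.count(1)  # shared, like TriggerCommsDecoder.id_counter
   pipe = queue.Queue()             # shared response pipe; supervisor echoes ids
   a_wrote = threading.Event()
   b_done = threading.Event()
   out = {}

   def caller_a() -> None:
       fid = next(id_counter)        # A writes frame 1 ...
       pipe.put(fid)
       a_wrote.set()                 # ... and is descheduled before reading
       b_done.wait()
       out["a"] = (fid, pipe.get())  # A finally reads -- gets B's frame 2

   def caller_b() -> None:
       a_wrote.wait()                # B runs while A sits between write and read
       fid = next(id_counter)        # B writes frame 2
       pipe.put(fid)
       out["b"] = (fid, pipe.get())  # FIFO pipe: B reads A's frame 1!
       b_done.set()

   ta = threading.Thread(target=caller_a)
   tb = threading.Thread(target=caller_b)
   ta.start(); tb.start(); ta.join(); tb.join()
   print(out)  # {'b': (2, 1), 'a': (1, 2)} -- responses crossed
   ```

   Caller B sending frame 2 but reading frame 1 matches the observed `Got frame.id=N, expect_id=N+1` shape, with the real frame ids in the two roles.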
   
   The specific trigger type that initiates the cross-thread send is any 
trigger that:
   - calls a synchronous method from its `async def run()` loop (via 
`asgiref.sync_to_async` or `greenback`)
   - and that synchronous method calls `TriggerCommsDecoder.send()` (e.g., via 
`task_runner.get_task_states`)
   
   In the observed incidents, `BigQueryTableExistenceTrigger` calling 
`safe_to_cancel` → `get_task_state` → `task_runner.get_task_states` is the 
initiating trigger. But the victim can be any trigger running concurrently in 
the same `TriggerRunner` subprocess.
   
   ## What you think should happen instead
   
   `TriggerCommsDecoder.asend()` should be safe to call from multiple threads 
simultaneously. The `asyncio.Lock` should be replaced with a `threading.Lock` 
(or a `threading.RLock`) that provides mutual exclusion across threads, not 
just within a single event loop. Alternatively, the communication channel could 
be restructured so that cross-thread sends use a different mechanism (e.g., 
`asyncio.run_coroutine_threadsafe` with the parent event loop rather than 
`async_to_sync`).
   
   A minimal fix would be:
   
   ```python
   async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
       frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
       bytes = frame.as_bytes()
   
       # threading.Lock gives mutual exclusion across threads and event loops.
       # Caveat: a blocking acquire held across the await below stalls this
       # event loop whenever another thread owns the lock.
       with self._thread_lock:
           self._async_writer.write(bytes)
           return await self._aget_response(frame.id)
   ```
   
   where `self._thread_lock = threading.Lock()` is added alongside the existing `asyncio.Lock`. Because the lock is held across an `await`, a production fix may need to acquire it without blocking the loop, or route cross-thread sends through the parent loop as described above; but even this minimal version removes the out-of-order read.
   
   Note: `self.id_counter` (an `itertools.count`) is also shared across threads; it should be verified as thread-safe (it is in CPython, where `next()` on a `count` is a single C-level call protected by the GIL, but worth noting).
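   That GIL-atomicity claim can be smoke-tested (this is a sanity check, not a proof, and a free-threaded CPython build would warrant a real lock):

   ```python
   import itertools
   import threading

   counter = itertools.count()      # shared, like self.id_counter
   chunks: list[list[int]] = []
   collect = threading.Lock()       # protects only the result collection

   def grab(n: int) -> None:
       ids = [next(counter) for _ in range(n)]  # next() on count is one C call
       with collect:
           chunks.append(ids)

   threads = [threading.Thread(target=grab, args=(10_000,)) for _ in range(4)]
   for t in threads:
       t.start()
   for t in threads:
       t.join()

   issued = [i for ids in chunks for i in ids]
   assert len(issued) == len(set(issued)) == 40_000  # no duplicate ids issued
   ```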
   
   ## How to reproduce
   
   1. Run a deployment with multiple deferrable operators active concurrently, 
including at least one trigger type that calls `task_runner.get_task_states` 
(or any synchronous SDK method) from inside `async def run()` via 
`sync_to_async` or `greenback`.
   2. Set `AIRFLOW__TRIGGERER__DEFAULT_CAPACITY` high (e.g., 1000) to maximize 
concurrency.
   3. Observe `RuntimeError: Response read out of order!` in triggerer logs 
when the race condition fires.
   4. Observe `Trigger runner failed` with the same error logged from `arun` 
when it propagates to the main loop.
   5. After the fatal crash, all deferred tasks remain stuck in `DEFERRED` 
state indefinitely; the triggerer pod is alive but serves only `404 Not Found` 
for trigger log requests.
   
   ## Relationship to existing issues
   
   This is related to but distinct from 
[#64213](https://github.com/apache/airflow/issues/64213), which covers a 
different `RuntimeError` from the same `TriggerCommsDecoder.send()` path (`Task 
got Future attached to a different loop`). Both issues share the same root 
cause (thread-unsafe `asyncio.Lock` in `asend`), but produce different error 
messages depending on which async/thread boundary is crossed first.
   
   ## Anything else
   
   - The error repeats across multiple `TriggerRunner` subprocess restarts 
(observed ~12 times in 11 hours before the fatal crash).
   - The fatal variant (`Trigger runner failed` propagating through `arun`) is 
more severe than non-fatal variants: the `TriggerRunner` subprocess does not 
restart, requiring a full triggerer pod restart to recover.
   - `AIRFLOW__TRIGGERER__DEFAULT_CAPACITY=1000` on a 0.5 vCPU pod was a 
contributing factor — higher concurrency increases the probability of the race 
condition.
   - Confirmed on Airflow 3.1.8 / Python 3.12 / `asgiref` 3.x, with `greenback` installed.
   

