hkc-8010 opened a new issue, #64620:
URL: https://github.com/apache/airflow/issues/64620
## Apache Airflow Provider(s)
core (triggerer)
## Versions of Apache Airflow Providers
Observed in Apache Airflow 3.1.8 (Astronomer Runtime 3.1-14).
## Apache Airflow version
3.1.8
## Operating System
Debian GNU/Linux 12 (bookworm)
## Deployment
Astronomer (managed)
## Deployment details
Managed Astronomer deployment with triggerer enabled, using deferrable
operators including `CloudDataTransferServiceS3ToGCSOperator` (Google Cloud
Storage Transfer Service) and BigQuery deferrable operators. Single triggerer
replica, `DEFAULT_CAPACITY=1000`, 0.5 vCPU / 1.92 GiB memory.
## What happened
The triggerer's `TriggerRunner` subprocess crashes with `RuntimeError:
Response read out of order! Got frame.id=N, expect_id=N+1` raised from
`TriggerCommsDecoder._aget_response`. When this exception propagates up through
`TriggerRunner.arun()` → `sync_state_to_supervisor()`, it kills the entire
`TriggerRunner` subprocess rather than just the individual failing trigger.
After this fatal crash, the triggerer pod remains alive (its HTTP server
continues serving requests) but the `TriggerRunner` subprocess does not
restart, leaving all deferred tasks stuck indefinitely.
The error was observed repeatedly — at least a dozen times over an 11-hour
window — across multiple `TriggerRunner` subprocess restarts. The final fatal
crash propagated to `arun()` itself and the subprocess did not recover:
```
2026-04-02T05:43:00.310571Z [error] Trigger runner failed [airflow.jobs.triggerer_job_runner]
RuntimeError: Response read out of order! Got frame.id=18314, expect_id=18315
File triggerer_job_runner.py:880, in arun
File triggerer_job_runner.py:1090, in sync_state_to_supervisor
File triggerer_job_runner.py:1101, in asend
File triggerer_job_runner.py:801, in asend
File triggerer_job_runner.py:791, in _aget_response
```
An earlier instance at 02:37:35 UTC shows the full call chain, originating
from a BigQuery trigger calling a synchronous method from within an async
context:
```
RuntimeError: Response read out of order! Got frame.id=25902, expect_id=25903
task: <Task finished name='kingpd-dimensions/manual__2026-04-01T00:00:00+00:00/d_kingpd_flavour_f_act_sum.insert-step-1/-1/1 (ID 158353)' exception=RuntimeError('Response read out of order! Got frame.id=25902, expect_id=25903')>
Full traceback:
greenback/_impl.py:116 greenback_shim
greenback/_impl.py:201 _greenback_shim
greenback/_impl.py:81 trampoline
outcome/_impl.py:185 send
triggerer_job_runner.py:1152 run_trigger
providers/google/cloud/triggers/bigquery.py:199 run
providers/google/cloud/triggers/bigquery.py:157 safe_to_cancel
providers/google/cloud/triggers/bigquery.py:131 get_task_state
asgiref/sync.py:439 __call__ ← sync_to_async wrapping a sync fn
greenback/_impl.py:210 _greenback_shim
concurrent/futures/thread.py:59 run ← runs in thread pool
asgiref/sync.py:491 thread_handler
sdk/execution_time/task_runner.py:514 get_task_states
triggerer_job_runner.py:772 send ← TriggerCommsDecoder.send()
asgiref/sync.py:262 __call__ ← async_to_sync()
concurrent/futures/_base.py:449 result
concurrent/futures/_base.py:401 __get_result
asgiref/sync.py:300 main_wrap
triggerer_job_runner.py:801 asend
triggerer_job_runner.py:791 _aget_response ← frame.id mismatch raised here
```
After the fatal 05:43 crash, the triggerer pod served only HTTP `404 Not
Found` responses for trigger log requests — confirming no triggers were being
executed — for the remainder of the observation window (07:30–07:53+ UTC).
## Root Cause Analysis
The bug is a **thread-safety violation in `TriggerCommsDecoder.asend()`**.
```python
# triggerer_job_runner.py
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
    frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
    bytes = frame.as_bytes()
    async with self._async_lock:  # ← asyncio.Lock: only safe within ONE event loop
        self._async_writer.write(bytes)
        return await self._aget_response(frame.id)
```
`TriggerCommsDecoder.send()` calls `async_to_sync(self.asend)(msg)`. When
`async_to_sync` is called from a thread (e.g., from `asgiref.sync_to_async`
running a synchronous method of a trigger), it spins up a **new asyncio event
loop in that thread**. The `asyncio.Lock` (`self._async_lock`) is bound to a
single event loop and provides **no mutual exclusion across threads** — it only
serializes coroutines within the same event loop.
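The thread and loop separation described above can be illustrated with a
plain-`asyncio` analogue. This is only a sketch: it does not use `asgiref`,
whose loop-selection logic may differ, and `sync_helper` is a hypothetical
name standing in for a synchronous trigger method.

```python
import asyncio
import threading


def sync_helper():
    """Synchronous code that needs an async call, so it starts its own loop."""

    async def report():
        return threading.get_ident(), asyncio.get_running_loop()

    # asyncio.run() creates a brand-new event loop in the current thread.
    return asyncio.run(report())


async def main():
    main_thread = threading.get_ident()
    main_loop = asyncio.get_running_loop()
    # Run the sync helper in a worker thread, much as a sync_to_async-style
    # wrapper would.
    worker_thread, worker_loop = await asyncio.to_thread(sync_helper)
    return {
        "same_thread": worker_thread == main_thread,
        "same_loop": worker_loop is main_loop,
    }


result = asyncio.run(main())
print(result)
```

Both flags come out `False`: the helper runs on another thread and on another
loop, so an `asyncio.Lock` shared between the two sides has no single loop on
which it could serialize them.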
The race needs two callers using the `TriggerCommsDecoder` channel
concurrently:
1. **Caller A**: `sync_state_to_supervisor()` from the main
`TriggerRunner.arun()` event loop
2. **Caller B**: a BigQuery trigger's `get_task_state()` via `sync_to_async`
→ thread → `async_to_sync(asend)` on a separate event loop
Both callers write their request frames and then await the response for
their own frame ID. Because the writes and reads are not mutually exclusive
across threads, either caller can read the response intended for the other.
The frame ID it receives then does not match the one it expects, and
`_aget_response` raises `RuntimeError: Response read out of order! Got
frame.id=N, expect_id=N+1`. Both tracebacks above fit this: the first fails
on Caller A's path, the second on Caller B's.
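The interleaving can be replayed deterministically with a toy model. This is
a sketch only: `ToyPipe` is a hypothetical stand-in for the runner/supervisor
pipe, it assumes responses come back in write order, and it uses no threads,
just the problematic ordering spelled out step by step.

```python
from collections import deque


class ToyPipe:
    """Hypothetical stand-in for the runner/supervisor pipe (not Airflow code)."""

    def __init__(self):
        self._responses = deque()

    def write(self, frame_id):
        # Assume the supervisor answers each request in write order.
        self._responses.append(frame_id)

    def read_response(self, expect_id):
        got_id = self._responses.popleft()
        if got_id != expect_id:
            raise RuntimeError(
                f"Response read out of order! Got frame.id={got_id}, expect_id={expect_id}"
            )
        return got_id


def interleaved_run():
    pipe = ToyPipe()
    pipe.write(1)  # caller A writes frame 1
    pipe.write(2)  # caller B writes frame 2 before A has read its response
    try:
        pipe.read_response(expect_id=2)  # B reads first and gets A's response
    except RuntimeError as exc:
        return str(exc)
    return "no error"


message = interleaved_run()
print(message)
```

The reader that expects frame 2 receives frame 1, which matches the
`frame.id=N, expect_id=N+1` shape of the logged errors.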
The cross-thread send can be initiated by any trigger that:
- calls a synchronous method from its `async def run()` loop (via
`asgiref.sync_to_async` or `greenback`)
- and that synchronous method calls `TriggerCommsDecoder.send()` (e.g., via
`task_runner.get_task_states`)
In the observed incidents, `BigQueryTableExistenceTrigger` calling
`safe_to_cancel` → `get_task_state` → `task_runner.get_task_states` is the
initiating trigger. But the victim can be any trigger running concurrently in
the same `TriggerRunner` subprocess.
## What you think should happen instead
`TriggerCommsDecoder.asend()` should be safe to call from multiple threads
simultaneously. The `asyncio.Lock` should be replaced with a `threading.Lock`
(or a `threading.RLock`) that provides mutual exclusion across threads, not
just within a single event loop. Alternatively, the communication channel could
be restructured so that cross-thread sends use a different mechanism (e.g.,
`asyncio.run_coroutine_threadsafe` with the parent event loop rather than
`async_to_sync`).
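The `asyncio.run_coroutine_threadsafe` alternative could look roughly like
the sketch below. `ToyComms` and its in-memory `pipe` are hypothetical
stand-ins, not the real decoder. The idea is that worker threads never run
`asend` on their own loop: they submit it to the loop that owns the pipe, so
the existing `asyncio.Lock` serializes every caller.

```python
import asyncio
import itertools
from collections import deque


class ToyComms:
    """Hypothetical stand-in for the comms decoder (not Airflow code)."""

    def __init__(self, loop):
        self.loop = loop  # the one loop that owns the pipe
        self.lock = asyncio.Lock()
        self.id_counter = itertools.count(1)
        self.pipe = deque()

    async def asend(self, msg):
        async with self.lock:
            frame_id = next(self.id_counter)
            self.pipe.append((frame_id, msg))  # "write" the request
            await asyncio.sleep(0)  # yield, as a real socket read would
            got_id, body = self.pipe.popleft()  # "read" the response
            if got_id != frame_id:
                raise RuntimeError(f"out of order: got {got_id}, expected {frame_id}")
            return body

    def send(self, msg):
        """Sync entry point for worker threads: hop onto the owning loop."""
        future = asyncio.run_coroutine_threadsafe(self.asend(msg), self.loop)
        return future.result(timeout=10)


async def main():
    comms = ToyComms(asyncio.get_running_loop())
    from_threads = [asyncio.to_thread(comms.send, f"thread-{i}") for i in range(8)]
    from_loop = [comms.asend(f"loop-{i}") for i in range(8)]
    return await asyncio.gather(*from_threads, *from_loop)


results = asyncio.run(main())
print(results)
```

Every caller gets its own message back, whether it came from a thread or
from the loop. One caveat of this shape: `send()` must not be called from the
owning loop's own thread, because `future.result()` would block the loop that
has to run the coroutine.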
A minimal fix would be:
```python
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
    frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
    bytes = frame.as_bytes()
    with self._thread_lock:  # threading.Lock: cross-thread safe
        self._async_writer.write(bytes)
        return await self._aget_response(frame.id)
```
where `self._thread_lock = threading.Lock()` is added alongside the existing
`asyncio.Lock`.
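One caveat with a blocking `with self._thread_lock:` inside a coroutine is
that the event loop stalls while it waits. If the holder is another coroutine
on the same loop, suspended at its `await`, it can never resume to release
the lock. A possible variation, sketched here with a hypothetical
`ToyChannel` rather than the real decoder, waits for the lock in an executor
thread so the loop keeps running:

```python
import asyncio
import threading
from collections import deque


class ToyChannel:
    """Hypothetical stand-in for the shared pipe (not Airflow code)."""

    def __init__(self):
        self.thread_lock = threading.Lock()
        self.pipe = deque()
        self.next_id = 0

    async def asend(self, msg):
        loop = asyncio.get_running_loop()
        # Block a worker thread, not the event loop, while waiting for the lock.
        await loop.run_in_executor(None, self.thread_lock.acquire)
        try:
            self.next_id += 1
            frame_id = self.next_id
            self.pipe.append((frame_id, msg))  # "write" the request
            await asyncio.sleep(0)  # yield, as a real socket read would
            got_id, body = self.pipe.popleft()  # "read" the response
            if got_id != frame_id:
                raise RuntimeError(f"out of order: got {got_id}, expected {frame_id}")
            return body
        finally:
            # A threading.Lock may be released by a thread other than the acquirer.
            self.thread_lock.release()


def run_loop_in_thread(channel, name, out):
    async def worker():
        return await asyncio.gather(*(channel.asend(f"{name}-{i}") for i in range(5)))

    out[name] = asyncio.run(worker())  # each thread gets its own event loop


channel, out = ToyChannel(), {}
threads = [
    threading.Thread(target=run_loop_in_thread, args=(channel, name, out))
    for name in ("a", "b")
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(out)
```

This pattern relies on `threading.Lock` allowing release from a different
thread, so it would not work with `threading.RLock`, which is owned by the
acquiring thread. The sketch also does not handle cancellation while the
executor is still waiting for the lock.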
Note: `self.id_counter` (an `itertools.count`) is also shared across threads.
Calling `next()` on it is effectively atomic in CPython because of the GIL,
but that is an implementation detail and should be verified rather than
relied on.
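If relying on the GIL is undesirable, the ID source could be guarded
explicitly. A minimal sketch, assuming a hypothetical `LockedCounter` that is
not part of Airflow:

```python
import threading


class LockedCounter:
    """Hypothetical ID source guarded by an explicit lock."""

    def __init__(self, start=0):
        self._value = start
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # The read-increment pair happens under the lock, so no ID is reused.
        with self._lock:
            value = self._value
            self._value += 1
            return value


def draw_ids(counter, n, sink):
    sink.extend(next(counter) for _ in range(n))


counter = LockedCounter()
sinks = [[] for _ in range(4)]
threads = [
    threading.Thread(target=draw_ids, args=(counter, 1000, sink)) for sink in sinks
]
for t in threads:
    t.start()
for t in threads:
    t.join()

all_ids = sorted(i for sink in sinks for i in sink)
print(len(all_ids), len(set(all_ids)))
```

Four threads drawing 1000 IDs each end up with 4000 distinct values, and the
class supports `next()` just like the `itertools.count` it would replace.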
## How to reproduce
1. Run a deployment with multiple deferrable operators active concurrently,
including at least one trigger type that calls `task_runner.get_task_states`
(or any synchronous SDK method) from inside `async def run()` via
`sync_to_async` or `greenback`.
2. Set `AIRFLOW__TRIGGERER__DEFAULT_CAPACITY` high (e.g., 1000) to maximize
concurrency.
3. Observe `RuntimeError: Response read out of order!` in triggerer logs
when the race condition fires.
4. Observe `Trigger runner failed` with the same error logged from `arun`
when it propagates to the main loop.
5. After the fatal crash, all deferred tasks remain stuck in `DEFERRED`
state indefinitely; the triggerer pod is alive but serves only `404 Not Found`
for trigger log requests.
## Relationship to existing issues
This is related to but distinct from
[#64213](https://github.com/apache/airflow/issues/64213), which covers a
different `RuntimeError` from the same `TriggerCommsDecoder.send()` path (`Task
got Future attached to a different loop`). Both issues share the same root
cause (thread-unsafe `asyncio.Lock` in `asend`), but produce different error
messages depending on which async/thread boundary is crossed first.
## Anything else
- The error repeats across multiple `TriggerRunner` subprocess restarts
(observed ~12 times in 11 hours before the fatal crash).
- The fatal variant (`Trigger runner failed` propagating through `arun`) is
more severe than non-fatal variants: the `TriggerRunner` subprocess does not
restart, requiring a full triggerer pod restart to recover.
- `AIRFLOW__TRIGGERER__DEFAULT_CAPACITY=1000` on a 0.5 vCPU pod was a
contributing factor — higher concurrency increases the probability of the race
condition.
- Confirmed on Airflow 3.1.8 / Python 3.12 / `asgiref` 3.x / `greenback`
installed.