hkc-8010 opened a new issue, #63913:
URL: https://github.com/apache/airflow/issues/63913
### Apache Airflow version
3.1.6
### What happened?
When running multiple deferrable triggers that use `GoogleBaseAsyncHook`
(e.g. BigQuery, GCS, Dataflow, or any custom trigger inheriting from it), the
triggerer subprocess can enter a lock contention state where all deferred tasks
become stuck indefinitely. The triggerer logs show triggers as "running" but no
events are ever fired. After a triggerer restart, the problem immediately
recurs because all triggers simultaneously need fresh credentials.
**Observed behavior:**
- 14+ deferrable triggers using `GoogleBaseAsyncHook` running in a single
triggerer
- After a triggerer pod restart, ALL deferred tasks become stuck
- No errors appear in triggerer logs (consistent with a hang, not a crash)
- Clearing and re-running tasks results in them getting stuck again in
deferred state
- The triggerer enters a restart loop (3 restarts in ~11 minutes) until
manual intervention
- Event loop blocking of 3-4 seconds detected at each startup via
`block_watchdog`
### Root cause analysis
The issue is lock contention on the shared `asyncio.Lock` inside
`TriggerCommsDecoder.asend()` when multiple triggers concurrently call
`get_connection()` through `SUPERVISOR_COMMS`.
**The call chain:**
1. Trigger calls `GoogleBaseAsyncHook.get_sync_hook()`:
```python
# GoogleBaseAsyncHook.get_sync_hook()
self._sync_hook = await sync_to_async(self.sync_hook_class)(**self._hook_kwargs)
```
2. `GoogleBaseHook.__init__()` calls `get_connection()` **synchronously** in
the thread pool:
```python
# GoogleBaseHook.__init__
self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
```
3. In the triggerer subprocess, `get_connection()` routes through the Task
SDK to `SUPERVISOR_COMMS.send()`:
```python
# TriggerCommsDecoder.send
def send(self, msg):
    from asgiref.sync import async_to_sync

    return async_to_sync(self.asend)(msg)
```
4. `asend()` holds a **shared lock** for the entire request-response
round-trip to the supervisor process:
```python
# TriggerCommsDecoder.asend
async def asend(self, msg):
    frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
    bytes = frame.as_bytes()
    async with self._lock:  # lock held for the entire round-trip
        self._async_writer.write(bytes)
        return await self._aget_response(frame.id)  # waits for supervisor response
```
**After a triggerer restart**, all trigger instances have empty credential
caches. They all call `get_sync_hook()` → `get_connection()` →
`SUPERVISOR_COMMS.send()` concurrently. Since `asend()` serializes on
`self._lock`, the requests queue up.
Each round-trip takes the supervisor's processing interval (~1 second via
`_service_subprocess(1)`). With N triggers, the queue takes **~N seconds** to
drain. During this window:
- Each trigger also subsequently calls
`sync_to_async(sync_hook.get_credentials)()` which internally calls
`_get_field()` → more `SUPERVISOR_COMMS` round-trips
- The trigger runner's `sync_state_to_supervisor()` also needs the same lock
to send state changes back to the supervisor — it's blocked behind the queued
`GetConnection` requests
- The supervisor process cannot complete its heartbeat cycle while blocked
serving requests
- If the contention window exceeds the liveness probe threshold (30s by
default), Kubernetes restarts the pod, and the cycle repeats
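The serialization effect can be demonstrated with a toy model (all names below are illustrative stand-ins, not the real Task SDK classes): a single shared `asyncio.Lock` held across a simulated round-trip makes N concurrent requests drain in roughly N round-trip times, exactly the linear scaling described above.

```python
import asyncio
import time


class SerializedComms:
    """Toy model of TriggerCommsDecoder.asend(): one shared lock held
    for the entire request/response round-trip (hypothetical names)."""

    def __init__(self, round_trip_s: float):
        self._lock = asyncio.Lock()
        self.round_trip_s = round_trip_s

    async def asend(self, msg: str) -> str:
        async with self._lock:  # lock held for the full round-trip
            await asyncio.sleep(self.round_trip_s)  # simulated supervisor work
            return f"response:{msg}"


async def main(n_triggers: int, round_trip_s: float) -> float:
    comms = SerializedComms(round_trip_s)
    start = time.monotonic()
    # All "triggers" request their connection at once, as after a restart.
    await asyncio.gather(
        *(comms.asend(f"GetConnection-{i}") for i in range(n_triggers))
    )
    return time.monotonic() - start


elapsed = asyncio.run(main(n_triggers=10, round_trip_s=0.05))
print(f"{elapsed:.2f}s")  # ~0.5s: the queue drains in ~N * round-trip time
```

With a ~1 s supervisor interval and 14 triggers, the same arithmetic gives a ~14 s window during which `sync_state_to_supervisor()` is also starved.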
### What you think should happen instead?
Triggers using `GoogleBaseAsyncHook` should be able to refresh credentials
concurrently without blocking the triggerer's internal communication channel.
The lock contention should not be able to starve `sync_state_to_supervisor()`.
### Suggested fixes
**Option A: Make `GoogleBaseAsyncHook` pre-fetch the connection
asynchronously**
Instead of creating the sync hook via `sync_to_async(GoogleBaseHook)(...)`
(which calls `get_connection()` synchronously in a thread), fetch the
connection on the main event loop first using `SUPERVISOR_COMMS.asend()`, then
inject it into the sync hook to bypass the synchronous `get_connection()` call
entirely.
**Option B: Per-request future pattern instead of global lock in
`TriggerCommsDecoder`**
Replace the global `_lock` in `asend()` with a per-request `Future` pattern.
Write requests without holding a lock, tag each with a request ID, and dispatch
responses to the correct `Future` based on the ID. This was suggested in the
discussion on #50185 by @x42005e1f.
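A sketch of the per-request Future pattern (hypothetical names throughout; the supervisor's reply is simulated with a delayed task so the example is self-contained):

```python
import asyncio
import itertools


class FutureComms:
    """Option B sketch: no lock is held across the round-trip. Each request
    gets an ID and a Future; responses are dispatched to the matching Future."""

    def __init__(self):
        self._ids = itertools.count()
        self._pending: dict[int, asyncio.Future] = {}
        self._write_lock = asyncio.Lock()  # guards only the write, not the wait

    async def asend(self, msg: str) -> str:
        req_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        async with self._write_lock:  # held only briefly, to serialize writes
            await self._write(req_id, msg)
        return await fut  # many requests can now wait concurrently

    async def _write(self, req_id: int, msg: str) -> None:
        # Stand-in for writing a frame to the supervisor pipe; here we fake
        # the supervisor replying after a short delay.
        async def respond():
            await asyncio.sleep(0.01)
            self._dispatch(req_id, f"response:{msg}")

        asyncio.get_running_loop().create_task(respond())

    def _dispatch(self, req_id: int, body: str) -> None:
        # Reader side: route the response frame to the matching Future by ID.
        self._pending.pop(req_id).set_result(body)


async def main():
    comms = FutureComms()
    return await asyncio.gather(*(comms.asend(f"m{i}") for i in range(3)))


print(asyncio.run(main()))  # ['response:m0', 'response:m1', 'response:m2']
```

Because the lock now covers only the frame write, the round-trips overlap: N requests complete in ~1 round-trip time instead of ~N.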
**Option C: Connection caching in `GoogleBaseAsyncHook`**
Cache the connection result at the class level (keyed by `conn_id`) so that
only the first trigger instance pays the `SUPERVISOR_COMMS` round-trip cost.
Subsequent instances reuse the cached connection.
### How to reproduce
1. Create 15+ DAGs each with a deferrable task that uses any
`GoogleBaseAsyncHook`-based trigger (e.g. `BigQueryInsertJobTrigger`, or a
custom trigger inheriting from `GoogleBaseAsyncHook`)
2. Run all DAGs so that 15+ triggers are active simultaneously
3. Restart the triggerer pod (or kill it to simulate a crash)
4. Observe that after restart, all deferred tasks become stuck — triggers
show as "running" in the status log but no events are ever fired
5. Clearing individual tasks and re-running them results in them entering
deferred state and getting stuck again
The issue is probabilistic and depends on the number of concurrent triggers
and the supervisor's processing speed. With ~14 triggers it reproduces
consistently.
### Related issues
- #50185 — "Trigger runner process locked with multiple Workflow triggers"
(closed, milestone 3.0.3). The fix in #51924 changed the communication protocol
from line-based to length-prefixed, which addressed `LimitOverrunError` but did
**not** address the `_lock` contention in `asend()`.
- #63760 — "DatabricksExecutionTrigger raises RuntimeError: AsyncToSync in
the same thread as an async event loop in Airflow 3" (open). Same class of bug
— synchronous connection lookups from within the triggerer's async event loop.
### Operating System
Linux (GKE)
### Versions of Apache Airflow Providers
apache-airflow-providers-google (version included with Runtime 3.1-11 /
Airflow 3.1.6)
### Deployment
Astronomer
### Deployment details
- Airflow 3.1.6 (Astro Runtime 3.1-11)
- Single triggerer pod (1 replica)
- 14+ deferrable triggers using GoogleBaseAsyncHook
- GKE with workload identity
- greenback 1.2.1, asgiref 3.9.2
- Default thread pool: 12 workers (8 CPU)
- Liveness probe: 30s period, 30s heartbeat threshold, 10 failure threshold
### Anything else?
The `_lock` contention window scales linearly with the number of triggers.
For deployments with many deferrable Google-based tasks (which is the
recommended pattern), this becomes a reliability issue — any triggerer restart
causes a cascading failure where all deferred tasks hang until manual
intervention.
A partial workaround is running multiple triggerer replicas (splitting
`AIRFLOW__TRIGGERER__DEFAULT_CAPACITY` across pods), which spreads the lock
contention across processes and provides redundancy when one pod hits the
contention window.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)