hkc-8010 opened a new issue, #63913:
URL: https://github.com/apache/airflow/issues/63913

   ### Apache Airflow version
   
   3.1.6
   
   ### What happened?
   
   When running multiple deferrable triggers that use `GoogleBaseAsyncHook` 
(e.g. BigQuery, GCS, Dataflow, or any custom trigger inheriting from it), the 
triggerer subprocess can enter a lock contention state where all deferred tasks 
become stuck indefinitely. The triggerer logs show triggers as "running" but no 
events are ever fired. After a triggerer restart, the problem immediately 
recurs because all triggers simultaneously need fresh credentials.
   
   **Observed behavior:**
   - 14+ deferrable triggers using `GoogleBaseAsyncHook` running in a single 
triggerer
   - After a triggerer pod restart, ALL deferred tasks become stuck
   - No errors appear in triggerer logs (consistent with a hang, not a crash)
   - Clearing and re-running tasks results in them getting stuck again in 
deferred state
   - The triggerer enters a restart loop (3 restarts in ~11 minutes) until 
manual intervention
   - Event loop blocking of 3-4 seconds detected at each startup via 
`block_watchdog`
   
   ### Root cause analysis
   
   The issue is lock contention on the shared `asyncio.Lock` inside 
`TriggerCommsDecoder.asend()` when multiple triggers concurrently call 
`get_connection()` through `SUPERVISOR_COMMS`.
   
   **The call chain:**
   
   1. Trigger calls `GoogleBaseAsyncHook.get_sync_hook()`:
      ```python
      # GoogleBaseAsyncHook.get_sync_hook()
      self._sync_hook = await sync_to_async(self.sync_hook_class)(**self._hook_kwargs)
      ```
   
   2. `GoogleBaseHook.__init__()` calls `get_connection()` **synchronously** in 
the thread pool:
      ```python
      # GoogleBaseHook.__init__
      self.extras: dict = self.get_connection(self.gcp_conn_id).extra_dejson
      ```
   
   3. In the triggerer subprocess, `get_connection()` routes through the Task 
SDK to `SUPERVISOR_COMMS.send()`:
      ```python
      # TriggerCommsDecoder.send
      def send(self, msg):
          from asgiref.sync import async_to_sync
          return async_to_sync(self.asend)(msg)
      ```
   
   4. `asend()` holds a **shared lock** for the entire request-response 
round-trip to the supervisor process:
      ```python
      # TriggerCommsDecoder.asend
      async def asend(self, msg):
          frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
          bytes = frame.as_bytes()
          async with self._lock:  # lock held for the entire round-trip
              self._async_writer.write(bytes)
              return await self._aget_response(frame.id)  # waits for supervisor response
      ```
   
   **After a triggerer restart**, all trigger instances have empty credential 
caches. They all call `get_sync_hook()` → `get_connection()` → 
`SUPERVISOR_COMMS.send()` concurrently. Since `asend()` serializes on 
`self._lock`, the requests queue up.
   
   Each round-trip takes the supervisor's processing interval (~1 second via 
`_service_subprocess(1)`). With N triggers, the queue takes **~N seconds** to 
drain. During this window:
   
   - Each trigger also subsequently calls 
`sync_to_async(sync_hook.get_credentials)()` which internally calls 
`_get_field()` → more `SUPERVISOR_COMMS` round-trips
   - The trigger runner's `sync_state_to_supervisor()` also needs the same lock 
to send state changes back to the supervisor — it's blocked behind the queued 
`GetConnection` requests
   - The supervisor process cannot complete its heartbeat cycle while blocked 
serving requests
   - If the contention window exceeds the liveness probe threshold (30s by 
default), Kubernetes restarts the pod, and the cycle repeats
   
   ### What you think should happen instead?
   
   Triggers using `GoogleBaseAsyncHook` should be able to refresh credentials 
concurrently without blocking the triggerer's internal communication channel. 
The lock contention should not be able to starve `sync_state_to_supervisor()`.
   
   ### Suggested fixes
   
   **Option A: Make `GoogleBaseAsyncHook` pre-fetch the connection 
asynchronously**
   
   Instead of creating the sync hook via `sync_to_async(GoogleBaseHook)(...)` 
(which calls `get_connection()` synchronously in a thread), fetch the 
connection on the main event loop first using `SUPERVISOR_COMMS.asend()`, then 
inject it into the sync hook to bypass the synchronous `get_connection()` call 
entirely.
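
   A rough sketch of this approach, using hypothetical stand-ins for the Airflow internals (the `Connection`, `StubComms`, and `SyncHook` classes below are stubs for illustration, not the real `Connection`, `SUPERVISOR_COMMS`, or `GoogleBaseHook` APIs):

```python
import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class Connection:
    """Stand-in for an Airflow Connection (hypothetical)."""
    conn_id: str
    extra_dejson: dict

class StubComms:
    """Stand-in for SUPERVISOR_COMMS (hypothetical)."""
    async def asend(self, msg: dict) -> Connection:
        await asyncio.sleep(0.01)  # pretend round-trip to the supervisor
        return Connection(msg["conn_id"], {"project": "demo"})

class SyncHook:
    """Stand-in for GoogleBaseHook (hypothetical)."""
    def __init__(self, gcp_conn_id: str, connection: Optional[Connection] = None):
        # With an injected connection, __init__ never needs to call
        # get_connection(), so it never blocks on SUPERVISOR_COMMS.
        if connection is None:
            raise RuntimeError("would block on SUPERVISOR_COMMS from a worker thread")
        self.extras = connection.extra_dejson

async def get_sync_hook(comms: StubComms, conn_id: str) -> SyncHook:
    # 1. Fetch the connection on the event loop via the async channel.
    conn = await comms.asend({"type": "GetConnection", "conn_id": conn_id})
    # 2. Build the sync hook in a thread; the blocking lookup is bypassed.
    return await asyncio.to_thread(SyncHook, conn_id, connection=conn)

hook = asyncio.run(get_sync_hook(StubComms(), "google_cloud_default"))
print(hook.extras)  # {'project': 'demo'}
```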
   
   **Option B: Per-request future pattern instead of global lock in 
`TriggerCommsDecoder`**
   
   Replace the global `_lock` in `asend()` with a per-request `Future` pattern. 
Write requests without holding a lock, tag each with a request ID, and dispatch 
responses to the correct `Future` based on the ID. This was suggested in the 
discussion on #50185 by @x42005e1f.
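
   A minimal illustration of the pattern (a toy, not the actual `TriggerCommsDecoder`; the fake supervisor task stands in for the real response-reader):

```python
import asyncio
import itertools

class FutureComms:
    """Toy comms channel: no shared lock, one Future per in-flight request."""

    def __init__(self):
        self._id_counter = itertools.count()
        self._pending: dict[int, asyncio.Future] = {}

    async def asend(self, msg: str) -> str:
        req_id = next(self._id_counter)
        fut = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        # Write the ID-tagged request without a lock; many can be in flight.
        asyncio.create_task(self._fake_supervisor(req_id, msg))
        # Each caller awaits only its own future, never a global lock.
        return await fut

    async def _fake_supervisor(self, req_id: int, msg: str) -> None:
        # Stand-in for the supervisor plus the response-reader task.
        await asyncio.sleep(0.01)
        self._dispatch(req_id, f"ok:{msg}")

    def _dispatch(self, req_id: int, body: str) -> None:
        # Route each response frame to the future matching its request ID.
        self._pending.pop(req_id).set_result(body)

async def main() -> list:
    comms = FutureComms()
    return await asyncio.gather(*(comms.asend(f"req{i}") for i in range(5)))

results = asyncio.run(main())
print(results)
```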
   
   **Option C: Connection caching in `GoogleBaseAsyncHook`**
   
   Cache the connection result at the class level (keyed by `conn_id`) so that 
only the first trigger instance pays the `SUPERVISOR_COMMS` round-trip cost. 
Subsequent instances reuse the cached connection.
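
   A sketch of what such a cache could look like (illustrative only; a real implementation would store `Connection` objects and need a cache-invalidation story). The per-key lock ensures that, of N concurrent callers, only the first pays the round-trip:

```python
import asyncio

class CachedConnectionMixin:
    _conn_cache: dict = {}   # class-level, keyed by conn_id
    _conn_locks: dict = {}   # one lock per conn_id
    fetch_count = 0          # instrumentation for this sketch only

    @classmethod
    async def get_connection_cached(cls, conn_id: str) -> dict:
        lock = cls._conn_locks.setdefault(conn_id, asyncio.Lock())
        async with lock:
            # Only the first caller for this conn_id performs the fetch;
            # later callers find the cache already populated.
            if conn_id not in cls._conn_cache:
                cls._conn_cache[conn_id] = await cls._fetch(conn_id)
        return cls._conn_cache[conn_id]

    @classmethod
    async def _fetch(cls, conn_id: str) -> dict:
        # Stand-in for the real SUPERVISOR_COMMS round-trip.
        cls.fetch_count += 1
        await asyncio.sleep(0.01)
        return {"conn_id": conn_id, "extras": {}}

async def main() -> int:
    await asyncio.gather(
        *(CachedConnectionMixin.get_connection_cached("gcp") for _ in range(14))
    )
    return CachedConnectionMixin.fetch_count

fetches = asyncio.run(main())
print(fetches)  # 1
```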
   
   ### How to reproduce
   
   1. Create 15+ DAGs each with a deferrable task that uses any 
`GoogleBaseAsyncHook`-based trigger (e.g. `BigQueryInsertJobTrigger`, or a 
custom trigger inheriting from `GoogleBaseAsyncHook`)
   2. Run all DAGs so that 15+ triggers are active simultaneously
   3. Restart the triggerer pod (or kill it to simulate a crash)
   4. Observe that after restart, all deferred tasks become stuck — triggers 
show as "running" in the status log but no events are ever fired
   5. Clearing individual tasks and re-running them results in them entering 
deferred state and getting stuck again
   
   The issue is probabilistic and depends on the number of concurrent triggers 
and the supervisor's processing speed. With ~14 triggers it reproduces 
consistently.
   
   ### Related issues
   
   - #50185 — "Trigger runner process locked with multiple Workflow triggers" 
(closed, milestone 3.0.3). The fix in #51924 changed the communication protocol 
from line-based to length-prefixed, which addressed `LimitOverrunError` but did 
**not** address the `_lock` contention in `asend()`.
   - #63760 — "DatabricksExecutionTrigger raises RuntimeError: AsyncToSync in 
the same thread as an async event loop in Airflow 3" (open). Same class of bug 
— synchronous connection lookups from within the triggerer's async event loop.
   
   ### Operating System
   
   Linux (GKE)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google (version included with Runtime 3.1-11 / 
Airflow 3.1.6)
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   - Airflow 3.1.6 (Astro Runtime 3.1-11)
   - Single triggerer pod (1 replica)
   - 14+ deferrable triggers using GoogleBaseAsyncHook
   - GKE with workload identity
   - greenback 1.2.1, asgiref 3.9.2
   - Default thread pool: 12 workers (8 CPU)
   - Liveness probe: 30s period, 30s heartbeat threshold, 10 failure threshold
   
   ### Anything else?
   
   The `_lock` contention window scales linearly with the number of triggers. 
For deployments with many deferrable Google-based tasks (which is the 
recommended pattern), this becomes a reliability issue — any triggerer restart 
causes a cascading failure where all deferred tasks hang until manual 
intervention.
   
   The workaround that partially helps is running multiple triggerer replicas 
(`AIRFLOW__TRIGGERER__DEFAULT_CAPACITY` split across pods), which distributes 
the lock contention across pods and provides redundancy when one pod hits the 
contention window.
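
   For reference, the per-replica split might look like the following (values are illustrative; `[triggerer] default_capacity` is set here via its environment-variable form):

```shell
# Illustrative values only: run several triggerer replicas and cap each one
# so the per-pod trigger count (and thus the asend() lock queue) stays small.
export AIRFLOW__TRIGGERER__DEFAULT_CAPACITY=5
airflow triggerer   # one such process per replica/pod
```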
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
