hkc-8010 opened a new issue, #63929:
URL: https://github.com/apache/airflow/issues/63929

   ## Description
   
   When deadline callbacks run in the **triggerer** process, connection lookups 
via the async path (`Connection.async_get()` → `_async_get_connection()`) fail 
with `AirflowNotFoundException` for connection IDs that **do** exist in the 
Airflow metadata database (and are visible via `airflow connections get`). The 
same connections resolve successfully from the **scheduler** when using the 
sync CLI, but the async path used by deadline callback notifiers (e.g. 
PagerDuty, SMTP) does not see them in the triggerer context.
   
   This causes deadline alerts (e.g. "notify on DAG run timeout") to fail with 
"The conn_id `X` isn't defined" even though the connection is defined in the 
Airflow UI / metastore.
   
   ## Use case / impact
   
   - Users configure DAG-level deadline callbacks (e.g. 
`dag=DatasetTriggeredDAG(..., deadline=[pagerduty_deadline_alert(...)])`) to 
get PagerDuty/email alerts when a run exceeds a time limit.
   - The callback runs in the triggerer. It calls the provider’s async hook 
(e.g. PagerDuty), which calls `get_async_connection(conn_id)` → 
`BaseHook.aget_connection()` → `Connection.async_get()` → 
`_async_get_connection()` in `airflow/sdk/execution_time/context.py`.
   - `_async_get_connection()` uses `ensure_secrets_backend_loaded()` to decide 
which backends to query. In the triggerer process, the effective context is the 
**fallback** chain (no `SUPERVISOR_COMMS`, and triggerer does not set 
`_AIRFLOW_PROCESS_CONTEXT=server`), so only `EnvironmentVariablesBackend` and 
any configured external backends (e.g. AWS Secrets Manager) are 
used—**MetastoreBackend is not included**.
   - Connections stored only in the metadata DB (e.g. created/edited in the 
Airflow UI) are therefore never found by the async path in the triggerer, and 
the callback raises `AirflowNotFoundException`.
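
   The selection behaviour described in the bullets above can be modelled with a small self-contained sketch. It does not import Airflow: `DictBackend`, `select_backends`, `async_get_connection` and `ConnectionNotFound` are hypothetical stand-ins, and the ordering of backends within the chain is an assumption. The real logic in `ensure_secrets_backend_loaded()` may differ in detail.

```python
import asyncio


class ConnectionNotFound(Exception):
    """Stand-in for AirflowNotFoundException (hypothetical)."""


class DictBackend:
    """Hypothetical secrets backend that serves connections from a dict."""

    def __init__(self, name, conns):
        self.name = name
        self.conns = conns

    def get_connection(self, conn_id):
        # Return None when this backend does not know the connection.
        return self.conns.get(conn_id)


def select_backends(process_context, env, metastore, external=()):
    """Model of the chain selection described in this report (assumption)."""
    chain = [*external, env]
    if process_context == "server":
        # Only the "server" context gets the metastore-backed backend.
        chain.append(metastore)
    return chain


async def async_get_connection(conn_id, backends):
    """Try each backend in order; raise if none of them has the connection."""
    for backend in backends:
        conn = backend.get_connection(conn_id)
        if conn is not None:
            return conn
    raise ConnectionNotFound(f"The conn_id `{conn_id}` isn't defined")


env = DictBackend("env", {})
aws = DictBackend("aws", {})
metastore = DictBackend(
    "metastore", {"my_pagerduty": {"conn_type": "pagerduty_events"}}
)

server_chain = select_backends("server", env, metastore, [aws])
fallback_chain = select_backends(None, env, metastore, [aws])

# Server-like context: the metastore is consulted, so the lookup succeeds.
print(asyncio.run(async_get_connection("my_pagerduty", server_chain)))

# Triggerer-like (fallback) context: the metastore is never consulted.
try:
    asyncio.run(async_get_connection("my_pagerduty", fallback_chain))
except ConnectionNotFound as exc:
    print(f"fallback chain: {exc}")
```

   In this model the only difference between the two lookups is whether the metastore stand-in is part of the chain, which matches the failure mode reported here.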
   
   ## What you expected to happen
   
   Connections that exist in the Airflow metadata database (and are visible via 
`airflow connections get`) should be resolvable when deadline callbacks run in 
the triggerer, so that notifiers (PagerDuty, SMTP, etc.) can use the same 
connection configuration as the rest of the deployment.
   
   ## What actually happened
   
   - **Scheduler (sync path):** `airflow connections get <conn_id>` succeeds 
and returns the connection (metastore is used by the sync path / server 
context).
   - **Triggerer (async path):** Inside the triggerer container, 
`Connection.async_get(conn_id)` for the same `conn_id` raises 
`AirflowNotFoundException: The conn_id '<conn_id>' isn't defined`.
   - Deadline callbacks that use these connection IDs (e.g. 
`pagerduty_events_conn_id`, `smtp_conn_id`) fail with the same exception when 
the triggerer runs the callback.
   
   ## How to reproduce
   
   1. Deploy Airflow 3.x with a triggerer and a secrets backend configuration 
that includes:
      - MetastoreBackend (default for “server” context), and
      - optionally, an external backend (e.g. AWS Secrets Manager).
   2. Create a connection in the Airflow UI (metastore only), e.g. 
`my_pagerduty` (type `pagerduty_events`).
   3. Define a DAG with a deadline callback that uses that connection, e.g.:
      - `deadline=[PagerDutyNotifier(pagerduty_events_conn_id="my_pagerduty", 
...)]`
   4. Trigger a run that hits the deadline so the triggerer executes the 
callback.
   5. Observe: the callback fails with `AirflowNotFoundException: The conn_id 
'my_pagerduty' isn't defined`.
   
   **In-container check (triggerer):**
   
   ```python
   # In triggerer container (same process context as deadline callbacks):
   from airflow.sdk.definitions.connection import Connection
   import asyncio
   # Raises AirflowNotFoundException in the triggerer context:
   asyncio.run(Connection.async_get("my_pagerduty"))
   ```
   
   **CLI in same container:**
   
   ```bash
   # Returns the connection (the CLI may use a different backend resolution):
   airflow connections get my_pagerduty
   ```
   
   So the async code path used by the triggerer does not see metastore-backed 
connections.
   
   ## Environment
   
   - **Airflow version:** 3.1.7
   - **Runtime:** Astronomer Astro (Runtime 3.1-13); triggerer runs as separate 
K8s deployment.
   - **Secrets:** `AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend` with backend kwargs; connections also exist in metastore (Airflow UI).
   - **Components:** Scheduler (sync/CLI can see metastore); Triggerer (async 
path used by deadline callbacks cannot).
   
   ## Code references
   
   - Async connection lookup used by deadline callbacks:  
     `airflow/task-sdk/src/airflow/sdk/execution_time/context.py` — 
`_async_get_connection()` (lines ~180–225). It calls 
`ensure_secrets_backend_loaded()` and iterates only over the returned backends; 
if none return the connection, it raises `AirflowNotFoundException`.
   - Backend selection:  
     `airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py` — 
`ensure_secrets_backend_loaded()` (around 1933–1975).  
     - If `_AIRFLOW_PROCESS_CONTEXT=server`: uses default server chain 
(includes MetastoreBackend).  
     - Otherwise (e.g. triggerer): uses “fallback” chain: only 
`EnvironmentVariablesBackend` plus configured external backends; 
**MetastoreBackend is not in the list**.
   - Provider call path:  
     `airflow/providers/pagerduty/hooks/pagerduty_events.py` — 
`get_integration_key()` calls 
`get_async_connection(self.pagerduty_events_conn_id)` (around line 288). Same 
pattern for SMTP and other notifiers used in deadline callbacks.
   
   ## Possible solutions
   
   1. **Treat triggerer as server-like for secrets:** When loading secrets in 
the process that runs the triggerer job, include MetastoreBackend in the 
backend chain (e.g. by setting `_AIRFLOW_PROCESS_CONTEXT=server` for the 
triggerer process or by explicitly including metastore in the “fallback” chain 
for triggerer).
   2. **Document the gap:** If the current behavior is intentional (e.g. 
triggerer is considered a “worker” context), document that connections used by 
deadline callbacks must be available from an external secrets backend (e.g. AWS 
Secrets Manager), not only from the Airflow UI/metastore.
   3. **Unify behavior:** Ensure that any context that runs user callbacks 
(including triggerer) can resolve connections from the same sources as the 
scheduler/webserver (including metastore), so that UI-defined connections work 
for deadline notifiers without requiring duplication in an external backend.
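
   Until one of the options above lands, a possible stopgap follows from the analysis in this report: since `EnvironmentVariablesBackend` is part of the fallback chain, a connection exposed to the triggerer as an `AIRFLOW_CONN_<CONN_ID>` environment variable should be visible to the async path. The sketch below only builds the variable name and a JSON value. The helper `conn_env_var` is hypothetical, the `password` field is a placeholder (which field carries the key depends on the provider hook), and this has not been verified against the triggerer in 3.1.7.

```python
import json


def conn_env_var(conn_id, **fields):
    """Return (name, value) for an environment-variable connection.

    Hypothetical helper: Airflow reads connections from variables named
    AIRFLOW_CONN_<CONN_ID in upper case>, with a URI or JSON value.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(fields)
    return name, value


name, value = conn_env_var(
    "my_pagerduty",
    conn_type="pagerduty_events",
    password="<integration-key>",  # placeholder; field choice is an assumption
)

# Emit a line that could be added to the triggerer's environment.
print(f"export {name}='{value}'")
```

   This duplicates the connection outside the metastore, so it is a workaround for the gap rather than a fix.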
   
   ## Additional context
   
   - Multiple connection IDs observed failing in the same way: PagerDuty conns 
and `smtp_default` (referenced by `AIRFLOW__EMAIL__EMAIL_CONN_ID`). All exist 
in metastore and are visible via `airflow connections get` from 
scheduler/triggerer CLI, but `Connection.async_get()` fails in the triggerer 
process.
   - Stack trace from logs points to:  
     `airflow/triggers/deadline.py` → notifier `async_notify()` → provider hook 
`get_integration_key()` / `aget_connection()` → `Connection.async_get()` → 
`_async_get_connection()` → raise `AirflowNotFoundException`.
   

