hkc-8010 opened a new issue, #63929:
URL: https://github.com/apache/airflow/issues/63929
## Description
When deadline callbacks run in the **triggerer** process, connection lookups
via the async path (`Connection.async_get()` → `_async_get_connection()`) fail
with `AirflowNotFoundException` for connection IDs that **do** exist in the
Airflow metadata database (and are visible via `airflow connections get`). The
same connections resolve successfully from the **scheduler** when using the
sync CLI, but the async path used by deadline callback notifiers (e.g.
PagerDuty, SMTP) does not see them in the triggerer context.
This causes deadline alerts (e.g. "notify on DAG run timeout") to fail with
"The conn_id `X` isn't defined" even though the connection is defined in the
Airflow UI / metastore.
## Use case / impact
- Users configure DAG-level deadline callbacks (e.g.
`dag=DatasetTriggeredDAG(..., deadline=[pagerduty_deadline_alert(...)])`) to
get PagerDuty/email alerts when a run exceeds a time limit.
- The callback runs in the triggerer. It calls the provider’s async hook
(e.g. PagerDuty), which calls `get_async_connection(conn_id)` →
`BaseHook.aget_connection()` → `Connection.async_get()` →
`_async_get_connection()` in `airflow/sdk/execution_time/context.py`.
- `_async_get_connection()` uses `ensure_secrets_backend_loaded()` to decide
which backends to query. In the triggerer process, the effective context is the
**fallback** chain (no `SUPERVISOR_COMMS`, and triggerer does not set
`_AIRFLOW_PROCESS_CONTEXT=server`), so only `EnvironmentVariablesBackend` and
any configured external backends (e.g. AWS Secrets Manager) are
used—**MetastoreBackend is not included**.
- Connections stored only in the metadata DB (e.g. created/edited in the
Airflow UI) are therefore never found by the async path in the triggerer, and
the callback raises `AirflowNotFoundException`.
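The backend-selection behavior described above can be sketched as follows. This is a simplified model of the logic, not the actual `ensure_secrets_backend_loaded()` implementation; the function name, signature, and ordering here are illustrative assumptions:

```python
# Simplified model of the backend-selection logic described above; names and
# ordering are illustrative, not the real ensure_secrets_backend_loaded() code.

def effective_backends(process_context, external_backends):
    """Return the ordered secrets-backend chain for a given process context."""
    if process_context == "server":
        # Server chain includes the metastore, so UI-defined connections resolve.
        return [*external_backends, "EnvironmentVariablesBackend", "MetastoreBackend"]
    # Fallback chain (where the triggerer lands): no MetastoreBackend, so
    # connections stored only in the metadata DB are invisible.
    return [*external_backends, "EnvironmentVariablesBackend"]


print(effective_backends("server", ["SecretsManagerBackend"]))
print(effective_backends(None, ["SecretsManagerBackend"]))
```

The key point is that the triggerer never sets `_AIRFLOW_PROCESS_CONTEXT=server`, so it always takes the second branch.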
## What you expected to happen
Connections that exist in the Airflow metadata database (and are visible via
`airflow connections get`) should be resolvable when deadline callbacks run in
the triggerer, so that notifiers (PagerDuty, SMTP, etc.) can use the same
connection configuration as the rest of the deployment.
## What actually happened
- **Scheduler (sync path):** `airflow connections get <conn_id>` succeeds
and returns the connection (metastore is used by the sync path / server
context).
- **Triggerer (async path):** Inside the triggerer container,
`Connection.async_get(conn_id)` for the same `conn_id` raises
`AirflowNotFoundException: The conn_id '<conn_id>' isn't defined`.
- Deadline callbacks that use these connection IDs (e.g.
`pagerduty_events_conn_id`, `smtp_conn_id`) fail with the same exception when
the triggerer runs the callback.
## How to reproduce
1. Deploy Airflow 3.x with a triggerer and a secrets backend configuration
that includes both:
- MetastoreBackend (default for “server” context), and
- An optional external backend (e.g. AWS Secrets Manager).
2. Create a connection in the Airflow UI (metastore only), e.g.
`my_pagerduty` (type `pagerduty_events`).
3. Define a DAG with a deadline callback that uses that connection, e.g.:
- `deadline=[PagerDutyNotifier(pagerduty_events_conn_id="my_pagerduty",
...)]`
4. Trigger a run that hits the deadline so the triggerer executes the
callback.
5. Observe: the callback fails with `AirflowNotFoundException: The conn_id
'my_pagerduty' isn't defined`.
**In-container check (triggerer):**
```python
# In triggerer container (same process context as deadline callbacks):
import asyncio

from airflow.sdk.definitions.connection import Connection

asyncio.run(Connection.async_get("my_pagerduty"))  # raises AirflowNotFoundException
```
**CLI in same container:**
```bash
airflow connections get my_pagerduty  # returns the connection (the CLI may use different backend resolution)
```
So the async code path used by the triggerer does not see metastore-backed
connections.
## Environment
- **Airflow version:** 3.1.7
- **Runtime:** Astronomer Astro (Runtime 3.1-13); triggerer runs as separate
K8s deployment.
- **Secrets:**
`AIRFLOW__SECRETS__BACKEND=airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend`
with backend kwargs; connections also exist in metastore (Airflow UI).
- **Components:** Scheduler (sync/CLI can see metastore); Triggerer (async
path used by deadline callbacks cannot).
## Code references
- Async connection lookup used by deadline callbacks:
`airflow/task-sdk/src/airflow/sdk/execution_time/context.py` —
`_async_get_connection()` (lines ~180–225). It calls
`ensure_secrets_backend_loaded()` and iterates only over the returned backends;
if none return the connection, it raises `AirflowNotFoundException`.
- Backend selection:
`airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py` —
`ensure_secrets_backend_loaded()` (around 1933–1975).
- If `_AIRFLOW_PROCESS_CONTEXT=server`: uses default server chain
(includes MetastoreBackend).
- Otherwise (e.g. triggerer): uses “fallback” chain: only
`EnvironmentVariablesBackend` plus configured external backends;
**MetastoreBackend is not in the list**.
- Provider call path:
`airflow/providers/pagerduty/hooks/pagerduty_events.py` —
`get_integration_key()` calls
`get_async_connection(self.pagerduty_events_conn_id)` (around line 288). Same
pattern for SMTP and other notifiers used in deadline callbacks.
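The failure mode in `_async_get_connection()` described above amounts to iterating the selected backends and raising when none of them returns the connection. A minimal sketch, with a hypothetical backend interface that only stands in for the real SDK code:

```python
class AirflowNotFoundException(Exception):
    """Stand-in for airflow.exceptions.AirflowNotFoundException."""


def get_connection_from_backends(conn_id, backends):
    # Mirrors the described behavior: try each backend in order; if every
    # backend misses, raise instead of falling back to the metastore.
    for backend in backends:
        conn = backend.get(conn_id)  # hypothetical lookup interface
        if conn is not None:
            return conn
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")


# With a fallback chain that lacks the metastore, a UI-only connection misses:
env_backend = type("Env", (), {"get": staticmethod(lambda cid: None)})()
try:
    get_connection_from_backends("my_pagerduty", [env_backend])
except AirflowNotFoundException as exc:
    print(exc)  # The conn_id `my_pagerduty` isn't defined
```

This is why the same `conn_id` works everywhere the chain includes MetastoreBackend and fails everywhere it does not.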
## Possible solutions
1. **Treat triggerer as server-like for secrets:** When loading secrets in
the process that runs the triggerer job, include MetastoreBackend in the
backend chain (e.g. by setting `_AIRFLOW_PROCESS_CONTEXT=server` for the
triggerer process or by explicitly including metastore in the “fallback” chain
for triggerer).
2. **Document the gap:** If the current behavior is intentional (e.g.
triggerer is considered a “worker” context), document that connections used by
deadline callbacks must be available from an external secrets backend (e.g. AWS
Secrets Manager), not only from the Airflow UI/metastore.
3. **Unify behavior:** Ensure that any context that runs user callbacks
(including triggerer) can resolve connections from the same sources as the
scheduler/webserver (including metastore), so that UI-defined connections work
for deadline notifiers without requiring duplication in an external backend.
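If option 1 is viable, a deployment-level workaround might be to export the server process-context flag in the triggerer's environment. This is an untested assumption: `_AIRFLOW__PROCESS_CONTEXT`-style internals are unsupported configuration surface, so verify the exact variable and its side effects against your Airflow version first.

```bash
# Hypothetical workaround: make the triggerer select the server backend chain
# (which includes MetastoreBackend). _AIRFLOW_PROCESS_CONTEXT is an internal
# variable; confirm its behavior for your Airflow version before relying on it.
export _AIRFLOW_PROCESS_CONTEXT=server
```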
## Additional context
- Multiple connection IDs fail in the same way: PagerDuty connections and
`smtp_default` (referenced by `AIRFLOW__EMAIL__EMAIL_CONN_ID`). All exist in
the metastore and are visible via `airflow connections get` from the
scheduler/triggerer CLI, but `Connection.async_get()` fails in the triggerer
process.
- Stack trace from logs points to:
`airflow/triggers/deadline.py` → notifier `async_notify()` → provider hook
`get_integration_key()` / `aget_connection()` → `Connection.async_get()` →
`_async_get_connection()` → raise `AirflowNotFoundException`.