This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 930f733f270 Optimize public get_dag_runs endpoint (#63940)
930f733f270 is described below
commit 930f733f27011c319c679f3ef42f44091ab423cb
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Mar 23 13:01:21 2026 +0100
Optimize public get_dag_runs endpoint (#63940)
---
.../src/airflow/api_fastapi/common/db/dag_runs.py | 109 +++++++++++++++++----
.../api_fastapi/core_api/routes/public/dag_run.py | 15 ++-
airflow-core/src/airflow/models/dagrun.py | 21 ++--
3 files changed, 114 insertions(+), 31 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
index 750a4ae61aa..72565c45ef2 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
@@ -17,8 +17,13 @@
from __future__ import annotations
-from sqlalchemy import func, select
-from sqlalchemy.orm import joinedload, selectinload
+from collections import defaultdict
+from collections.abc import Sequence
+from typing import TYPE_CHECKING
+from uuid import UUID
+
+from sqlalchemy import func, select, tuple_, union_all
+from sqlalchemy.orm import joinedload
from sqlalchemy.orm.interfaces import LoaderOption
from airflow.models.dag import DagModel
@@ -27,6 +32,9 @@ from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancehistory import TaskInstanceHistory
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
dagruns_select_with_state_count = (
select(
DagRun.__table__.c.dag_id,
@@ -40,26 +48,89 @@ dagruns_select_with_state_count = (
)
-def eager_load_dag_run_for_validation() -> tuple[LoaderOption, ...]:
+def eager_load_dag_run_for_list() -> tuple[LoaderOption, ...]:
"""
- Construct the eager loading options necessary for a DagRunResponse object.
-
- For the list endpoint (get_dag_runs), loading all task instance columns is
- wasteful because we only need the dag_version_id FK to traverse to DagVersion.
- Using load_only() on TaskInstance and TaskInstanceHistory restricts the SELECT
- to just the identity columns and dag_version_id, avoiding large intermediate
- result sets caused by loading heavyweight columns (executor_config, etc.) for
- every task instance across every DAG run returned by the query.
+ Lightweight eager loading for the DagRun list endpoint.
+
+ Only loads the direct relationships needed for serialization (dag_model,
+ dag_run_note, created_dag_version). The dag_versions property — which
+ requires iterating every TI and TIH — is populated separately by
+ :func:`attach_dag_versions_to_runs` using a single DISTINCT query.
"""
return (
joinedload(DagRun.dag_model),
- selectinload(DagRun.task_instances)
- .load_only(TaskInstance.dag_version_id)
- .joinedload(TaskInstance.dag_version)
- .joinedload(DagVersion.bundle),
- selectinload(DagRun.task_instances_histories)
- .load_only(TaskInstanceHistory.dag_version_id)
- .joinedload(TaskInstanceHistory.dag_version)
- .joinedload(DagVersion.bundle),
joinedload(DagRun.dag_run_note),
+ joinedload(DagRun.created_dag_version).joinedload(DagVersion.bundle),
)
+
+
+def attach_dag_versions_to_runs(dag_runs: Sequence[DagRun], *, session: Session) -> None:
+ """
+ Prefetch distinct dag_version_ids for each DagRun via a lightweight query.
+
+ Instead of loading all TI and TIH rows (potentially thousands per run)
+ through the ORM relationship just to extract distinct dag_version_ids,
+ this issues a single query that returns only the distinct
+ (dag_id, run_id, dag_version_id) tuples for the given runs.
+
+ The result is attached to each DagRun as ``_prefetched_dag_version_ids``
+ (a dict mapping version_id -> DagVersion), which the ``dag_versions``
+ property reads as an optimized substitute for traversing TI/TIH
+ relationships. All business logic (bundle_version shortcut, sorting,
+ deduplication) remains solely in ``DagRun.dag_versions``.
+ """
+ if not dag_runs:
+ return
+
+ # Only runs without a bundle_version need TI/TIH traversal;
+ # runs with bundle_version use created_dag_version directly
+ # (handled by the dag_versions property).
+ runs_needing_versions = [dr for dr in dag_runs if not dr.bundle_version]
+ if not runs_needing_versions:
+ return
+
+ run_key_values = [(dr.dag_id, dr.run_id) for dr in runs_needing_versions]
+
+ ti_sub = (
+ select(
+ TaskInstance.dag_id,
+ TaskInstance.run_id,
+ TaskInstance.dag_version_id,
+ )
+ .where(TaskInstance.dag_version_id.isnot(None))
+ .where(tuple_(TaskInstance.dag_id, TaskInstance.run_id).in_(run_key_values))
+ .distinct()
+ )
+ tih_sub = (
+ select(
+ TaskInstanceHistory.dag_id,
+ TaskInstanceHistory.run_id,
+ TaskInstanceHistory.dag_version_id,
+ )
+ .where(TaskInstanceHistory.dag_version_id.isnot(None))
+ .where(tuple_(TaskInstanceHistory.dag_id, TaskInstanceHistory.run_id).in_(run_key_values))
+ .distinct()
+ )
+ combined = union_all(ti_sub, tih_sub).subquery()
+ rows = session.execute(
+ select(combined.c.dag_id, combined.c.run_id, combined.c.dag_version_id).distinct()
+ ).all()
+
+ all_version_ids = {r.dag_version_id for r in rows}
+ versions_by_id: dict[UUID, DagVersion] = {}
+ if all_version_ids:
+ dv_query = (
+ select(DagVersion)
+ .where(DagVersion.id.in_(all_version_ids))
+ .options(joinedload(DagVersion.bundle))
+ )
+ versions_by_id = {dv.id: dv for dv in session.scalars(dv_query).unique()}
+
+ versions_per_run: dict[tuple[str, str], dict[UUID, DagVersion]] = defaultdict(dict)
+ for row in rows:
+ dv = versions_by_id.get(row.dag_version_id)
+ if dv:
+ versions_per_run[(row.dag_id, row.run_id)][dv.id] = dv
+
+ for dr in runs_needing_versions:
+ dr._prefetched_dag_version_ids = versions_per_run.get((dr.dag_id, dr.run_id), {})
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 359809458cf..67f76307124 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -36,7 +36,10 @@ from airflow.api.common.mark_tasks import (
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
-from airflow.api_fastapi.common.db.dag_runs import eager_load_dag_run_for_validation
+from airflow.api_fastapi.common.db.dag_runs import (
+ attach_dag_versions_to_runs,
+ eager_load_dag_run_for_list,
+)
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
@@ -387,7 +390,7 @@ def get_dag_runs(
This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for all DAGs.
"""
- query = select(DagRun).options(*eager_load_dag_run_for_validation())
+ query = select(DagRun).options(*eager_load_dag_run_for_list())
if dag_id != "~":
get_latest_version_of_dag(dag_bag, dag_id, session) # Check if the DAG exists.
@@ -422,7 +425,8 @@ def get_dag_runs(
limit=limit,
session=session,
)
- dag_runs = session.scalars(dag_run_select)
+ dag_runs = list(session.scalars(dag_run_select).unique())
+ attach_dag_versions_to_runs(dag_runs, session=session)
return DAGRunCollectionResponse(
dag_runs=dag_runs,
@@ -635,7 +639,7 @@ def get_list_dag_runs_batch(
{"dag_run_id": "run_id"},
).set_value([body.order_by] if body.order_by else None)
- base_query = select(DagRun).options(*eager_load_dag_run_for_validation())
+ base_query = select(DagRun).options(*eager_load_dag_run_for_list())
dag_runs_select, total_entries = paginated_select(
statement=base_query,
@@ -656,7 +660,8 @@ def get_list_dag_runs_batch(
session=session,
)
- dag_runs = session.scalars(dag_runs_select)
+ dag_runs = list(session.scalars(dag_runs_select).unique())
+ attach_dag_versions_to_runs(dag_runs, session=session)
return DAGRunCollectionResponse(
dag_runs=dag_runs,
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index c37713da4d8..d32a54c15d6 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -315,6 +315,10 @@ class DagRun(Base, LoggingMixin):
_ti_dag_versions = association_proxy("task_instances", "dag_version")
_tih_dag_versions = association_proxy("task_instances_histories", "dag_version")
+ # Can be set by manual prefetch, such as attach_dag_versions_to_runs()
+ # as an alternative to traversing all TI/TIH ORM relationships. Maps version_id -> DagVersion.
+ _prefetched_dag_version_ids: dict[UUID, DagVersion] | None = None
+
def __init__(
self,
dag_id: str | None = None,
@@ -406,13 +410,16 @@ class DagRun(Base, LoggingMixin):
# when the dag is in a versioned bundle, we keep the dag version fixed
if self.bundle_version:
return [self.created_dag_version] if self.created_dag_version is not None else []
- dag_versions = [
- dv
- for dv in dict.fromkeys(list(self._tih_dag_versions) + list(self._ti_dag_versions))
- if dv is not None
- ]
- sorted_ = sorted(dag_versions, key=lambda dv: dv.id)
- return sorted_
+
+ if self._prefetched_dag_version_ids is not None:
+ dag_version_objects = list(self._prefetched_dag_version_ids.values())
+ else:
+ dag_version_objects = [
+ dv
+ for dv in dict.fromkeys(list(self._tih_dag_versions) + list(self._ti_dag_versions))
+ if dv is not None
+ ]
+ return sorted(dag_version_objects, key=lambda dv: dv.id)
@property
def version_number(self) -> int | None: