This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 930f733f270 Optimize public get_dag_runs endpoint (#63940)
930f733f270 is described below

commit 930f733f27011c319c679f3ef42f44091ab423cb
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Mar 23 13:01:21 2026 +0100

    Optimize public get_dag_runs endpoint (#63940)
---
 .../src/airflow/api_fastapi/common/db/dag_runs.py  | 109 +++++++++++++++++----
 .../api_fastapi/core_api/routes/public/dag_run.py  |  15 ++-
 airflow-core/src/airflow/models/dagrun.py          |  21 ++--
 3 files changed, 114 insertions(+), 31 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
index 750a4ae61aa..72565c45ef2 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
@@ -17,8 +17,13 @@
 
 from __future__ import annotations
 
-from sqlalchemy import func, select
-from sqlalchemy.orm import joinedload, selectinload
+from collections import defaultdict
+from collections.abc import Sequence
+from typing import TYPE_CHECKING
+from uuid import UUID
+
+from sqlalchemy import func, select, tuple_, union_all
+from sqlalchemy.orm import joinedload
 from sqlalchemy.orm.interfaces import LoaderOption
 
 from airflow.models.dag import DagModel
@@ -27,6 +32,9 @@ from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
 dagruns_select_with_state_count = (
     select(
         DagRun.__table__.c.dag_id,
@@ -40,26 +48,89 @@ dagruns_select_with_state_count = (
 )
 
 
-def eager_load_dag_run_for_validation() -> tuple[LoaderOption, ...]:
+def eager_load_dag_run_for_list() -> tuple[LoaderOption, ...]:
     """
-    Construct the eager loading options necessary for a DagRunResponse object.
-
-    For the list endpoint (get_dag_runs), loading all task instance columns is
-    wasteful because we only need the dag_version_id FK to traverse to DagVersion.
-    Using load_only() on TaskInstance and TaskInstanceHistory restricts the SELECT
-    to just the identity columns and dag_version_id, avoiding large intermediate
-    result sets caused by loading heavyweight columns (executor_config, etc.) for
-    every task instance across every DAG run returned by the query.
+    Lightweight eager loading for the DagRun list endpoint.
+
+    Only loads the direct relationships needed for serialization (dag_model,
+    dag_run_note, created_dag_version).  The dag_versions property — which
+    requires iterating every TI and TIH — is populated separately by
+    :func:`attach_dag_versions_to_runs` using a single DISTINCT query.
     """
     return (
         joinedload(DagRun.dag_model),
-        selectinload(DagRun.task_instances)
-        .load_only(TaskInstance.dag_version_id)
-        .joinedload(TaskInstance.dag_version)
-        .joinedload(DagVersion.bundle),
-        selectinload(DagRun.task_instances_histories)
-        .load_only(TaskInstanceHistory.dag_version_id)
-        .joinedload(TaskInstanceHistory.dag_version)
-        .joinedload(DagVersion.bundle),
         joinedload(DagRun.dag_run_note),
+        joinedload(DagRun.created_dag_version).joinedload(DagVersion.bundle),
     )
+
+
+def attach_dag_versions_to_runs(dag_runs: Sequence[DagRun], *, session: Session) -> None:
+    """
+    Prefetch distinct dag_version_ids for each DagRun via a lightweight query.
+
+    Instead of loading all TI and TIH rows (potentially thousands per run)
+    through the ORM relationship just to extract distinct dag_version_ids,
+    this issues a single query that returns only the distinct
+    (dag_id, run_id, dag_version_id) tuples for the given runs.
+
+    The result is attached to each DagRun as ``_prefetched_dag_version_ids``
+    (a dict mapping version_id -> DagVersion), which the ``dag_versions``
+    property reads as an optimized substitute for traversing TI/TIH
+    relationships.  All business logic (bundle_version shortcut, sorting,
+    deduplication) remains solely in ``DagRun.dag_versions``.
+    """
+    if not dag_runs:
+        return
+
+    # Only runs without a bundle_version need TI/TIH traversal;
+    # runs with bundle_version use created_dag_version directly
+    # (handled by the dag_versions property).
+    runs_needing_versions = [dr for dr in dag_runs if not dr.bundle_version]
+    if not runs_needing_versions:
+        return
+
+    run_key_values = [(dr.dag_id, dr.run_id) for dr in runs_needing_versions]
+
+    ti_sub = (
+        select(
+            TaskInstance.dag_id,
+            TaskInstance.run_id,
+            TaskInstance.dag_version_id,
+        )
+        .where(TaskInstance.dag_version_id.isnot(None))
+        .where(tuple_(TaskInstance.dag_id, TaskInstance.run_id).in_(run_key_values))
+        .distinct()
+    )
+    tih_sub = (
+        select(
+            TaskInstanceHistory.dag_id,
+            TaskInstanceHistory.run_id,
+            TaskInstanceHistory.dag_version_id,
+        )
+        .where(TaskInstanceHistory.dag_version_id.isnot(None))
+        .where(tuple_(TaskInstanceHistory.dag_id, TaskInstanceHistory.run_id).in_(run_key_values))
+        .distinct()
+    )
+    combined = union_all(ti_sub, tih_sub).subquery()
+    rows = session.execute(
+        select(combined.c.dag_id, combined.c.run_id, combined.c.dag_version_id).distinct()
+    ).all()
+
+    all_version_ids = {r.dag_version_id for r in rows}
+    versions_by_id: dict[UUID, DagVersion] = {}
+    if all_version_ids:
+        dv_query = (
+            select(DagVersion)
+            .where(DagVersion.id.in_(all_version_ids))
+            .options(joinedload(DagVersion.bundle))
+        )
+        versions_by_id = {dv.id: dv for dv in session.scalars(dv_query).unique()}
+
+    versions_per_run: dict[tuple[str, str], dict[UUID, DagVersion]] = defaultdict(dict)
+    for row in rows:
+        dv = versions_by_id.get(row.dag_version_id)
+        if dv:
+            versions_per_run[(row.dag_id, row.run_id)][dv.id] = dv
+
+    for dr in runs_needing_versions:
+        dr._prefetched_dag_version_ids = versions_per_run.get((dr.dag_id, dr.run_id), {})
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 359809458cf..67f76307124 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -36,7 +36,10 @@ from airflow.api.common.mark_tasks import (
 from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
 from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
-from airflow.api_fastapi.common.db.dag_runs import eager_load_dag_run_for_validation
+from airflow.api_fastapi.common.db.dag_runs import (
+    attach_dag_versions_to_runs,
+    eager_load_dag_run_for_list,
+)
 from airflow.api_fastapi.common.parameters import (
     FilterOptionEnum,
     FilterParam,
@@ -387,7 +390,7 @@ def get_dag_runs(
 
     This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for all DAGs.
     """
-    query = select(DagRun).options(*eager_load_dag_run_for_validation())
+    query = select(DagRun).options(*eager_load_dag_run_for_list())
 
     if dag_id != "~":
         get_latest_version_of_dag(dag_bag, dag_id, session)  # Check if the DAG exists.
@@ -422,7 +425,8 @@ def get_dag_runs(
         limit=limit,
         session=session,
     )
-    dag_runs = session.scalars(dag_run_select)
+    dag_runs = list(session.scalars(dag_run_select).unique())
+    attach_dag_versions_to_runs(dag_runs, session=session)
 
     return DAGRunCollectionResponse(
         dag_runs=dag_runs,
@@ -635,7 +639,7 @@ def get_list_dag_runs_batch(
         {"dag_run_id": "run_id"},
     ).set_value([body.order_by] if body.order_by else None)
 
-    base_query = select(DagRun).options(*eager_load_dag_run_for_validation())
+    base_query = select(DagRun).options(*eager_load_dag_run_for_list())
 
     dag_runs_select, total_entries = paginated_select(
         statement=base_query,
@@ -656,7 +660,8 @@ def get_list_dag_runs_batch(
         session=session,
     )
 
-    dag_runs = session.scalars(dag_runs_select)
+    dag_runs = list(session.scalars(dag_runs_select).unique())
+    attach_dag_versions_to_runs(dag_runs, session=session)
 
     return DAGRunCollectionResponse(
         dag_runs=dag_runs,
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index c37713da4d8..d32a54c15d6 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -315,6 +315,10 @@ class DagRun(Base, LoggingMixin):
     _ti_dag_versions = association_proxy("task_instances", "dag_version")
     _tih_dag_versions = association_proxy("task_instances_histories", "dag_version")
 
+    # Can be set by manual prefetch, such as attach_dag_versions_to_runs()
+    # as an alternative to traversing all TI/TIH ORM relationships. Maps version_id -> DagVersion.
+    _prefetched_dag_version_ids: dict[UUID, DagVersion] | None = None
+
     def __init__(
         self,
         dag_id: str | None = None,
@@ -406,13 +410,16 @@ class DagRun(Base, LoggingMixin):
         # when the dag is in a versioned bundle, we keep the dag version fixed
         if self.bundle_version:
             return [self.created_dag_version] if self.created_dag_version is not None else []
-        dag_versions = [
-            dv
-            for dv in dict.fromkeys(list(self._tih_dag_versions) + list(self._ti_dag_versions))
-            if dv is not None
-        ]
-        sorted_ = sorted(dag_versions, key=lambda dv: dv.id)
-        return sorted_
+
+        if self._prefetched_dag_version_ids is not None:
+            dag_version_objects = list(self._prefetched_dag_version_ids.values())
+        else:
+            dag_version_objects = [
+                dv
+                for dv in dict.fromkeys(list(self._tih_dag_versions) + list(self._ti_dag_versions))
+                if dv is not None
+            ]
+        return sorted(dag_version_objects, key=lambda dv: dv.id)
 
     @property
     def version_number(self) -> int | None:

Reply via email to