This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fb9dea50dea Add number of queries guard in public dag runs list endpoints  (#57450)
fb9dea50dea is described below

commit fb9dea50dea1b5438cb7e9607bea87caedbf08cc
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Oct 31 19:25:56 2025 +0100

    Add number of queries guard in public dag runs list endpoints  (#57450)
    
    * WIP to revert
    
    * Fix batch endpoint
    
    * Fix CI
    
    * Address review comments
    
    * Fix CI
    
    * Address PR comments
---
 .../src/airflow/api_fastapi/common/db/dag_runs.py  | 19 +++++++++++
 .../core_api/datamodels/dag_versions.py            | 28 ++--------------
 .../api_fastapi/core_api/datamodels/dags.py        |  4 ++-
 .../api_fastapi/core_api/openapi/_private_ui.yaml  |  1 -
 .../core_api/openapi/v2-rest-api-generated.yaml    |  1 -
 .../api_fastapi/core_api/routes/public/dag_run.py  | 13 +++++---
 airflow-core/src/airflow/models/dag_version.py     | 39 ++++++++++++++++++++--
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  3 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  2 +-
 .../core_api/routes/public/test_dag_run.py         | 25 ++++++++++----
 .../core_api/routes/public/test_dag_versions.py    | 10 ++----
 .../core_api/routes/public/test_dags.py            |  2 +-
 .../core_api/routes/public/test_task_instances.py  |  3 +-
 13 files changed, 96 insertions(+), 54 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
index 2efb6967653..f68f3ab2186 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
@@ -20,10 +20,15 @@ from __future__ import annotations
 from typing import cast
 
 from sqlalchemy import func, select
+from sqlalchemy.orm import joinedload, selectinload
+from sqlalchemy.orm.interfaces import LoaderOption
 from sqlalchemy.sql import ColumnElement
 
 from airflow.models.dag import DagModel
+from airflow.models.dag_version import DagVersion
 from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
 dagruns_select_with_state_count = (
     select(
@@ -36,3 +41,17 @@ dagruns_select_with_state_count = (
     .group_by(DagRun.dag_id, DagRun.state, DagModel.dag_display_name)
     .order_by(DagRun.dag_id)
 )
+
+
+def eager_load_dag_run_for_validation() -> tuple[LoaderOption, ...]:
+    """Construct the eager loading options necessary for a DagRunResponse object."""
+    return (
+        joinedload(DagRun.dag_model),
+        selectinload(DagRun.task_instances)
+        .joinedload(TaskInstance.dag_version)
+        .joinedload(DagVersion.bundle),
+        selectinload(DagRun.task_instances_histories)
+        .joinedload(TaskInstanceHistory.dag_version)
+        .joinedload(DagVersion.bundle),
+        joinedload(DagRun.dag_run_note),
+    )
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
index 2475d49031f..ac0ebf56f44 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
@@ -19,11 +19,9 @@ from __future__ import annotations
 from datetime import datetime
 from uuid import UUID
 
-from pydantic import AliasPath, Field, computed_field
-from sqlalchemy import select
+from pydantic import AliasPath, Field
 
 from airflow.api_fastapi.core_api.base import BaseModel
-from airflow.dag_processing.bundles.manager import DagBundlesManager
 
 
 class DagVersionResponse(BaseModel):
@@ -37,29 +35,7 @@ class DagVersionResponse(BaseModel):
     created_at: datetime
     dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name"))
 
-    # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[prop-decorator]
-    @property
-    def bundle_url(self) -> str | None:
-        if self.bundle_name:
-            # Get the bundle model from the database and render the URL
-            from airflow.models.dagbundle import DagBundleModel
-            from airflow.utils.session import create_session
-
-            with create_session() as session:
-                bundle_model = session.scalar(
-                    select(DagBundleModel).where(DagBundleModel.name == self.bundle_name)
-                )
-
-                if bundle_model and hasattr(bundle_model, "signed_url_template"):
-                    return bundle_model.render_url(self.bundle_version)
-                # fallback to the deprecated option if the bundle model does not have a signed_url_template
-                # attribute
-                try:
-                    return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
-                except ValueError:
-                    return None
-        return None
+    bundle_url: str | None = Field(validation_alias="bundle_url")
 
 
 class DAGVersionCollectionResponse(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
index 1fea41919ff..7a32ca6bb89 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
@@ -202,7 +202,9 @@ class DAGDetailsResponse(DAGResponse):
     @property
     def latest_dag_version(self) -> DagVersionResponse | None:
         """Return the latest DagVersion."""
-        latest_dag_version = DagVersion.get_latest_version(self.dag_id, load_dag_model=True)
+        latest_dag_version = DagVersion.get_latest_version(
+            self.dag_id, load_dag_model=True, load_bundle_model=True
+        )
         if latest_dag_version is None:
             return latest_dag_version
         return DagVersionResponse.model_validate(latest_dag_version)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index b85d7646390..4061b761003 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1745,7 +1745,6 @@ components:
           - type: string
           - type: 'null'
           title: Bundle Url
-          readOnly: true
       type: object
       required:
       - id
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index eae45b1f1c1..28b0291c1f7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -10846,7 +10846,6 @@ components:
           - type: string
           - type: 'null'
           title: Bundle Url
-          readOnly: true
       type: object
       required:
       - id
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index f6c1f016604..9c070da17d9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -36,6 +36,7 @@ from airflow.api.common.mark_tasks import (
 from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
 from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.db.dag_runs import eager_load_dag_run_for_validation
 from airflow.api_fastapi.common.parameters import (
     FilterOptionEnum,
     FilterParam,
@@ -82,6 +83,7 @@ from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import DagModel, DagRun
+from airflow.models.asset import AssetEvent
 from airflow.models.dag_version import DagVersion
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -235,10 +237,12 @@ def get_upstream_asset_events(
 ) -> AssetEventCollectionResponse:
     """If dag run is asset-triggered, return the asset events that triggered it."""
     dag_run: DagRun | None = session.scalar(
-        select(DagRun).where(
+        select(DagRun)
+        .where(
             DagRun.dag_id == dag_id,
             DagRun.run_id == dag_run_id,
         )
+        .options(joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset))
     )
     if dag_run is None:
         raise HTTPException(
@@ -368,11 +372,11 @@ def get_dag_runs(
 
     This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for all DAGs.
     """
-    query = select(DagRun)
+    query = select(DagRun).options(*eager_load_dag_run_for_validation())
 
     if dag_id != "~":
         get_latest_version_of_dag(dag_bag, dag_id, session)  # Check if the DAG exists.
-        query = query.filter(DagRun.dag_id == dag_id).options(joinedload(DagRun.dag_model))
+        query = query.filter(DagRun.dag_id == dag_id).options()
 
     # Add join with DagVersion if dag_version filter is active
     if dag_version.value:
@@ -607,7 +611,8 @@ def get_list_dag_runs_batch(
         {"dag_run_id": "run_id"},
     ).set_value([body.order_by] if body.order_by else None)
 
-    base_query = select(DagRun).options(joinedload(DagRun.dag_model))
+    base_query = select(DagRun).options(*eager_load_dag_run_for_validation())
+
     dag_runs_select, total_entries = paginated_select(
         statement=base_query,
         filters=[
diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py
index fbc04f9171e..49768449732 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -27,6 +27,7 @@ from sqlalchemy.orm import Mapped, joinedload, relationship
 from sqlalchemy_utils import UUIDType
 
 from airflow._shared.timezones import timezone
+from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.models.base import Base, StringID
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime, mapped_column, with_row_locks
@@ -51,6 +52,12 @@ class DagVersion(Base):
     dag_model = relationship("DagModel", back_populates="dag_versions")
     bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
     bundle_version: Mapped[str | None] = mapped_column(StringID(), nullable=True)
+    bundle = relationship(
+        "DagBundleModel",
+        primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name",
+        uselist=False,
+        viewonly=True,
+    )
     dag_code = relationship(
         "DagCode",
         back_populates="dag_version",
@@ -79,6 +86,20 @@ class DagVersion(Base):
         """Represent the object as a string."""
         return f"<DagVersion {self.dag_id} {self.version}>"
 
+    @property
+    def bundle_url(self) -> str | None:
+        """Render the bundle URL using the joined bundle metadata if available."""
+        # Prefer using the joined bundle relationship when present to avoid extra queries
+        if getattr(self, "bundle", None) is not None and hasattr(self.bundle, "signed_url_template"):
+            return self.bundle.render_url(self.bundle_version)
+
+        # fallback to the deprecated option if the bundle model does not have a signed_url_template
+        # attribute
+        try:
+            return DagBundlesManager().view_url(self.bundle_name, self.bundle_version)
+        except ValueError:
+            return None
+
     @classmethod
     @provide_session
     def write_dag(
@@ -120,7 +141,11 @@ class DagVersion(Base):
 
     @classmethod
     def _latest_version_select(
-        cls, dag_id: str, bundle_version: str | None = None, load_dag_model: bool = False
+        cls,
+        dag_id: str,
+        bundle_version: str | None = None,
+        load_dag_model: bool = False,
+        load_bundle_model: bool = False,
     ) -> Select:
         """
         Get the select object to get the latest version of the DAG.
@@ -135,6 +160,9 @@ class DagVersion(Base):
         if load_dag_model:
             query = query.options(joinedload(cls.dag_model))
 
+        if load_bundle_model:
+            query = query.options(joinedload(cls.bundle))
+
         query = query.order_by(cls.created_at.desc()).limit(1)
         return query
 
@@ -146,6 +174,7 @@ class DagVersion(Base):
         *,
         bundle_version: str | None = None,
         load_dag_model: bool = False,
+        load_bundle_model: bool = False,
         session: Session = NEW_SESSION,
     ) -> DagVersion | None:
         """
@@ -154,10 +183,16 @@ class DagVersion(Base):
         :param dag_id: The DAG ID.
         :param session: The database session.
         :param load_dag_model: Whether to load the DAG model.
+        :param load_bundle_model: Whether to load the DagBundle model.
         :return: The latest version of the DAG or None if not found.
         """
         return session.scalar(
-            cls._latest_version_select(dag_id, bundle_version=bundle_version, load_dag_model=load_dag_model)
+            cls._latest_version_select(
+                dag_id,
+                bundle_version=bundle_version,
+                load_dag_model=load_dag_model,
+                load_bundle_model=load_bundle_model,
+            )
         )
 
     @classmethod
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 7a8898b1b72..1da1f6f8826 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3325,8 +3325,7 @@ export const $DagVersionResponse = {
                     type: 'null'
                 }
             ],
-            title: 'Bundle Url',
-            readOnly: true
+            title: 'Bundle Url'
         }
     },
     type: 'object',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index efd3f0406d3..04b7ff436cd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -858,7 +858,7 @@ export type DagVersionResponse = {
     bundle_version: string | null;
     created_at: string;
     dag_display_name: string;
-    readonly bundle_url: string | null;
+    bundle_url: string | null;
 };
 
 /**
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 40e75998b73..3f8ddbe581a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -39,6 +39,7 @@ from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.api_fastapi import _check_dag_run_note, _check_last_log
+from tests_common.test_utils.asserts import assert_queries_count
 from tests_common.test_utils.db import (
     clear_db_connections,
     clear_db_dag_bundles,
@@ -327,7 +328,10 @@ class TestGetDagRun:
 
 
 class TestGetDagRuns:
-    @pytest.mark.parametrize("dag_id, total_entries", [(DAG1_ID, 2), (DAG2_ID, 2), ("~", 4)])
+    @pytest.mark.parametrize(
+        "dag_id, total_entries",
+        [(DAG1_ID, 2), (DAG2_ID, 2), ("~", 4)],
+    )
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_get_dag_runs(self, test_client, session, dag_id, total_entries):
         response = test_client.get(f"/dags/{dag_id}/dagRuns")
@@ -384,7 +388,10 @@ class TestGetDagRuns:
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_return_correct_results_with_order_by(self, test_client, order_by, expected_order):
         # Test ascending order
-        response = test_client.get("/dags/test_dag1/dagRuns", params={"order_by": order_by})
+
+        with assert_queries_count(7):
+            response = test_client.get("/dags/test_dag1/dagRuns", params={"order_by": order_by})
+
         assert response.status_code == 200
         body = response.json()
         assert body["total_entries"] == 2
@@ -807,7 +814,8 @@ class TestGetDagRuns:
 class TestListDagRunsBatch:
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_list_dag_runs_return_200(self, test_client, session):
-        response = test_client.post("/dags/~/dagRuns/list", json={})
+        with assert_queries_count(5):
+            response = test_client.post("/dags/~/dagRuns/list", json={})
         assert response.status_code == 200
         body = response.json()
         assert body["total_entries"] == 4
@@ -848,7 +856,8 @@ class TestListDagRunsBatch:
     )
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_list_dag_runs_with_dag_ids_filter(self, test_client, dag_ids, status_code, expected_dag_id_list):
-        response = test_client.post("/dags/~/dagRuns/list", json={"dag_ids": dag_ids})
+        with assert_queries_count(5):
+            response = test_client.post("/dags/~/dagRuns/list", json={"dag_ids": dag_ids})
         assert response.status_code == status_code
         assert set([each["dag_run_id"] for each in response.json()["dag_runs"]]) == set(expected_dag_id_list)
 
@@ -1352,9 +1361,10 @@ class TestGetDagRunAssetTriggerEvents:
         session.commit()
         assert event.timestamp
 
-        response = test_client.get(
-            "/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
-        )
+        with assert_queries_count(3):
+            response = test_client.get(
+                "/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
+            )
         assert response.status_code == 200
         expected_response = {
             "asset_events": [
@@ -1563,6 +1573,7 @@ class TestTriggerDagRun:
         run = (
             session.query(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == expected_dag_run_id).one()
         )
+
         expected_response_json = {
             "bundle_version": None,
             "conf": {},
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
index 287c33d7717..50307f8502d 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
@@ -104,9 +104,7 @@ class TestGetDagVersion(TestDagVersionEndpoint):
         ],
     )
     @pytest.mark.usefixtures("make_dag_with_multiple_versions")
-    @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
-    def test_get_dag_version(self, mock_hasattr, test_client, dag_id, dag_version, expected_response):
-        mock_hasattr.return_value = False
+    def test_get_dag_version(self, test_client, dag_id, dag_version, expected_response):
         response = test_client.get(f"/dags/{dag_id}/dagVersions/{dag_version}")
         assert response.status_code == 200
         assert response.json() == expected_response
@@ -180,7 +178,7 @@ class TestGetDagVersion(TestDagVersionEndpoint):
 
     @pytest.mark.usefixtures("make_dag_with_multiple_versions")
     @mock.patch("airflow.dag_processing.bundles.manager.DagBundlesManager.view_url")
-    @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
+    @mock.patch("airflow.models.dag_version.hasattr")
     def test_get_dag_version_with_unconfigured_bundle(
         self, mock_hasattr, mock_view_url, test_client, dag_maker, session
     ):
@@ -305,9 +303,7 @@ class TestGetDagVersions(TestDagVersionEndpoint):
         ],
     )
     @pytest.mark.usefixtures("make_dag_with_multiple_versions")
-    @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
-    def test_get_dag_versions(self, mock_hasattr, test_client, dag_id, expected_response):
-        mock_hasattr.return_value = False
+    def test_get_dag_versions(self, test_client, dag_id, expected_response):
         response = test_client.get(f"/dags/{dag_id}/dagVersions")
         assert response.status_code == 200
         assert response.json() == expected_response
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
index cb59ec3ec43..198c0fa5d3f 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
@@ -907,7 +907,7 @@ class TestDagDetails(TestDagEndpoint):
             "is_paused_upon_creation": None,
             "latest_dag_version": {
                 "bundle_name": "dag_maker",
-                "bundle_url": None,
+                "bundle_url": "http://test_host.github.com/tree/None/dags",
                 "bundle_version": None,
                 "created_at": mock.ANY,
                 "dag_id": "test_dag2",
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index d92ecf559a6..6a49c9fb63b 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1304,7 +1304,8 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
                 update_extras=update_extras,
                 task_instances=task_instances,
             )
-        with mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.DagBundlesManager"):
+        with mock.patch("airflow.models.dag_version.DagBundlesManager") as dag_bundle_manager_mock:
+            dag_bundle_manager_mock.return_value.view_url.return_value = "some_url"
             # Mock DagBundlesManager to avoid checking if dags-folder bundle is configured
             response = test_client.get(url, params=params)
         if params == {"task_id_pattern": "task_match_id"}:

Reply via email to