This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 20544a444e903cc153992764d304356c6810e57b
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Nov 3 14:20:58 2025 +0100

    Add number of queries guard in public dag runs list endpoints  (#57450) 
(#57736)
    
    * WIP to revert
    
    * Fix batch endpoint
    
    * Fix CI
    
    * Address review comments
    
    * Fix CI
    
    * Address PR comments
    
    (cherry picked from commit fb9dea50dea1b5438cb7e9607bea87caedbf08cc)
---
 .../src/airflow/api_fastapi/common/db/dag_runs.py  | 19 +++++++++++
 .../core_api/datamodels/dag_versions.py            | 28 ++--------------
 .../api_fastapi/core_api/datamodels/dags.py        |  4 ++-
 .../api_fastapi/core_api/openapi/_private_ui.yaml  |  1 -
 .../core_api/openapi/v2-rest-api-generated.yaml    |  1 -
 .../api_fastapi/core_api/routes/public/dag_run.py  | 13 +++++---
 airflow-core/src/airflow/models/dag_version.py     | 39 ++++++++++++++++++++--
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  3 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  2 +-
 .../core_api/routes/public/test_dag_run.py         | 25 ++++++++++----
 .../core_api/routes/public/test_dag_versions.py    | 10 ++----
 .../core_api/routes/public/test_dags.py            |  2 +-
 .../core_api/routes/public/test_task_instances.py  |  2 +-
 13 files changed, 95 insertions(+), 54 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py 
b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
index 1ca5f1d261b..2a5fb76c8b8 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
@@ -18,9 +18,14 @@
 from __future__ import annotations
 
 from sqlalchemy import func, select
+from sqlalchemy.orm import joinedload, selectinload
+from sqlalchemy.orm.interfaces import LoaderOption
 
 from airflow.models.dag import DagModel
+from airflow.models.dag_version import DagVersion
 from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
 
 dagruns_select_with_state_count = (
     select(
@@ -33,3 +38,17 @@ dagruns_select_with_state_count = (
     .group_by(DagRun.dag_id, DagRun.state, DagModel.dag_display_name)
     .order_by(DagRun.dag_id)
 )
+
+
+def eager_load_dag_run_for_validation() -> tuple[LoaderOption, ...]:
+    """Construct the eager loading options necessary for a DagRunResponse 
object."""
+    return (
+        joinedload(DagRun.dag_model),
+        selectinload(DagRun.task_instances)
+        .joinedload(TaskInstance.dag_version)
+        .joinedload(DagVersion.bundle),
+        selectinload(DagRun.task_instances_histories)
+        .joinedload(TaskInstanceHistory.dag_version)
+        .joinedload(DagVersion.bundle),
+        joinedload(DagRun.dag_run_note),
+    )
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
index 2475d49031f..ac0ebf56f44 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
@@ -19,11 +19,9 @@ from __future__ import annotations
 from datetime import datetime
 from uuid import UUID
 
-from pydantic import AliasPath, Field, computed_field
-from sqlalchemy import select
+from pydantic import AliasPath, Field
 
 from airflow.api_fastapi.core_api.base import BaseModel
-from airflow.dag_processing.bundles.manager import DagBundlesManager
 
 
 class DagVersionResponse(BaseModel):
@@ -37,29 +35,7 @@ class DagVersionResponse(BaseModel):
     created_at: datetime
     dag_display_name: str = Field(validation_alias=AliasPath("dag_model", 
"dag_display_name"))
 
-    # Mypy issue https://github.com/python/mypy/issues/1362
-    @computed_field  # type: ignore[prop-decorator]
-    @property
-    def bundle_url(self) -> str | None:
-        if self.bundle_name:
-            # Get the bundle model from the database and render the URL
-            from airflow.models.dagbundle import DagBundleModel
-            from airflow.utils.session import create_session
-
-            with create_session() as session:
-                bundle_model = session.scalar(
-                    select(DagBundleModel).where(DagBundleModel.name == 
self.bundle_name)
-                )
-
-                if bundle_model and hasattr(bundle_model, 
"signed_url_template"):
-                    return bundle_model.render_url(self.bundle_version)
-                # fallback to the deprecated option if the bundle model does 
not have a signed_url_template
-                # attribute
-                try:
-                    return DagBundlesManager().view_url(self.bundle_name, 
self.bundle_version)
-                except ValueError:
-                    return None
-        return None
+    bundle_url: str | None = Field(validation_alias="bundle_url")
 
 
 class DAGVersionCollectionResponse(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
index 2d14f29c41a..2a805da7abd 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
@@ -199,7 +199,9 @@ class DAGDetailsResponse(DAGResponse):
     @property
     def latest_dag_version(self) -> DagVersionResponse | None:
         """Return the latest DagVersion."""
-        latest_dag_version = DagVersion.get_latest_version(self.dag_id, 
load_dag_model=True)
+        latest_dag_version = DagVersion.get_latest_version(
+            self.dag_id, load_dag_model=True, load_bundle_model=True
+        )
         if latest_dag_version is None:
             return latest_dag_version
         return DagVersionResponse.model_validate(latest_dag_version)
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 77d4ed18a84..0d4aec69997 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1676,7 +1676,6 @@ components:
           - type: string
           - type: 'null'
           title: Bundle Url
-          readOnly: true
       type: object
       required:
       - id
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 3c1729d1c97..a8bf0d2336d 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -10717,7 +10717,6 @@ components:
           - type: string
           - type: 'null'
           title: Bundle Url
-          readOnly: true
       type: object
       required:
       - id
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 657d2df6955..44c5be5f281 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -36,6 +36,7 @@ from airflow.api.common.mark_tasks import (
 from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
 from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, 
get_latest_version_of_dag
 from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.db.dag_runs import 
eager_load_dag_run_for_validation
 from airflow.api_fastapi.common.parameters import (
     FilterOptionEnum,
     FilterParam,
@@ -81,6 +82,7 @@ from airflow.api_fastapi.logging.decorators import 
action_logging
 from airflow.exceptions import ParamValidationError
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import DagModel, DagRun
+from airflow.models.asset import AssetEvent
 from airflow.models.dag_version import DagVersion
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -232,10 +234,12 @@ def get_upstream_asset_events(
 ) -> AssetEventCollectionResponse:
     """If dag run is asset-triggered, return the asset events that triggered 
it."""
     dag_run: DagRun | None = session.scalar(
-        select(DagRun).where(
+        select(DagRun)
+        .where(
             DagRun.dag_id == dag_id,
             DagRun.run_id == dag_run_id,
         )
+        
.options(joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset))
     )
     if dag_run is None:
         raise HTTPException(
@@ -357,11 +361,11 @@ def get_dag_runs(
 
     This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for 
all DAGs.
     """
-    query = select(DagRun)
+    query = select(DagRun).options(*eager_load_dag_run_for_validation())
 
     if dag_id != "~":
         get_latest_version_of_dag(dag_bag, dag_id, session)  # Check if the 
DAG exists.
-        query = query.filter(DagRun.dag_id == 
dag_id).options(joinedload(DagRun.dag_model))
+        query = query.filter(DagRun.dag_id == dag_id).options()
 
     # Add join with DagVersion if dag_version filter is active
     if dag_version.value:
@@ -585,7 +589,8 @@ def get_list_dag_runs_batch(
         {"dag_run_id": "run_id"},
     ).set_value([body.order_by] if body.order_by else None)
 
-    base_query = select(DagRun).options(joinedload(DagRun.dag_model))
+    base_query = select(DagRun).options(*eager_load_dag_run_for_validation())
+
     dag_runs_select, total_entries = paginated_select(
         statement=base_query,
         filters=[dag_ids, logical_date, run_after, start_date, end_date, 
state, readable_dag_runs_filter],
diff --git a/airflow-core/src/airflow/models/dag_version.py 
b/airflow-core/src/airflow/models/dag_version.py
index a51f3cb3018..3f57333333b 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -26,6 +26,7 @@ from sqlalchemy.orm import joinedload, relationship
 from sqlalchemy_utils import UUIDType
 
 from airflow._shared.timezones import timezone
+from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.models.base import Base, StringID
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks
@@ -47,6 +48,12 @@ class DagVersion(Base):
     dag_model = relationship("DagModel", back_populates="dag_versions")
     bundle_name = Column(StringID(), nullable=True)
     bundle_version = Column(StringID())
+    bundle = relationship(
+        "DagBundleModel",
+        primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name",
+        uselist=False,
+        viewonly=True,
+    )
     dag_code = relationship(
         "DagCode",
         back_populates="dag_version",
@@ -73,6 +80,20 @@ class DagVersion(Base):
         """Represent the object as a string."""
         return f"<DagVersion {self.dag_id} {self.version}>"
 
+    @property
+    def bundle_url(self) -> str | None:
+        """Render the bundle URL using the joined bundle metadata if 
available."""
+        # Prefer using the joined bundle relationship when present to avoid 
extra queries
+        if getattr(self, "bundle", None) is not None and hasattr(self.bundle, 
"signed_url_template"):
+            return self.bundle.render_url(self.bundle_version)
+
+        # fallback to the deprecated option if the bundle model does not have 
a signed_url_template
+        # attribute
+        try:
+            return DagBundlesManager().view_url(self.bundle_name, 
self.bundle_version)
+        except ValueError:
+            return None
+
     @classmethod
     @provide_session
     def write_dag(
@@ -114,7 +135,11 @@ class DagVersion(Base):
 
     @classmethod
     def _latest_version_select(
-        cls, dag_id: str, bundle_version: str | None = None, load_dag_model: 
bool = False
+        cls,
+        dag_id: str,
+        bundle_version: str | None = None,
+        load_dag_model: bool = False,
+        load_bundle_model: bool = False,
     ) -> Select:
         """
         Get the select object to get the latest version of the DAG.
@@ -129,6 +154,9 @@ class DagVersion(Base):
         if load_dag_model:
             query = query.options(joinedload(cls.dag_model))
 
+        if load_bundle_model:
+            query = query.options(joinedload(cls.bundle))
+
         query = query.order_by(cls.created_at.desc()).limit(1)
         return query
 
@@ -140,6 +168,7 @@ class DagVersion(Base):
         *,
         bundle_version: str | None = None,
         load_dag_model: bool = False,
+        load_bundle_model: bool = False,
         session: Session = NEW_SESSION,
     ) -> DagVersion | None:
         """
@@ -148,10 +177,16 @@ class DagVersion(Base):
         :param dag_id: The DAG ID.
         :param session: The database session.
         :param load_dag_model: Whether to load the DAG model.
+        :param load_bundle_model: Whether to load the DagBundle model.
         :return: The latest version of the DAG or None if not found.
         """
         return session.scalar(
-            cls._latest_version_select(dag_id, bundle_version=bundle_version, 
load_dag_model=load_dag_model)
+            cls._latest_version_select(
+                dag_id,
+                bundle_version=bundle_version,
+                load_dag_model=load_dag_model,
+                load_bundle_model=load_bundle_model,
+            )
         )
 
     @classmethod
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 4f503ef9492..02747de08e4 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3185,8 +3185,7 @@ export const $DagVersionResponse = {
                     type: 'null'
                 }
             ],
-            title: 'Bundle Url',
-            readOnly: true
+            title: 'Bundle Url'
         }
     },
     type: 'object',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index f2e2c84f812..9f43a11b8dc 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -827,7 +827,7 @@ export type DagVersionResponse = {
     bundle_version: string | null;
     created_at: string;
     dag_display_name: string;
-    readonly bundle_url: string | null;
+    bundle_url: string | null;
 };
 
 /**
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 262efdccf6c..8648a25d8de 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -39,6 +39,7 @@ from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.api_fastapi import _check_dag_run_note, 
_check_last_log
+from tests_common.test_utils.asserts import assert_queries_count
 from tests_common.test_utils.db import (
     clear_db_connections,
     clear_db_dag_bundles,
@@ -307,7 +308,10 @@ class TestGetDagRun:
 
 
 class TestGetDagRuns:
-    @pytest.mark.parametrize("dag_id, total_entries", [(DAG1_ID, 2), (DAG2_ID, 
2), ("~", 4)])
+    @pytest.mark.parametrize(
+        "dag_id, total_entries",
+        [(DAG1_ID, 2), (DAG2_ID, 2), ("~", 4)],
+    )
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_get_dag_runs(self, test_client, session, dag_id, total_entries):
         response = test_client.get(f"/dags/{dag_id}/dagRuns")
@@ -364,7 +368,10 @@ class TestGetDagRuns:
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_return_correct_results_with_order_by(self, test_client, order_by, 
expected_order):
         # Test ascending order
-        response = test_client.get("/dags/test_dag1/dagRuns", 
params={"order_by": order_by})
+
+        with assert_queries_count(7):
+            response = test_client.get("/dags/test_dag1/dagRuns", 
params={"order_by": order_by})
+
         assert response.status_code == 200
         body = response.json()
         assert body["total_entries"] == 2
@@ -750,7 +757,8 @@ class TestGetDagRuns:
 class TestListDagRunsBatch:
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_list_dag_runs_return_200(self, test_client, session):
-        response = test_client.post("/dags/~/dagRuns/list", json={})
+        with assert_queries_count(5):
+            response = test_client.post("/dags/~/dagRuns/list", json={})
         assert response.status_code == 200
         body = response.json()
         assert body["total_entries"] == 4
@@ -791,7 +799,8 @@ class TestListDagRunsBatch:
     )
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
     def test_list_dag_runs_with_dag_ids_filter(self, test_client, dag_ids, 
status_code, expected_dag_id_list):
-        response = test_client.post("/dags/~/dagRuns/list", json={"dag_ids": 
dag_ids})
+        with assert_queries_count(5):
+            response = test_client.post("/dags/~/dagRuns/list", 
json={"dag_ids": dag_ids})
         assert response.status_code == status_code
         assert set([each["dag_run_id"] for each in 
response.json()["dag_runs"]]) == set(expected_dag_id_list)
 
@@ -1293,9 +1302,10 @@ class TestGetDagRunAssetTriggerEvents:
         session.commit()
         assert event.timestamp
 
-        response = test_client.get(
-            "/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
-        )
+        with assert_queries_count(3):
+            response = test_client.get(
+                
"/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
+            )
         assert response.status_code == 200
         expected_response = {
             "asset_events": [
@@ -1504,6 +1514,7 @@ class TestTriggerDagRun:
         run = (
             session.query(DagRun).where(DagRun.dag_id == DAG1_ID, 
DagRun.run_id == expected_dag_run_id).one()
         )
+
         expected_response_json = {
             "bundle_version": None,
             "conf": {},
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
index 287c33d7717..50307f8502d 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
@@ -104,9 +104,7 @@ class TestGetDagVersion(TestDagVersionEndpoint):
         ],
     )
     @pytest.mark.usefixtures("make_dag_with_multiple_versions")
-    @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
-    def test_get_dag_version(self, mock_hasattr, test_client, dag_id, 
dag_version, expected_response):
-        mock_hasattr.return_value = False
+    def test_get_dag_version(self, test_client, dag_id, dag_version, 
expected_response):
         response = test_client.get(f"/dags/{dag_id}/dagVersions/{dag_version}")
         assert response.status_code == 200
         assert response.json() == expected_response
@@ -180,7 +178,7 @@ class TestGetDagVersion(TestDagVersionEndpoint):
 
     @pytest.mark.usefixtures("make_dag_with_multiple_versions")
     
@mock.patch("airflow.dag_processing.bundles.manager.DagBundlesManager.view_url")
-    @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
+    @mock.patch("airflow.models.dag_version.hasattr")
     def test_get_dag_version_with_unconfigured_bundle(
         self, mock_hasattr, mock_view_url, test_client, dag_maker, session
     ):
@@ -305,9 +303,7 @@ class TestGetDagVersions(TestDagVersionEndpoint):
         ],
     )
     @pytest.mark.usefixtures("make_dag_with_multiple_versions")
-    @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
-    def test_get_dag_versions(self, mock_hasattr, test_client, dag_id, 
expected_response):
-        mock_hasattr.return_value = False
+    def test_get_dag_versions(self, test_client, dag_id, expected_response):
         response = test_client.get(f"/dags/{dag_id}/dagVersions")
         assert response.status_code == 200
         assert response.json() == expected_response
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
index fad5d39c7d8..80a645ba699 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
@@ -907,7 +907,7 @@ class TestDagDetails(TestDagEndpoint):
             "is_paused_upon_creation": None,
             "latest_dag_version": {
                 "bundle_name": "dag_maker",
-                "bundle_url": None,
+                "bundle_url": "http://test_host.github.com/tree/None/dags";,
                 "bundle_version": None,
                 "created_at": mock.ANY,
                 "dag_id": "test_dag2",
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 55db52a0e52..fd8662aea4f 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1260,7 +1260,7 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
             update_extras=update_extras,
             task_instances=task_instances,
         )
-        with 
mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.DagBundlesManager"):
+        with mock.patch("airflow.models.dag_version.DagBundlesManager"):
             # Mock DagBundlesManager to avoid checking if dags-folder bundle 
is configured
             response = test_client.get(url, params=params)
         if params == {"task_id_pattern": "task_match_id"}:

Reply via email to