This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 20544a444e903cc153992764d304356c6810e57b Author: Pierre Jeambrun <[email protected]> AuthorDate: Mon Nov 3 14:20:58 2025 +0100 Add number of queries guard in public dag runs list endpoints (#57450) (#57736) * WIP to revert * Fix batch endpoint * Fix CI * Address review comments * Fix CI * Address PR comments (cherry picked from commit fb9dea50dea1b5438cb7e9607bea87caedbf08cc) --- .../src/airflow/api_fastapi/common/db/dag_runs.py | 19 +++++++++++ .../core_api/datamodels/dag_versions.py | 28 ++-------------- .../api_fastapi/core_api/datamodels/dags.py | 4 ++- .../api_fastapi/core_api/openapi/_private_ui.yaml | 1 - .../core_api/openapi/v2-rest-api-generated.yaml | 1 - .../api_fastapi/core_api/routes/public/dag_run.py | 13 +++++--- airflow-core/src/airflow/models/dag_version.py | 39 ++++++++++++++++++++-- .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 3 +- .../airflow/ui/openapi-gen/requests/types.gen.ts | 2 +- .../core_api/routes/public/test_dag_run.py | 25 ++++++++++---- .../core_api/routes/public/test_dag_versions.py | 10 ++---- .../core_api/routes/public/test_dags.py | 2 +- .../core_api/routes/public/test_task_instances.py | 2 +- 13 files changed, 95 insertions(+), 54 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py index 1ca5f1d261b..2a5fb76c8b8 100644 --- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py @@ -18,9 +18,14 @@ from __future__ import annotations from sqlalchemy import func, select +from sqlalchemy.orm import joinedload, selectinload +from sqlalchemy.orm.interfaces import LoaderOption from airflow.models.dag import DagModel +from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance +from airflow.models.taskinstancehistory import TaskInstanceHistory dagruns_select_with_state_count = ( select( @@ -33,3 +38,17 @@ dagruns_select_with_state_count = ( .group_by(DagRun.dag_id, DagRun.state, DagModel.dag_display_name) .order_by(DagRun.dag_id) ) + + +def eager_load_dag_run_for_validation() -> tuple[LoaderOption, ...]: + """Construct the eager loading options necessary for a DagRunResponse object.""" + return ( + joinedload(DagRun.dag_model), + selectinload(DagRun.task_instances) + .joinedload(TaskInstance.dag_version) + .joinedload(DagVersion.bundle), + selectinload(DagRun.task_instances_histories) + .joinedload(TaskInstanceHistory.dag_version) + .joinedload(DagVersion.bundle), + joinedload(DagRun.dag_run_note), + ) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py index 2475d49031f..ac0ebf56f44 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py @@ -19,11 +19,9 @@ from __future__ import annotations from datetime import datetime from uuid import UUID -from pydantic import AliasPath, Field, computed_field -from sqlalchemy import select +from pydantic import AliasPath, Field from airflow.api_fastapi.core_api.base import BaseModel -from airflow.dag_processing.bundles.manager import DagBundlesManager class DagVersionResponse(BaseModel): @@ -37,29 +35,7 @@ class DagVersionResponse(BaseModel): created_at: datetime dag_display_name: str = Field(validation_alias=AliasPath("dag_model", "dag_display_name")) - # Mypy issue https://github.com/python/mypy/issues/1362 - @computed_field # type: ignore[prop-decorator] - @property - def bundle_url(self) -> str | None: - if self.bundle_name: - # Get the bundle model from the database and render the URL - from airflow.models.dagbundle import DagBundleModel - from airflow.utils.session import create_session - - with create_session() as session: - bundle_model = session.scalar( - select(DagBundleModel).where(DagBundleModel.name == self.bundle_name) - ) - - if bundle_model and hasattr(bundle_model, "signed_url_template"): - return bundle_model.render_url(self.bundle_version) - # fallback to the deprecated option if the bundle model does not have a signed_url_template - # attribute - try: - return DagBundlesManager().view_url(self.bundle_name, self.bundle_version) - except ValueError: - return None - return None + bundle_url: str | None = Field(validation_alias="bundle_url") class DAGVersionCollectionResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py index 2d14f29c41a..2a805da7abd 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py @@ -199,7 +199,9 @@ class DAGDetailsResponse(DAGResponse): @property def latest_dag_version(self) -> DagVersionResponse | None: """Return the latest DagVersion.""" - latest_dag_version = DagVersion.get_latest_version(self.dag_id, load_dag_model=True) + latest_dag_version = DagVersion.get_latest_version( + self.dag_id, load_dag_model=True, load_bundle_model=True + ) if latest_dag_version is None: return latest_dag_version return DagVersionResponse.model_validate(latest_dag_version) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 77d4ed18a84..0d4aec69997 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1676,7 +1676,6 @@ components: - type: string - type: 'null' title: Bundle Url - readOnly: true type: object required: - id diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 3c1729d1c97..a8bf0d2336d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -10717,7 +10717,6 @@ components: - type: string - type: 'null' title: Bundle Url - readOnly: true type: object required: - id diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 657d2df6955..44c5be5f281 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -36,6 +36,7 @@ from airflow.api.common.mark_tasks import ( from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.db.dag_runs import eager_load_dag_run_for_validation from airflow.api_fastapi.common.parameters import ( FilterOptionEnum, FilterParam, @@ -81,6 +82,7 @@ from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import ParamValidationError from airflow.listeners.listener import get_listener_manager from airflow.models import DagModel, DagRun +from airflow.models.asset import AssetEvent from airflow.models.dag_version import DagVersion from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -232,10 +234,12 @@ def get_upstream_asset_events( ) -> AssetEventCollectionResponse: """If dag run is asset-triggered, return the asset events that triggered it.""" dag_run: DagRun | None = session.scalar( - select(DagRun).where( + select(DagRun) + .where( DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id, ) + .options(joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset)) ) if dag_run is None: raise HTTPException( @@ -357,11 +361,11 @@ def get_dag_runs( This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for all DAGs. """ - query = select(DagRun) + query = select(DagRun).options(*eager_load_dag_run_for_validation()) if dag_id != "~": get_latest_version_of_dag(dag_bag, dag_id, session) # Check if the DAG exists. - query = query.filter(DagRun.dag_id == dag_id).options(joinedload(DagRun.dag_model)) + query = query.filter(DagRun.dag_id == dag_id).options() # Add join with DagVersion if dag_version filter is active if dag_version.value: @@ -585,7 +589,8 @@ def get_list_dag_runs_batch( {"dag_run_id": "run_id"}, ).set_value([body.order_by] if body.order_by else None) - base_query = select(DagRun).options(joinedload(DagRun.dag_model)) + base_query = select(DagRun).options(*eager_load_dag_run_for_validation()) + dag_runs_select, total_entries = paginated_select( statement=base_query, filters=[dag_ids, logical_date, run_after, start_date, end_date, state, readable_dag_runs_filter], diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py index a51f3cb3018..3f57333333b 100644 --- a/airflow-core/src/airflow/models/dag_version.py +++ b/airflow-core/src/airflow/models/dag_version.py @@ -26,6 +26,7 @@ from sqlalchemy.orm import joinedload, relationship from sqlalchemy_utils import UUIDType from airflow._shared.timezones import timezone +from airflow.dag_processing.bundles.manager import DagBundlesManager from airflow.models.base import Base, StringID from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks @@ -47,6 +48,12 @@ class DagVersion(Base): dag_model = relationship("DagModel", back_populates="dag_versions") bundle_name = Column(StringID(), nullable=True) bundle_version = Column(StringID()) + bundle = relationship( + "DagBundleModel", + primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name", + uselist=False, + viewonly=True, + ) dag_code = relationship( "DagCode", back_populates="dag_version", @@ -73,6 +80,20 @@ class DagVersion(Base): """Represent the object as a string.""" return f"<DagVersion {self.dag_id} {self.version}>" + @property + def bundle_url(self) -> str | None: + """Render the bundle URL using the joined bundle metadata if available.""" + # Prefer using the joined bundle relationship when present to avoid extra queries + if getattr(self, "bundle", None) is not None and hasattr(self.bundle, "signed_url_template"): + return self.bundle.render_url(self.bundle_version) + + # fallback to the deprecated option if the bundle model does not have a signed_url_template + # attribute + try: + return DagBundlesManager().view_url(self.bundle_name, self.bundle_version) + except ValueError: + return None + @classmethod @provide_session def write_dag( @@ -114,7 +135,11 @@ class DagVersion(Base): @classmethod def _latest_version_select( - cls, dag_id: str, bundle_version: str | None = None, load_dag_model: bool = False + cls, + dag_id: str, + bundle_version: str | None = None, + load_dag_model: bool = False, + load_bundle_model: bool = False, ) -> Select: """ Get the select object to get the latest version of the DAG. @@ -129,6 +154,9 @@ class DagVersion(Base): if load_dag_model: query = query.options(joinedload(cls.dag_model)) + if load_bundle_model: + query = query.options(joinedload(cls.bundle)) + query = query.order_by(cls.created_at.desc()).limit(1) return query @@ -140,6 +168,7 @@ class DagVersion(Base): *, bundle_version: str | None = None, load_dag_model: bool = False, + load_bundle_model: bool = False, session: Session = NEW_SESSION, ) -> DagVersion | None: """ @@ -148,10 +177,16 @@ class DagVersion(Base): :param dag_id: The DAG ID. :param session: The database session. :param load_dag_model: Whether to load the DAG model. + :param load_bundle_model: Whether to load the DagBundle model. :return: The latest version of the DAG or None if not found. """ return session.scalar( - cls._latest_version_select(dag_id, bundle_version=bundle_version, load_dag_model=load_dag_model) + cls._latest_version_select( + dag_id, + bundle_version=bundle_version, + load_dag_model=load_dag_model, + load_bundle_model=load_bundle_model, + ) ) @classmethod diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 4f503ef9492..02747de08e4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3185,8 +3185,7 @@ export const $DagVersionResponse = { type: 'null' } ], - title: 'Bundle Url', - readOnly: true + title: 'Bundle Url' } }, type: 'object', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index f2e2c84f812..9f43a11b8dc 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -827,7 +827,7 @@ export type DagVersionResponse = { bundle_version: string | null; created_at: string; dag_display_name: string; - readonly bundle_url: string | null; + bundle_url: string | null; }; /** diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 262efdccf6c..8648a25d8de 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -39,6 +39,7 @@ from airflow.utils.state import DagRunState, State from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.api_fastapi import _check_dag_run_note, _check_last_log +from tests_common.test_utils.asserts import assert_queries_count from tests_common.test_utils.db import ( clear_db_connections, clear_db_dag_bundles, @@ -307,7 +308,10 @@ class TestGetDagRun: class TestGetDagRuns: - @pytest.mark.parametrize("dag_id, total_entries", [(DAG1_ID, 2), (DAG2_ID, 2), ("~", 4)]) + @pytest.mark.parametrize( + "dag_id, total_entries", + [(DAG1_ID, 2), (DAG2_ID, 2), ("~", 4)], + ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") def test_get_dag_runs(self, test_client, session, dag_id, total_entries): response = test_client.get(f"/dags/{dag_id}/dagRuns") @@ -364,7 +368,10 @@ class TestGetDagRuns: @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") def test_return_correct_results_with_order_by(self, test_client, order_by, expected_order): # Test ascending order - response = test_client.get("/dags/test_dag1/dagRuns", params={"order_by": order_by}) + + with assert_queries_count(7): + response = test_client.get("/dags/test_dag1/dagRuns", params={"order_by": order_by}) + assert response.status_code == 200 body = response.json() assert body["total_entries"] == 2 @@ -750,7 +757,8 @@ class TestGetDagRuns: class TestListDagRunsBatch: @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") def test_list_dag_runs_return_200(self, test_client, session): - response = test_client.post("/dags/~/dagRuns/list", json={}) + with assert_queries_count(5): + response = test_client.post("/dags/~/dagRuns/list", json={}) assert response.status_code == 200 body = response.json() assert body["total_entries"] == 4 @@ -791,7 +799,8 @@ class TestListDagRunsBatch: ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") def test_list_dag_runs_with_dag_ids_filter(self, test_client, dag_ids, status_code, expected_dag_id_list): - response = test_client.post("/dags/~/dagRuns/list", json={"dag_ids": dag_ids}) + with assert_queries_count(5): + response = test_client.post("/dags/~/dagRuns/list", json={"dag_ids": dag_ids}) assert response.status_code == status_code assert set([each["dag_run_id"] for each in response.json()["dag_runs"]]) == set(expected_dag_id_list) @@ -1293,9 +1302,10 @@ class TestGetDagRunAssetTriggerEvents: session.commit() assert event.timestamp - response = test_client.get( - "/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents", - ) + with assert_queries_count(3): + response = test_client.get( + "/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents", + ) assert response.status_code == 200 expected_response = { "asset_events": [ @@ -1504,6 +1514,7 @@ class TestTriggerDagRun: run = ( session.query(DagRun).where(DagRun.dag_id == DAG1_ID, DagRun.run_id == expected_dag_run_id).one() ) + expected_response_json = { "bundle_version": None, "conf": {}, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py index 287c33d7717..50307f8502d 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py @@ -104,9 +104,7 @@ class TestGetDagVersion(TestDagVersionEndpoint): ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions") - @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") - def test_get_dag_version(self, mock_hasattr, test_client, dag_id, dag_version, expected_response): - mock_hasattr.return_value = False + def test_get_dag_version(self, test_client, dag_id, dag_version, expected_response): response = test_client.get(f"/dags/{dag_id}/dagVersions/{dag_version}") assert response.status_code == 200 assert response.json() == expected_response @@ -180,7 +178,7 @@ class TestGetDagVersion(TestDagVersionEndpoint): @pytest.mark.usefixtures("make_dag_with_multiple_versions") @mock.patch("airflow.dag_processing.bundles.manager.DagBundlesManager.view_url") - @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") + @mock.patch("airflow.models.dag_version.hasattr") def test_get_dag_version_with_unconfigured_bundle( self, mock_hasattr, mock_view_url, test_client, dag_maker, session ): @@ -305,9 +303,7 @@ class TestGetDagVersions(TestDagVersionEndpoint): ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions") - @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr") - def test_get_dag_versions(self, mock_hasattr, test_client, dag_id, expected_response): - mock_hasattr.return_value = False + def test_get_dag_versions(self, test_client, dag_id, expected_response): response = test_client.get(f"/dags/{dag_id}/dagVersions") assert response.status_code == 200 assert response.json() == expected_response diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py index fad5d39c7d8..80a645ba699 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py @@ -907,7 +907,7 @@ class TestDagDetails(TestDagEndpoint): "is_paused_upon_creation": None, "latest_dag_version": { "bundle_name": "dag_maker", - "bundle_url": None, + "bundle_url": "http://test_host.github.com/tree/None/dags", "bundle_version": None, "created_at": mock.ANY, "dag_id": "test_dag2", diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 55db52a0e52..fd8662aea4f 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -1260,7 +1260,7 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint): update_extras=update_extras, task_instances=task_instances, ) - with mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.DagBundlesManager"): + with mock.patch("airflow.models.dag_version.DagBundlesManager"): # Mock DagBundlesManager to avoid checking if dags-folder bundle is configured response = test_client.get(url, params=params) if params == {"task_id_pattern": "task_match_id"}:
