This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fb9dea50dea Add number of queries guard in public dag runs list endpoints (#57450)
fb9dea50dea is described below
commit fb9dea50dea1b5438cb7e9607bea87caedbf08cc
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Oct 31 19:25:56 2025 +0100
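Add number of queries guard in public dag runs list endpoints (#57450)
Eager-load everything needed to serialize a DagRunResponse in the public DAG runs list endpoints (get_dag_runs and get_list_dag_runs_batch). This uses the new eager_load_dag_run_for_validation() helper, which loads the DAG model, the DAG run note, and the task instances and task instance histories (each with their DAG version and bundle). Also eager-load consumed asset events and their assets in get_upstream_asset_events.
The DagVersionResponse.bundle_url computed field used to open a session and query DagBundleModel for every serialized version. It is replaced by a DagVersion.bundle_url property backed by a new view-only `bundle` relationship, so the bundle can be loaded in the same query. Tests now use assert_queries_count to guard the number of queries these endpoints issue.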
* WIP to revert
* Fix batch endpoint
* Fix CI
* Address review comments
* Fix CI
* Address PR comments
---
.../src/airflow/api_fastapi/common/db/dag_runs.py | 19 +++++++++++
.../core_api/datamodels/dag_versions.py | 28 ++--------------
.../api_fastapi/core_api/datamodels/dags.py | 4 ++-
.../api_fastapi/core_api/openapi/_private_ui.yaml | 1 -
.../core_api/openapi/v2-rest-api-generated.yaml | 1 -
.../api_fastapi/core_api/routes/public/dag_run.py | 13 +++++---
airflow-core/src/airflow/models/dag_version.py | 39 ++++++++++++++++++++--
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 3 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 2 +-
.../core_api/routes/public/test_dag_run.py | 25 ++++++++++----
.../core_api/routes/public/test_dag_versions.py | 10 ++----
.../core_api/routes/public/test_dags.py | 2 +-
.../core_api/routes/public/test_task_instances.py | 3 +-
13 files changed, 96 insertions(+), 54 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
index 2efb6967653..f68f3ab2186 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dag_runs.py
@@ -20,10 +20,15 @@ from __future__ import annotations
from typing import cast
from sqlalchemy import func, select
+from sqlalchemy.orm import joinedload, selectinload
+from sqlalchemy.orm.interfaces import LoaderOption
from sqlalchemy.sql import ColumnElement
from airflow.models.dag import DagModel
+from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.taskinstancehistory import TaskInstanceHistory
dagruns_select_with_state_count = (
select(
@@ -36,3 +41,17 @@ dagruns_select_with_state_count = (
.group_by(DagRun.dag_id, DagRun.state, DagModel.dag_display_name)
.order_by(DagRun.dag_id)
)
+
+
+def eager_load_dag_run_for_validation() -> tuple[LoaderOption, ...]:
+ """Construct the eager loading options necessary for a DagRunResponse
object."""
+ return (
+ joinedload(DagRun.dag_model),
+ selectinload(DagRun.task_instances)
+ .joinedload(TaskInstance.dag_version)
+ .joinedload(DagVersion.bundle),
+ selectinload(DagRun.task_instances_histories)
+ .joinedload(TaskInstanceHistory.dag_version)
+ .joinedload(DagVersion.bundle),
+ joinedload(DagRun.dag_run_note),
+ )
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
index 2475d49031f..ac0ebf56f44 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_versions.py
@@ -19,11 +19,9 @@ from __future__ import annotations
from datetime import datetime
from uuid import UUID
-from pydantic import AliasPath, Field, computed_field
-from sqlalchemy import select
+from pydantic import AliasPath, Field
from airflow.api_fastapi.core_api.base import BaseModel
-from airflow.dag_processing.bundles.manager import DagBundlesManager
class DagVersionResponse(BaseModel):
@@ -37,29 +35,7 @@ class DagVersionResponse(BaseModel):
created_at: datetime
dag_display_name: str = Field(validation_alias=AliasPath("dag_model",
"dag_display_name"))
- # Mypy issue https://github.com/python/mypy/issues/1362
- @computed_field # type: ignore[prop-decorator]
- @property
- def bundle_url(self) -> str | None:
- if self.bundle_name:
- # Get the bundle model from the database and render the URL
- from airflow.models.dagbundle import DagBundleModel
- from airflow.utils.session import create_session
-
- with create_session() as session:
- bundle_model = session.scalar(
- select(DagBundleModel).where(DagBundleModel.name ==
self.bundle_name)
- )
-
- if bundle_model and hasattr(bundle_model,
"signed_url_template"):
- return bundle_model.render_url(self.bundle_version)
- # fallback to the deprecated option if the bundle model does
not have a signed_url_template
- # attribute
- try:
- return DagBundlesManager().view_url(self.bundle_name,
self.bundle_version)
- except ValueError:
- return None
- return None
+ bundle_url: str | None = Field(validation_alias="bundle_url")
class DAGVersionCollectionResponse(BaseModel):
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
index 1fea41919ff..7a32ca6bb89 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
@@ -202,7 +202,9 @@ class DAGDetailsResponse(DAGResponse):
@property
def latest_dag_version(self) -> DagVersionResponse | None:
"""Return the latest DagVersion."""
- latest_dag_version = DagVersion.get_latest_version(self.dag_id,
load_dag_model=True)
+ latest_dag_version = DagVersion.get_latest_version(
+ self.dag_id, load_dag_model=True, load_bundle_model=True
+ )
if latest_dag_version is None:
return latest_dag_version
return DagVersionResponse.model_validate(latest_dag_version)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index b85d7646390..4061b761003 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1745,7 +1745,6 @@ components:
- type: string
- type: 'null'
title: Bundle Url
- readOnly: true
type: object
required:
- id
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index eae45b1f1c1..28b0291c1f7 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -10846,7 +10846,6 @@ components:
- type: string
- type: 'null'
title: Bundle Url
- readOnly: true
type: object
required:
- id
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index f6c1f016604..9c070da17d9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -36,6 +36,7 @@ from airflow.api.common.mark_tasks import (
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run,
get_latest_version_of_dag
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.db.dag_runs import
eager_load_dag_run_for_validation
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
@@ -82,6 +83,7 @@ from airflow.api_fastapi.core_api.services.public.dag_run
import DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
+from airflow.models.asset import AssetEvent
from airflow.models.dag_version import DagVersion
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -235,10 +237,12 @@ def get_upstream_asset_events(
) -> AssetEventCollectionResponse:
"""If dag run is asset-triggered, return the asset events that triggered
it."""
dag_run: DagRun | None = session.scalar(
- select(DagRun).where(
+ select(DagRun)
+ .where(
DagRun.dag_id == dag_id,
DagRun.run_id == dag_run_id,
)
+
.options(joinedload(DagRun.consumed_asset_events).joinedload(AssetEvent.asset))
)
if dag_run is None:
raise HTTPException(
@@ -368,11 +372,11 @@ def get_dag_runs(
This endpoint allows specifying `~` as the dag_id to retrieve Dag Runs for
all DAGs.
"""
- query = select(DagRun)
+ query = select(DagRun).options(*eager_load_dag_run_for_validation())
if dag_id != "~":
get_latest_version_of_dag(dag_bag, dag_id, session) # Check if the
DAG exists.
- query = query.filter(DagRun.dag_id ==
dag_id).options(joinedload(DagRun.dag_model))
+ query = query.filter(DagRun.dag_id == dag_id).options()
# Add join with DagVersion if dag_version filter is active
if dag_version.value:
@@ -607,7 +611,8 @@ def get_list_dag_runs_batch(
{"dag_run_id": "run_id"},
).set_value([body.order_by] if body.order_by else None)
- base_query = select(DagRun).options(joinedload(DagRun.dag_model))
+ base_query = select(DagRun).options(*eager_load_dag_run_for_validation())
+
dag_runs_select, total_entries = paginated_select(
statement=base_query,
filters=[
diff --git a/airflow-core/src/airflow/models/dag_version.py
b/airflow-core/src/airflow/models/dag_version.py
index fbc04f9171e..49768449732 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -27,6 +27,7 @@ from sqlalchemy.orm import Mapped, joinedload, relationship
from sqlalchemy_utils import UUIDType
from airflow._shared.timezones import timezone
+from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.models.base import Base, StringID
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, mapped_column, with_row_locks
@@ -51,6 +52,12 @@ class DagVersion(Base):
dag_model = relationship("DagModel", back_populates="dag_versions")
bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
bundle_version: Mapped[str | None] = mapped_column(StringID(),
nullable=True)
+ bundle = relationship(
+ "DagBundleModel",
+ primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name",
+ uselist=False,
+ viewonly=True,
+ )
dag_code = relationship(
"DagCode",
back_populates="dag_version",
@@ -79,6 +86,20 @@ class DagVersion(Base):
"""Represent the object as a string."""
return f"<DagVersion {self.dag_id} {self.version}>"
+ @property
+ def bundle_url(self) -> str | None:
+ """Render the bundle URL using the joined bundle metadata if
available."""
+ # Prefer using the joined bundle relationship when present to avoid
extra queries
+ if getattr(self, "bundle", None) is not None and hasattr(self.bundle,
"signed_url_template"):
+ return self.bundle.render_url(self.bundle_version)
+
+ # fallback to the deprecated option if the bundle model does not have
a signed_url_template
+ # attribute
+ try:
+ return DagBundlesManager().view_url(self.bundle_name,
self.bundle_version)
+ except ValueError:
+ return None
+
@classmethod
@provide_session
def write_dag(
@@ -120,7 +141,11 @@ class DagVersion(Base):
@classmethod
def _latest_version_select(
- cls, dag_id: str, bundle_version: str | None = None, load_dag_model:
bool = False
+ cls,
+ dag_id: str,
+ bundle_version: str | None = None,
+ load_dag_model: bool = False,
+ load_bundle_model: bool = False,
) -> Select:
"""
Get the select object to get the latest version of the DAG.
@@ -135,6 +160,9 @@ class DagVersion(Base):
if load_dag_model:
query = query.options(joinedload(cls.dag_model))
+ if load_bundle_model:
+ query = query.options(joinedload(cls.bundle))
+
query = query.order_by(cls.created_at.desc()).limit(1)
return query
@@ -146,6 +174,7 @@ class DagVersion(Base):
*,
bundle_version: str | None = None,
load_dag_model: bool = False,
+ load_bundle_model: bool = False,
session: Session = NEW_SESSION,
) -> DagVersion | None:
"""
@@ -154,10 +183,16 @@ class DagVersion(Base):
:param dag_id: The DAG ID.
:param session: The database session.
:param load_dag_model: Whether to load the DAG model.
+ :param load_bundle_model: Whether to load the DagBundle model.
:return: The latest version of the DAG or None if not found.
"""
return session.scalar(
- cls._latest_version_select(dag_id, bundle_version=bundle_version,
load_dag_model=load_dag_model)
+ cls._latest_version_select(
+ dag_id,
+ bundle_version=bundle_version,
+ load_dag_model=load_dag_model,
+ load_bundle_model=load_bundle_model,
+ )
)
@classmethod
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 7a8898b1b72..1da1f6f8826 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3325,8 +3325,7 @@ export const $DagVersionResponse = {
type: 'null'
}
],
- title: 'Bundle Url',
- readOnly: true
+ title: 'Bundle Url'
}
},
type: 'object',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index efd3f0406d3..04b7ff436cd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -858,7 +858,7 @@ export type DagVersionResponse = {
bundle_version: string | null;
created_at: string;
dag_display_name: string;
- readonly bundle_url: string | null;
+ bundle_url: string | null;
};
/**
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 40e75998b73..3f8ddbe581a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -39,6 +39,7 @@ from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunTriggeredByType, DagRunType
from tests_common.test_utils.api_fastapi import _check_dag_run_note,
_check_last_log
+from tests_common.test_utils.asserts import assert_queries_count
from tests_common.test_utils.db import (
clear_db_connections,
clear_db_dag_bundles,
@@ -327,7 +328,10 @@ class TestGetDagRun:
class TestGetDagRuns:
- @pytest.mark.parametrize("dag_id, total_entries", [(DAG1_ID, 2), (DAG2_ID,
2), ("~", 4)])
+ @pytest.mark.parametrize(
+ "dag_id, total_entries",
+ [(DAG1_ID, 2), (DAG2_ID, 2), ("~", 4)],
+ )
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_get_dag_runs(self, test_client, session, dag_id, total_entries):
response = test_client.get(f"/dags/{dag_id}/dagRuns")
@@ -384,7 +388,10 @@ class TestGetDagRuns:
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_return_correct_results_with_order_by(self, test_client, order_by,
expected_order):
# Test ascending order
- response = test_client.get("/dags/test_dag1/dagRuns",
params={"order_by": order_by})
+
+ with assert_queries_count(7):
+ response = test_client.get("/dags/test_dag1/dagRuns",
params={"order_by": order_by})
+
assert response.status_code == 200
body = response.json()
assert body["total_entries"] == 2
@@ -807,7 +814,8 @@ class TestGetDagRuns:
class TestListDagRunsBatch:
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_list_dag_runs_return_200(self, test_client, session):
- response = test_client.post("/dags/~/dagRuns/list", json={})
+ with assert_queries_count(5):
+ response = test_client.post("/dags/~/dagRuns/list", json={})
assert response.status_code == 200
body = response.json()
assert body["total_entries"] == 4
@@ -848,7 +856,8 @@ class TestListDagRunsBatch:
)
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_list_dag_runs_with_dag_ids_filter(self, test_client, dag_ids,
status_code, expected_dag_id_list):
- response = test_client.post("/dags/~/dagRuns/list", json={"dag_ids":
dag_ids})
+ with assert_queries_count(5):
+ response = test_client.post("/dags/~/dagRuns/list",
json={"dag_ids": dag_ids})
assert response.status_code == status_code
assert set([each["dag_run_id"] for each in
response.json()["dag_runs"]]) == set(expected_dag_id_list)
@@ -1352,9 +1361,10 @@ class TestGetDagRunAssetTriggerEvents:
session.commit()
assert event.timestamp
- response = test_client.get(
- "/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
- )
+ with assert_queries_count(3):
+ response = test_client.get(
+
"/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents",
+ )
assert response.status_code == 200
expected_response = {
"asset_events": [
@@ -1563,6 +1573,7 @@ class TestTriggerDagRun:
run = (
session.query(DagRun).where(DagRun.dag_id == DAG1_ID,
DagRun.run_id == expected_dag_run_id).one()
)
+
expected_response_json = {
"bundle_version": None,
"conf": {},
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
index 287c33d7717..50307f8502d 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_versions.py
@@ -104,9 +104,7 @@ class TestGetDagVersion(TestDagVersionEndpoint):
],
)
@pytest.mark.usefixtures("make_dag_with_multiple_versions")
- @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
- def test_get_dag_version(self, mock_hasattr, test_client, dag_id,
dag_version, expected_response):
- mock_hasattr.return_value = False
+ def test_get_dag_version(self, test_client, dag_id, dag_version,
expected_response):
response = test_client.get(f"/dags/{dag_id}/dagVersions/{dag_version}")
assert response.status_code == 200
assert response.json() == expected_response
@@ -180,7 +178,7 @@ class TestGetDagVersion(TestDagVersionEndpoint):
@pytest.mark.usefixtures("make_dag_with_multiple_versions")
@mock.patch("airflow.dag_processing.bundles.manager.DagBundlesManager.view_url")
- @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
+ @mock.patch("airflow.models.dag_version.hasattr")
def test_get_dag_version_with_unconfigured_bundle(
self, mock_hasattr, mock_view_url, test_client, dag_maker, session
):
@@ -305,9 +303,7 @@ class TestGetDagVersions(TestDagVersionEndpoint):
],
)
@pytest.mark.usefixtures("make_dag_with_multiple_versions")
- @mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
- def test_get_dag_versions(self, mock_hasattr, test_client, dag_id,
expected_response):
- mock_hasattr.return_value = False
+ def test_get_dag_versions(self, test_client, dag_id, expected_response):
response = test_client.get(f"/dags/{dag_id}/dagVersions")
assert response.status_code == 200
assert response.json() == expected_response
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
index cb59ec3ec43..198c0fa5d3f 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
@@ -907,7 +907,7 @@ class TestDagDetails(TestDagEndpoint):
"is_paused_upon_creation": None,
"latest_dag_version": {
"bundle_name": "dag_maker",
- "bundle_url": None,
+ "bundle_url": "http://test_host.github.com/tree/None/dags",
"bundle_version": None,
"created_at": mock.ANY,
"dag_id": "test_dag2",
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index d92ecf559a6..6a49c9fb63b 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1304,7 +1304,8 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
update_extras=update_extras,
task_instances=task_instances,
)
- with
mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.DagBundlesManager"):
+ with mock.patch("airflow.models.dag_version.DagBundlesManager") as
dag_bundle_manager_mock:
+ dag_bundle_manager_mock.return_value.view_url.return_value =
"some_url"
# Mock DagBundlesManager to avoid checking if dags-folder bundle
is configured
response = test_client.get(url, params=params)
if params == {"task_id_pattern": "task_match_id"}: