This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 19045270ad1 Fix n+1 query to fetch tags in the dags list page (#57270) (#57570)
19045270ad1 is described below
commit 19045270ad1f48770e19a8b4b9051344a347ac79
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Oct 30 21:43:14 2025 +0100
Fix n+1 query to fetch tags in the dags list page (#57270) (#57570)
(cherry picked from commit 5a28c4499da3057d1bda91e7e6f699dde1f5e1f1)
Co-authored-by: Kaxil Naik <[email protected]>
---
.../src/airflow/api_fastapi/common/db/dags.py | 3 +-
.../core_api/routes/public/test_dags.py | 66 +++++++++++++++++++++
.../api_fastapi/core_api/routes/ui/test_dags.py | 67 ++++++++++++++++++++++
3 files changed, 135 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dags.py b/airflow-core/src/airflow/api_fastapi/common/db/dags.py
index cefce662798..7707f78d419 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dags.py
@@ -20,6 +20,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import func, select
+from sqlalchemy.orm import selectinload
from airflow.api_fastapi.common.db.common import (
apply_filters_to_select,
@@ -33,7 +34,7 @@ if TYPE_CHECKING:
def generate_dag_with_latest_run_query(max_run_filters: list[BaseParam],
order_by: SortParam) -> Select:
- query = select(DagModel)
+ query = select(DagModel).options(selectinload(DagModel.tags))
max_run_id_query = ( # ordering by id will not always be "latest run",
but it's a simplifying assumption
select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
index 7633cb94d03..96fa5e3e673 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
@@ -31,6 +31,7 @@ from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
+from tests_common.test_utils.asserts import count_queries
from tests_common.test_utils.db import (
clear_db_assets,
clear_db_connections,
@@ -524,6 +525,71 @@ class TestGetDags(TestDagEndpoint):
assert body["total_entries"] == 1
assert [dag["dag_id"] for dag in body["dags"]] == expected_ids
+ def test_get_dags_no_n_plus_one_queries(self, session, test_client):
+ """Test that fetching DAGs with tags doesn't trigger n+1 queries.
+
+ Seeds DAGs with three tags each and counts the queries issued by one
+ ``GET /dags`` call. It then seeds three more tagged DAGs and counts again.
+ Loading tags lazily per DAG would add roughly one query per new DAG, so the
+ test requires the total to grow by fewer than three queries.
+ """
+ num_dags = 5  # size of the first batch; three more are added later
+ for i in range(num_dags):
+ dag_id = f"test_dag_queries_{i}"
+ dag_model = DagModel(
+ dag_id=dag_id,
+ bundle_name="dag_maker",
+ fileloc=f"/tmp/{dag_id}.py",
+ is_stale=False,
+ )
+ session.add(dag_model)
+ session.flush()  # presumably so the DAG row exists before tags reference its dag_id
+
+ for j in range(3):  # three tags per DAG
+ tag = DagTag(name=f"tag_{i}_{j}", dag_id=dag_id)
+ session.add(tag)
+
+ session.commit()
+ session.expire_all()  # drop cached ORM state before measuring (presumably so nothing pre-loaded masks lazy loads)
+
+ with count_queries() as result:  # baseline: queries for one page of DAGs
+ response = test_client.get("/dags", params={"limit": 10})  # NOTE(review): assumes all 5 test DAGs fit in the first 10 -- confirm fixture DAG count
+
+ assert response.status_code == 200
+ body = response.json()
+ dags_with_our_prefix = [d for d in body["dags"] if
d["dag_id"].startswith("test_dag_queries_")]
+ assert len(dags_with_our_prefix) == num_dags
+ for dag in dags_with_our_prefix:
+ assert len(dag["tags"]) == 3
+
+ first_query_count = sum(result.values())  # result appears to map queries to counts; the sum is the request total
+
+ # Add more DAGs and verify query count doesn't scale linearly
+ for i in range(num_dags, num_dags + 3):  # NOTE(review): duplicates the seeding loop above; a shared helper would keep both batches in sync
+ dag_id = f"test_dag_queries_{i}"
+ dag_model = DagModel(
+ dag_id=dag_id,
+ bundle_name="dag_maker",
+ fileloc=f"/tmp/{dag_id}.py",
+ is_stale=False,
+ )
+ session.add(dag_model)
+ session.flush()
+
+ for j in range(3):
+ tag = DagTag(name=f"tag_{i}_{j}", dag_id=dag_id)
+ session.add(tag)
+
+ session.commit()
+ session.expire_all()
+
+ with count_queries() as result2:
+ response = test_client.get("/dags", params={"limit": 15})  # larger page so all 8 test DAGs are included
+
+ assert response.status_code == 200
+ second_query_count = sum(result2.values())
+
+ # With n+1, adding 3 DAGs would add ~3 tag queries
+ # Without n+1, query count should remain nearly identical
+ assert second_query_count - first_query_count < 3, (  # growth must stay below one extra query per added DAG
+ f"Added 3 DAGs but query count increased by {second_query_count -
first_query_count} "
+ f"({first_query_count} → {second_query_count}), suggesting n+1
queries for tags"
+ )
+
class TestPatchDag(TestDagEndpoint):
"""Unit tests for Patch DAG."""
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
index 2134a565ffc..328643362a1 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py
@@ -26,6 +26,7 @@ from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from airflow.models import DagRun
+from airflow.models.dag import DagModel, DagTag
from airflow.models.dag_favorite import DagFavorite
from airflow.models.hitl import HITLDetail
from airflow.sdk.timezone import utcnow
@@ -33,6 +34,7 @@ from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
+from tests_common.test_utils.asserts import count_queries
from unit.api_fastapi.core_api.routes.public.test_dags import (
DAG1_ID,
DAG2_ID,
@@ -233,6 +235,71 @@ class TestGetDagRuns(TestPublicDagEndpoint):
response = unauthorized_test_client.get("/dags", params={})
assert response.status_code == 403
+ def test_get_dags_no_n_plus_one_queries(self, session, test_client):
+ """Test that fetching DAGs with tags doesn't trigger n+1 queries.
+
+ Seeds DAGs with three tags each and counts the queries issued by one
+ ``GET /dags`` call from this test client. It then seeds three more tagged
+ DAGs and counts again. Loading tags lazily per DAG would add roughly one
+ query per new DAG, so the test requires the total to grow by fewer than
+ three queries.
+ """
+ num_dags = 5  # size of the first batch; three more are added later
+ for i in range(num_dags):
+ dag_id = f"test_dag_queries_ui_{i}"
+ dag_model = DagModel(
+ dag_id=dag_id,
+ bundle_name="dag_maker",
+ fileloc=f"/tmp/{dag_id}.py",
+ is_stale=False,
+ )
+ session.add(dag_model)
+ session.flush()  # presumably so the DAG row exists before tags reference its dag_id
+
+ for j in range(3):  # three tags per DAG
+ tag = DagTag(name=f"tag_ui_{i}_{j}", dag_id=dag_id)
+ session.add(tag)
+
+ session.commit()
+ session.expire_all()  # drop cached ORM state before measuring (presumably so nothing pre-loaded masks lazy loads)
+
+ with count_queries() as result:  # baseline: queries for one page of DAGs
+ response = test_client.get("/dags", params={"limit": 10})  # NOTE(review): assumes all 5 test DAGs fit in the first 10 -- confirm fixture DAG count
+
+ assert response.status_code == 200
+ body = response.json()
+ dags_with_our_prefix = [d for d in body["dags"] if
d["dag_id"].startswith("test_dag_queries_ui_")]
+ assert len(dags_with_our_prefix) == num_dags
+ for dag in dags_with_our_prefix:
+ assert len(dag["tags"]) == 3
+
+ first_query_count = sum(result.values())  # result appears to map queries to counts; the sum is the request total
+
+ # Add more DAGs and verify query count doesn't scale linearly
+ for i in range(num_dags, num_dags + 3):  # NOTE(review): duplicates the seeding loop above and the public-route test; consider a shared helper
+ dag_id = f"test_dag_queries_ui_{i}"
+ dag_model = DagModel(
+ dag_id=dag_id,
+ bundle_name="dag_maker",
+ fileloc=f"/tmp/{dag_id}.py",
+ is_stale=False,
+ )
+ session.add(dag_model)
+ session.flush()
+
+ for j in range(3):
+ tag = DagTag(name=f"tag_ui_{i}_{j}", dag_id=dag_id)
+ session.add(tag)
+
+ session.commit()
+ session.expire_all()
+
+ with count_queries() as result2:
+ response = test_client.get("/dags", params={"limit": 15})  # larger page so all 8 test DAGs are included
+
+ assert response.status_code == 200
+ second_query_count = sum(result2.values())
+
+ # With n+1, adding 3 DAGs would add ~3 tag queries
+ # Without n+1, query count should remain nearly identical
+ assert second_query_count - first_query_count < 3, (  # growth must stay below one extra query per added DAG
+ f"Added 3 DAGs but query count increased by {second_query_count -
first_query_count} "
+ f"({first_query_count} → {second_query_count}), suggesting n+1
queries for tags"
+ )
+
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_latest_run_should_return_200(self, test_client):
response = test_client.get(f"/dags/{DAG1_ID}/latest_run")