This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f271f2bd052 Optimize DAG list query for users with limited access (#57460)
f271f2bd052 is described below

commit f271f2bd05210b33d1d8faf24b93943aff7fdc4c
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Oct 29 13:02:49 2025 +0000

    Optimize DAG list query for users with limited access (#57460)
    
    When users have limited DAG access, the DAG list query was inefficiently
    grouping all DagRuns in the database before filtering. This caused severe
    performance degradation in large deployments where a user might access
    only a few DAGs out of hundreds or thousands.
    
    The fix filters both the main DAG query and the DagRun subquery by
    accessible dag_ids before performing the expensive GROUP BY operation.
    
    Before (queries all dagruns):
    
    ```sql
      SELECT ... FROM dag
      LEFT OUTER JOIN (
        SELECT dag_run.dag_id, max(dag_run.id) AS max_dag_run_id
        FROM dag_run
        GROUP BY dag_run.dag_id
      ) AS mrq ON dag.dag_id = mrq.dag_id
    ```
    
    After (filters to accessible dags):
    
    ```sql
      SELECT ... FROM dag
      LEFT OUTER JOIN (
        SELECT dag_run.dag_id, max(dag_run.id) AS max_dag_run_id
        FROM dag_run
        WHERE dag_run.dag_id IN ('accessible_dag_1', 'accessible_dag_2')
        GROUP BY dag_run.dag_id
      ) AS mrq ON dag.dag_id = mrq.dag_id
      WHERE dag.dag_id IN ('accessible_dag_1', 'accessible_dag_2')
    ```
    
    Performance impact: In a deployment with 100 DAGs (100 runs each) where
    a user has access to only 2 DAGs, this reduces the subquery from grouping
    10,000 rows down to 200 rows (a 50x reduction in rows grouped), and avoids
    fetching 98 unnecessary DAG models.
    
    Fixes #57427
---
 .../src/airflow/api_fastapi/common/db/dags.py      | 28 +++++++--
 .../api_fastapi/core_api/routes/public/dags.py     |  1 +
 .../airflow/api_fastapi/core_api/routes/ui/dags.py |  1 +
 .../tests/unit/api_fastapi/common/db/test_dags.py  | 69 ++++++++++++++++++++++
 4 files changed, 93 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/db/dags.py b/airflow-core/src/airflow/api_fastapi/common/db/dags.py
index 7707f78d419..f1104f839e3 100644
--- a/airflow-core/src/airflow/api_fastapi/common/db/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/common/db/dags.py
@@ -33,14 +33,30 @@ if TYPE_CHECKING:
     from sqlalchemy.sql import Select
 
 
-def generate_dag_with_latest_run_query(max_run_filters: list[BaseParam], order_by: SortParam) -> Select:
+def generate_dag_with_latest_run_query(
+    max_run_filters: list[BaseParam], order_by: SortParam, *, dag_ids: set[str] | None = None
+) -> Select:
+    """
+    Generate a query to fetch DAGs with their latest run.
+
+    :param max_run_filters: List of filters to apply to the latest run
+    :param order_by: Sort parameter for ordering results
+    :param dag_ids: Optional set of DAG IDs to limit the query to. When provided, both the main
+        DAG query and the subquery for finding the latest runs will be filtered to
+        only these DAG IDs, improving performance when users have limited DAG access.
+    :return: SQLAlchemy Select statement
+    """
     query = select(DagModel).options(selectinload(DagModel.tags))
 
-    max_run_id_query = (  # ordering by id will not always be "latest run", but it's a simplifying assumption
-        select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
-        .group_by(DagRun.dag_id)
-        .subquery(name="mrq")
-    )
+    # Filter main query by dag_ids if provided
+    if dag_ids is not None:
+        query = query.where(DagModel.dag_id.in_(dag_ids or set()))
+
+    # Also filter the subquery for finding latest runs
+    max_run_id_query_stmt = select(DagRun.dag_id, func.max(DagRun.id).label("max_dag_run_id"))
+    if dag_ids is not None:
+        max_run_id_query_stmt = max_run_id_query_stmt.where(DagRun.dag_id.in_(dag_ids or set()))
+    max_run_id_query = max_run_id_query_stmt.group_by(DagRun.dag_id).subquery(name="mrq")
 
     has_max_run_filter = False
 
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
index 5b3b154ce3c..228ddc3660d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
@@ -137,6 +137,7 @@ def get_dags(
             last_dag_run_state,
         ],
         order_by=order_by,
+        dag_ids=readable_dags_filter.value,
     )
 
     dags_select, total_entries = paginated_select(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
index e57b0096c4e..217389c9f14 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
@@ -125,6 +125,7 @@ def get_dags(
             last_dag_run_state,
         ],
         order_by=order_by,
+        dag_ids=readable_dags_filter.value,
     )
 
     dags_select, total_entries = paginated_select(
diff --git a/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py b/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py
index c6128a93236..a0747747670 100644
--- a/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/common/db/test_dags.py
@@ -273,3 +273,72 @@ class TestGenerateDagWithLatestRunQuery:
             "This suggests the WHERE start_date IS NOT NULL condition is excluding it."
         )
         assert running_dagrun_state is not None, "Running DAG should have DagRun state joined"
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_filters_by_dag_ids_when_provided(self, session):
+        """
+        Verify that when dag_ids is provided, only those DAGs and their runs are queried.
+
+        This is a performance optimization: both the main DAG query and the DagRun subquery
+        should only process accessible DAGs when the user has limited access.
+        """
+        dag_ids = ["dag_accessible_1", "dag_accessible_2", "dag_inaccessible_3"]
+
+        for dag_id in dag_ids:
+            dag_model = DagModel(
+                dag_id=dag_id,
+                bundle_name="testing",
+                is_stale=False,
+                is_paused=False,
+                fileloc=f"/tmp/{dag_id}.py",
+            )
+            session.add(dag_model)
+            session.flush()
+
+            # Create 2 runs for each DAG
+            for run_idx in range(2):
+                dagrun = DagRun(
+                    dag_id=dag_id,
+                    run_id=f"manual__{run_idx}",
+                    run_type="manual",
+                    logical_date=datetime(2024, 1, 1 + run_idx, tzinfo=timezone.utc),
+                    state=DagRunState.SUCCESS,
+                    start_date=datetime(2024, 1, 1 + run_idx, 1, tzinfo=timezone.utc),
+                )
+                session.add(dagrun)
+        session.commit()
+
+        # User has access to only 2 DAGs
+        accessible_dag_ids = {"dag_accessible_1", "dag_accessible_2"}
+
+        # Query with dag_ids filter
+        query_filtered = generate_dag_with_latest_run_query(
+            max_run_filters=[],
+            order_by=SortParam(allowed_attrs=["last_run_state"], model=DagModel).set_value(
+                ["last_run_state"]
+            ),
+            dag_ids=accessible_dag_ids,
+        )
+
+        # Query without dag_ids filter
+        query_unfiltered = generate_dag_with_latest_run_query(
+            max_run_filters=[],
+            order_by=SortParam(allowed_attrs=["last_run_state"], model=DagModel).set_value(
+                ["last_run_state"]
+            ),
+        )
+
+        result_filtered = session.execute(query_filtered.add_columns(DagRun.state)).fetchall()
+        result_unfiltered = session.execute(query_unfiltered.add_columns(DagRun.state)).fetchall()
+
+        # Filtered query should only return accessible DAGs
+        filtered_dag_ids = {row[0].dag_id for row in result_filtered}
+        assert filtered_dag_ids == accessible_dag_ids
+
+        # Unfiltered query returns all DAGs
+        unfiltered_dag_ids = {row[0].dag_id for row in result_unfiltered}
+        assert unfiltered_dag_ids == set(dag_ids)
+
+        # All accessible DAGs should have DagRun info
+        filtered_dags_with_runs = {row[0].dag_id for row in result_filtered if row[1] is not None}
+        assert filtered_dags_with_runs == accessible_dag_ids

Reply via email to