This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit deea370923c019bb3d91d8ef8fdb451f9b97ea31
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Sep 18 02:50:51 2025 +0200

    Fix UI stats endpoint (#55733)
    
    * Fix UI stats endpoint
    
    * Address PR comments
    
    (cherry picked from commit a30d72c64661caa352852aef9710f89fc2ba11ba)
---
 .../api_fastapi/core_api/routes/ui/dashboard.py    | 32 +++++++++++++++-------
 .../core_api/routes/ui/test_dashboard.py           | 30 +++++++++++++++++++-
 2 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
index dbbada8cf39..26c1b6582d7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
@@ -127,7 +127,18 @@ def dag_stats(
         .subquery()
     )
 
-    latest_runs = (
+    # Active Dags need another query from DagModel, as a Dag may not have any runs but still be active
+    active_count_query = (
+        select(func.count())
+        .select_from(DagModel)
+        .where(DagModel.is_stale == false())
+        .where(DagModel.is_paused == false())
+        .where(DagModel.dag_id.in_(permitted_dag_ids))
+    )
+    active_count = session.execute(active_count_query).scalar_one()
+
+    # Other metrics are based on latest DagRun states
+    latest_runs_cte = (
         select(
             DagModel.dag_id,
             DagModel.is_paused,
@@ -139,21 +150,22 @@ def dag_stats(
             (DagRun.dag_id == latest_dates_subq.c.dag_id)
             & (DagRun.logical_date == latest_dates_subq.c.max_logical_date),
         )
+        .where(DagModel.is_stale == false())
         .where(DagRun.dag_id.in_(permitted_dag_ids))
         .cte()
     )
+    combined_runs_query = select(
+        func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.FAILED, 1))), 0).label("failed"),
+        func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.RUNNING, 1))), 0).label(
+            "running"
+        ),
+        func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.QUEUED, 1))), 0).label("queued"),
+    ).select_from(latest_runs_cte)
 
-    combined_query = select(
-        func.coalesce(func.sum(case((latest_runs.c.is_paused == false(), 1))), 0).label("active"),
-        func.coalesce(func.sum(case((latest_runs.c.state == DagRunState.FAILED, 1))), 0).label("failed"),
-        func.coalesce(func.sum(case((latest_runs.c.state == DagRunState.RUNNING, 1))), 0).label("running"),
-        func.coalesce(func.sum(case((latest_runs.c.state == DagRunState.QUEUED, 1))), 0).label("queued"),
-    ).select_from(latest_runs)
-
-    counts = session.execute(combined_query).first()
+    counts = session.execute(combined_runs_query).first()
 
     return DashboardDagStatsResponse(
-        active_dag_count=counts.active,
+        active_dag_count=active_count,
         failed_dag_count=counts.failed,
         running_dag_count=counts.running,
         queued_dag_count=counts.queued,
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
index 74aed76ff72..1c29990fc9c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
@@ -22,6 +22,7 @@ from datetime import timedelta
 import pendulum
 import pytest
 
+from airflow.models.dag import DagModel
 from airflow.models.dagbag import DBDagBag
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.utils.state import DagRunState, TaskInstanceState
@@ -204,8 +205,35 @@ def make_multiple_dags(dag_maker, session):
         start_date=date,
     )
 
+    with dag_maker(
+        dag_id="no_dag_runs",
+        serialized=True,
+        session=session,
+        start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+    ):
+        EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+
+    with dag_maker(
+        dag_id="stale_dag",
+        serialized=True,
+        session=session,
+        start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC),
+    ):
+        EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2")
+
+    date = dag_maker.dag.start_date
+    dag_maker.create_dagrun(
+        run_id="run_1",
+        state=DagRunState.QUEUED,
+        run_type=DagRunType.SCHEDULED,
+        logical_date=date,
+        start_date=date,
+    )
     dag_maker.sync_dagbag_to_db()
 
+    session.get(DagModel, "stale_dag").is_stale = True
+    session.commit()
+
 
 class TestHistoricalMetricsDataEndpoint:
     @pytest.mark.parametrize(
@@ -304,7 +332,7 @@ class TestDagStatsEndpoint:
         response = test_client.get("/dashboard/dag_stats")
         assert response.status_code == 200
         assert response.json() == {
-            "active_dag_count": 3,
+            "active_dag_count": 4,
             "failed_dag_count": 1,
             "running_dag_count": 1,
             "queued_dag_count": 1,

Reply via email to