This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit deea370923c019bb3d91d8ef8fdb451f9b97ea31 Author: Pierre Jeambrun <[email protected]> AuthorDate: Thu Sep 18 02:50:51 2025 +0200 Fix UI stats endpoint (#55733) * Fix UI stats endpoint * Address PR comments (cherry picked from commit a30d72c64661caa352852aef9710f89fc2ba11ba) --- .../api_fastapi/core_api/routes/ui/dashboard.py | 32 +++++++++++++++------- .../core_api/routes/ui/test_dashboard.py | 30 +++++++++++++++++++- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py index dbbada8cf39..26c1b6582d7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py @@ -127,7 +127,18 @@ def dag_stats( .subquery() ) - latest_runs = ( + # Active Dags need another query from DagModel, as a Dag may not have any runs but still be active + active_count_query = ( + select(func.count()) + .select_from(DagModel) + .where(DagModel.is_stale == false()) + .where(DagModel.is_paused == false()) + .where(DagModel.dag_id.in_(permitted_dag_ids)) + ) + active_count = session.execute(active_count_query).scalar_one() + + # Other metrics are based on latest DagRun states + latest_runs_cte = ( select( DagModel.dag_id, DagModel.is_paused, @@ -139,21 +150,22 @@ def dag_stats( (DagRun.dag_id == latest_dates_subq.c.dag_id) & (DagRun.logical_date == latest_dates_subq.c.max_logical_date), ) + .where(DagModel.is_stale == false()) .where(DagRun.dag_id.in_(permitted_dag_ids)) .cte() ) + combined_runs_query = select( + func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.FAILED, 1))), 0).label("failed"), + func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.RUNNING, 1))), 0).label( + "running" + ), + func.coalesce(func.sum(case((latest_runs_cte.c.state == DagRunState.QUEUED, 1))), 0).label("queued"), + ).select_from(latest_runs_cte) - combined_query = select( - func.coalesce(func.sum(case((latest_runs.c.is_paused == false(), 1))), 0).label("active"), - func.coalesce(func.sum(case((latest_runs.c.state == DagRunState.FAILED, 1))), 0).label("failed"), - func.coalesce(func.sum(case((latest_runs.c.state == DagRunState.RUNNING, 1))), 0).label("running"), - func.coalesce(func.sum(case((latest_runs.c.state == DagRunState.QUEUED, 1))), 0).label("queued"), - ).select_from(latest_runs) - - counts = session.execute(combined_query).first() + counts = session.execute(combined_runs_query).first() return DashboardDagStatsResponse( - active_dag_count=counts.active, + active_dag_count=active_count, failed_dag_count=counts.failed, running_dag_count=counts.running, queued_dag_count=counts.queued, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py index 74aed76ff72..1c29990fc9c 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py @@ -22,6 +22,7 @@ from datetime import timedelta import pendulum import pytest +from airflow.models.dag import DagModel from airflow.models.dagbag import DBDagBag from airflow.providers.standard.operators.empty import EmptyOperator from airflow.utils.state import DagRunState, TaskInstanceState @@ -204,8 +205,35 @@ def make_multiple_dags(dag_maker, session): start_date=date, ) + with dag_maker( + dag_id="no_dag_runs", + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") + + with dag_maker( + dag_id="stale_dag", + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + EmptyOperator(task_id="task_1") >> EmptyOperator(task_id="task_2") + + date = dag_maker.dag.start_date + dag_maker.create_dagrun( + run_id="run_1", + state=DagRunState.QUEUED, + run_type=DagRunType.SCHEDULED, + logical_date=date, + start_date=date, + ) dag_maker.sync_dagbag_to_db() + session.get(DagModel, "stale_dag").is_stale = True + session.commit() + class TestHistoricalMetricsDataEndpoint: @pytest.mark.parametrize( @@ -304,7 +332,7 @@ class TestDagStatsEndpoint: response = test_client.get("/dashboard/dag_stats") assert response.status_code == 200 assert response.json() == { - "active_dag_count": 3, + "active_dag_count": 4, "failed_dag_count": 1, "running_dag_count": 1, "queued_dag_count": 1,
