yuseok89 commented on code in PR #67242:
URL: https://github.com/apache/airflow/pull/67242#discussion_r3320887501


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py:
##########
@@ -308,3 +311,45 @@ def get_latest_run_info(dag_id: str, session: SessionDep) 
-> DAGRunLightResponse
     latest_run_info = session.execute(latest_run_info_select).one_or_none()
 
     return DAGRunLightResponse(**latest_run_info._mapping) if latest_run_info 
else None
+
+
+@dags_router.get(
+    "/run_state_counts",
+    dependencies=[
+        Depends(requires_access_dag(method="GET")),
+        Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.RUN)),
+    ],
+    operation_id="get_dag_run_state_counts_ui",
+)
+def get_dag_run_state_counts(
+    session: SessionDep,
+    readable_dags_filter: ReadableDagsFilterDep,
+    dag_ids: Annotated[list[str], Query(min_length=1)],
+    run_after_gte: datetime | None = None,
+) -> DAGsRunStateCountsCollectionResponse:
+    """Return per-Dag DagRun state counts (zero-filled) for the Dag list 
page."""
+    permitted_dag_ids = readable_dags_filter.value or set()
+    requested_dag_ids = [dag_id for dag_id in dict.fromkeys(dag_ids) if dag_id 
in permitted_dag_ids]
+    counts_by_dag: dict[str, dict[DagRunState, int]] = {
+        dag_id: {state: 0 for state in DagRunState} for dag_id in 
requested_dag_ids
+    }
+
+    if requested_dag_ids:
+        count_query = (
+            select(DagRun.dag_id, DagRun.state, func.count().label("cnt"))
+            .where(DagRun.dag_id.in_(requested_dag_ids))
+            .group_by(DagRun.dag_id, DagRun.state)
+        )
+        if run_after_gte is not None:
+            count_query = count_query.where(DagRun.run_after >= run_after_gte)
+        for row in session.execute(count_query):
+            if row.state is None:
+                continue
+            counts_by_dag[row.dag_id][DagRunState(row.state)] = row.cnt
+

Review Comment:
   @pierrejeambrun 
   
   Quick summary of what I found while digging into the scalability question:
there are two separate slow spots, both pre-existing and outside this PR.
   
   1. Home page (historical_metrics).
   It filters the time window on start_date and end_date, which are not indexed 
on dag_run, and the coalesce wrapping is not sargable either. So, under a time 
window, the per-state LIMIT cap never fills and the query falls back to a 
near-full scan. The plan is to move the window to run_after, which is already 
indexed. I will handle this in a separate PR.
   2. Dags list page.
   The slow part is the existing recent-runs query, which ranks every run per 
dag and then keeps only the top N, so it scales with the total run count (about 
34s on 5M runs). The plan is to rewrite it as a per-dag ORDER BY run_after DESC 
LIMIT N so that it stops after N rows using the run_after index. This will also 
be a separate PR.
   
   After those land I will re-measure this PR's state-count change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to