yuseok89 commented on code in PR #67242:
URL: https://github.com/apache/airflow/pull/67242#discussion_r3320887501
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py:
##########
@@ -308,3 +311,45 @@ def get_latest_run_info(dag_id: str, session: SessionDep) -> DAGRunLightResponse
     latest_run_info = session.execute(latest_run_info_select).one_or_none()
     return DAGRunLightResponse(**latest_run_info._mapping) if latest_run_info else None
+
+
+@dags_router.get(
+    "/run_state_counts",
+    dependencies=[
+        Depends(requires_access_dag(method="GET")),
+        Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN)),
+    ],
+    operation_id="get_dag_run_state_counts_ui",
+)
+def get_dag_run_state_counts(
+    session: SessionDep,
+    readable_dags_filter: ReadableDagsFilterDep,
+    dag_ids: Annotated[list[str], Query(min_length=1)],
+    run_after_gte: datetime | None = None,
+) -> DAGsRunStateCountsCollectionResponse:
+    """Return per-Dag DagRun state counts (zero-filled) for the Dag list page."""
+    permitted_dag_ids = readable_dags_filter.value or set()
+    requested_dag_ids = [dag_id for dag_id in dict.fromkeys(dag_ids) if dag_id in permitted_dag_ids]
+    counts_by_dag: dict[str, dict[DagRunState, int]] = {
+        dag_id: {state: 0 for state in DagRunState} for dag_id in requested_dag_ids
+    }
+
+    if requested_dag_ids:
+        count_query = (
+            select(DagRun.dag_id, DagRun.state, func.count().label("cnt"))
+            .where(DagRun.dag_id.in_(requested_dag_ids))
+            .group_by(DagRun.dag_id, DagRun.state)
+        )
+        if run_after_gte is not None:
+            count_query = count_query.where(DagRun.run_after >= run_after_gte)
+        for row in session.execute(count_query):
+            if row.state is None:
+                continue
+            counts_by_dag[row.dag_id][DagRunState(row.state)] = row.cnt
+
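The zero-fill and GROUP BY logic in the hunk above can be reproduced outside Airflow to check its behaviour. This is a minimal sketch against an in-memory SQLite table, not the endpoint itself: the `RunState` enum, the simplified `dag_run` schema and the `state_counts` helper are hypothetical stand-ins for `DagRunState`, the real table and the FastAPI route, and permissions are modelled as a plain set of readable Dag ids.

```python
import sqlite3
from enum import Enum


class RunState(str, Enum):
    # Hypothetical stand-in for Airflow's DagRunState.
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def state_counts(conn, dag_ids, permitted, run_after_gte=None):
    """Zero-filled per-Dag run state counts (hypothetical helper)."""
    # Deduplicate while preserving request order, then drop Dags the caller cannot read.
    requested = [d for d in dict.fromkeys(dag_ids) if d in permitted]
    # Every requested Dag gets every state, defaulting to 0.
    counts = {d: {s: 0 for s in RunState} for d in requested}
    if not requested:
        return counts
    placeholders = ",".join("?" * len(requested))
    sql = f"SELECT dag_id, state, COUNT(*) FROM dag_run WHERE dag_id IN ({placeholders})"
    params = list(requested)
    if run_after_gte is not None:
        # Optional time window on run_after, as in the endpoint.
        sql += " AND run_after >= ?"
        params.append(run_after_gte)
    sql += " GROUP BY dag_id, state"
    for dag_id, state, cnt in conn.execute(sql, params):
        if state is None:  # runs without a state are skipped, as in the endpoint
            continue
        counts[dag_id][RunState(state)] = cnt
    return counts


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, state TEXT, run_after TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("a", "success", "2024-01-01"),
        ("a", "success", "2024-01-02"),
        ("a", "failed", "2024-01-03"),
        ("b", None, "2024-01-01"),
        ("c", "running", "2024-01-01"),
    ],
)
# "a" is requested twice and "c" is not readable, so only a and b appear.
result = state_counts(conn, ["a", "b", "a", "c"], permitted={"a", "b"})
```

Zero-filling in Python keeps the response shape stable: a Dag with no runs (or only state-less runs) still returns every state with a count of 0, so the UI does not need to handle missing keys.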
Review Comment:
@pierrejeambrun
Here is a quick summary of what I found while digging into the scalability question. There are two separate slow spots, both pre-existing and outside the scope of this PR.
1. Home page (historical_metrics).
It filters the time window on start_date and end_date, which are not indexed on dag_run, and wrapping them in COALESCE makes the predicate non-sargable as well. As a result, when a time window is applied, the per-state LIMIT is never filled and the query falls back to a near-full scan. The plan is to move the window to run_after, which is already indexed. I will handle this in a separate PR.
2. Dags list page.
The slow part is the existing recent-runs query, which ranks every run of each Dag and then keeps only the top N, so its cost scales with the total number of runs (about 34s with 5M runs). The plan is to rewrite it as a per-Dag ORDER BY run_after DESC LIMIT N, so that it stops after N rows by using the run_after index. This will also go in a separate PR.
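Point 1 can be checked on a small scale with SQLite's EXPLAIN QUERY PLAN. This sketch assumes a simplified `dag_run` table; the index names and the `COALESCE(start_date, end_date)` expression are illustrative, and the exact predicate in historical_metrics may differ. The extra index on start_date is hypothetical and only there to show that even an index would not help once the column is wrapped in a function.

```python
import sqlite3


def plan(conn, sql, params=()):
    """Return the detail strings from SQLite's EXPLAIN QUERY PLAN."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return [row[-1] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT,"
    " run_after TEXT, start_date TEXT, end_date TEXT)"
)
# run_after is indexed (index name is hypothetical).
conn.execute("CREATE INDEX idx_run_after ON dag_run (run_after)")
# Hypothetical: an index on start_date, which COALESCE() will hide from the planner.
conn.execute("CREATE INDEX idx_start_date ON dag_run (start_date)")

cutoff = ("2024-01-01",)
# Window on a function of the columns: not sargable.
coalesce_plan = plan(
    conn,
    "SELECT COUNT(*) FROM dag_run WHERE COALESCE(start_date, end_date) >= ?",
    cutoff,
)
# Window on the bare indexed column: an index range search.
run_after_plan = plan(conn, "SELECT COUNT(*) FROM dag_run WHERE run_after >= ?", cutoff)

print(coalesce_plan)   # a SCAN of dag_run
print(run_after_plan)  # a SEARCH using idx_run_after
```

The same distinction applies on PostgreSQL and MySQL, although their planners report it in their own EXPLAIN formats.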
Once those land, I will re-measure the state-count change in this PR.
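For point 2, the sketch below contrasts the two query shapes on SQLite: ranking every run with ROW_NUMBER() and then filtering, versus one ORDER BY run_after DESC LIMIT N lookup per Dag. The composite `(dag_id, run_after)` index, the table layout and the per-Dag Python loop are assumptions for illustration; the follow-up PR may express the per-Dag limit differently in a single statement (for example with a LATERAL join on PostgreSQL).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (id INTEGER PRIMARY KEY, dag_id TEXT, run_after TEXT)")
# Hypothetical composite index; the real index layout on dag_run may differ.
conn.execute("CREATE INDEX idx_dag_run_after ON dag_run (dag_id, run_after)")
conn.executemany(
    "INSERT INTO dag_run (dag_id, run_after) VALUES (?, ?)",
    [(dag, f"2024-01-{day:02d}") for dag in ("a", "b") for day in range(1, 11)],
)

N = 3

# Existing shape: rank every run of every Dag, then keep the top N.
RANKED = """
SELECT dag_id, run_after FROM (
    SELECT dag_id, run_after,
           ROW_NUMBER() OVER (PARTITION BY dag_id ORDER BY run_after DESC) AS rn
    FROM dag_run WHERE dag_id IN ('a', 'b')
) WHERE rn <= ? ORDER BY dag_id, run_after DESC
"""

# Proposed shape: a bounded, index-ordered lookup per Dag.
PER_DAG = "SELECT dag_id, run_after FROM dag_run WHERE dag_id = ? ORDER BY run_after DESC LIMIT ?"


def recent_runs_ranked(n):
    return conn.execute(RANKED, (n,)).fetchall()


def recent_runs_per_dag(dag_ids, n):
    rows = []
    for dag_id in dag_ids:
        rows.extend(conn.execute(PER_DAG, (dag_id, n)).fetchall())
    return rows


# The per-Dag query reads the index backwards, with no separate sort step.
per_dag_plan = [row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + PER_DAG, ("a", N))]
```

Both shapes return the same rows; the difference is that the per-Dag form can stop after N index entries per Dag instead of ranking every run first.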
--
This is an automated message from the Apache Git Service.