This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 10589ca7ca9 Flatten grid structure endpoint memory consumption (#61273) (#61393)
10589ca7ca9 is described below
commit 10589ca7ca95a6dde5b5785002549aa489862b5c
Author: Rahul Vats <[email protected]>
AuthorDate: Tue Feb 3 19:56:13 2026 +0530
Flatten grid structure endpoint memory consumption (#61273) (#61393)
The grid structure endpoint was loading all serdags for the shown
dagruns into memory at once, before merging them together.
Now, we load 5 at a time and also expunge so they can be gc'd more
quickly.
(cherry picked from commit 40f6ec1c6021f242e80e010043273d2a4cbd4887)
---
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 32 +++++++++++++++-------
1 file changed, 22 insertions(+), 10 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 0379bd957ca..a95bb5c7f56 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -135,6 +135,8 @@ def get_dag_structure(
"""Return dag structure for grid view."""
latest_serdag = _get_latest_serdag(dag_id, session)
latest_dag = latest_serdag.dag
+ latest_serdag_id = latest_serdag.id
+ session.expunge(latest_serdag) # allow GC of serdag; only latest_dag is needed from here
# Retrieve, sort the previous DAG Runs
base_query = select(DagRun.id).where(DagRun.dag_id == dag_id)
@@ -159,12 +161,22 @@ def get_dag_structure(
nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
return [GridNodeResponse(**n) for n in nodes]
- serdags = session.scalars(
- select(SerializedDagModel).where(
+ # Process and merge the latest serdag first
+ merged_nodes: list[dict[str, Any]] = []
+ nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
+ _merge_node_dicts(merged_nodes, nodes)
+ del latest_dag
+
+ # Process serdags one by one and merge immediately to reduce memory usage.
+ # Use yield_per() for streaming results and expunge each serdag after processing
+ # to allow garbage collection and prevent memory buildup in the session identity map.
+ serdags_query = (
+ select(SerializedDagModel)
+ .where(
# Even though dag_id is filtered in base_query,
# adding this line here can improve the performance of this endpoint
SerializedDagModel.dag_id == dag_id,
- SerializedDagModel.id != latest_serdag.id,
+ SerializedDagModel.id != latest_serdag_id,
SerializedDagModel.dag_version_id.in_(
select(TaskInstance.dag_version_id)
.join(TaskInstance.dag_run)
@@ -174,16 +186,16 @@ def get_dag_structure(
.distinct()
),
)
+ .execution_options(yield_per=5) # balance between peak memory usage and round trips
)
- merged_nodes: list[dict[str, Any]] = []
- dags = [latest_dag]
- for serdag in serdags:
- if serdag:
- dags.append(serdag.dag)
- for dag in dags:
- nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
+
+ for serdag in session.scalars(serdags_query):
+ # Merge immediately instead of collecting all DAGs in memory
+ nodes = [task_group_to_dict_grid(x) for x in task_group_sort(serdag.dag.task_group)]
_merge_node_dicts(merged_nodes, nodes)
+ session.expunge(serdag) # to allow garbage collection
+
# Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
ids: set[str] = set()