This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 40f6ec1c602 Flatten grid structure endpoint memory consumption (#61273)
40f6ec1c602 is described below

commit 40f6ec1c6021f242e80e010043273d2a4cbd4887
Author: Jed Cunningham <[email protected]>
AuthorDate: Sun Feb 1 07:18:46 2026 -0700

    Flatten grid structure endpoint memory consumption (#61273)
    
    The grid structure endpoint was loading the serialized DAGs (serdags)
    for all shown dag runs into memory at once before merging them
    together.
    
    Now we load 5 at a time and expunge each one from the session so it
    can be garbage collected more quickly.
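    
    In outline, this is a standard SQLAlchemy streaming pattern. A minimal
    sketch, assuming a plain SQLAlchemy 2.0 Session (SerDag is a
    hypothetical stand-in for SerializedDagModel, and the append is a
    stand-in for the real merge step):
    
        from sqlalchemy import select
        from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session
    
        class Base(DeclarativeBase):
            pass
    
        class SerDag(Base):  # hypothetical stand-in for SerializedDagModel
            __tablename__ = "serialized_dag"
            id: Mapped[int] = mapped_column(primary_key=True)
            dag_id: Mapped[str]
            data: Mapped[str]
    
        def stream_and_merge(session: Session, dag_id: str) -> list[str]:
            merged: list[str] = []
            query = (
                select(SerDag)
                .where(SerDag.dag_id == dag_id)
                # Stream rows in batches of 5 instead of loading them all at once.
                .execution_options(yield_per=5)
            )
            for serdag in session.scalars(query):
                merged.append(serdag.data)  # stand-in for the real merge step
                # Drop the row from the session so it can be garbage
                # collected as soon as this iteration moves on.
                session.expunge(serdag)
            return merged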
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 50 ++++++++++++++--------
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index f52c3e7f75c..0f061933b0d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -143,6 +143,8 @@ def get_dag_structure(
     """Return dag structure for grid view."""
     latest_serdag = _get_latest_serdag(dag_id, session)
     latest_dag = latest_serdag.dag
+    latest_serdag_id = latest_serdag.id
+    session.expunge(latest_serdag)  # allow GC of serdag; only latest_dag is needed from here
 
     # Apply filtering if root task is specified
     if root:
@@ -176,12 +178,22 @@ def get_dag_structure(
         nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
         return [GridNodeResponse(**n) for n in nodes]
 
-    serdags = session.scalars(
-        select(SerializedDagModel).where(
+    # Process and merge the latest serdag first
+    merged_nodes: list[dict[str, Any]] = []
+    nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
+    _merge_node_dicts(merged_nodes, nodes)
+    del latest_dag
+
+    # Process serdags one by one and merge immediately to reduce memory usage.
+    # Use yield_per() for streaming results and expunge each serdag after processing
+    # to allow garbage collection and prevent memory buildup in the session identity map.
+    serdags_query = (
+        select(SerializedDagModel)
+        .where(
             # Even though dag_id is filtered in base_query,
             # adding this line here can improve the performance of this endpoint
             SerializedDagModel.dag_id == dag_id,
-            SerializedDagModel.id != latest_serdag.id,
+            SerializedDagModel.id != latest_serdag_id,
             SerializedDagModel.dag_version_id.in_(
                 select(TaskInstance.dag_version_id)
                 .join(TaskInstance.dag_run)
@@ -191,25 +203,25 @@ def get_dag_structure(
                 .distinct()
             ),
         )
+        .execution_options(yield_per=5)  # balance between peak memory usage and round trips
     )
-    merged_nodes: list[dict[str, Any]] = []
-    dags = [latest_dag]
-    for serdag in serdags:
-        if serdag:
-            filtered_dag = serdag.dag
-            # Apply the same filtering to historical DAG versions
-            if root:
-                filtered_dag = filtered_dag.partial_subset(
-                    task_ids=root,
-                    include_upstream=include_upstream,
-                    include_downstream=include_downstream,
-                    depth=depth,
-                )
-            dags.append(filtered_dag)
-    for dag in dags:
-        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
+
+    for serdag in session.scalars(serdags_query):
+        filtered_dag = serdag.dag
+        # Apply the same filtering to historical DAG versions
+        if root:
+            filtered_dag = filtered_dag.partial_subset(
+                task_ids=root,
+                include_upstream=include_upstream,
+                include_downstream=include_downstream,
+                depth=depth,
+            )
+        # Merge immediately instead of collecting all DAGs in memory
+        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(filtered_dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
 
+        session.expunge(serdag)  # to allow garbage collection
+
     return [GridNodeResponse(**n) for n in merged_nodes]
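
As a side note on the expunge calls above, a short sketch of what expunge
does to an instance's state (reusing the hypothetical SerDag stand-in from
the sketch in the commit message; inspect is sqlalchemy.inspect):

    from sqlalchemy import inspect

    serdag = session.get(SerDag, 1)     # loaded and tracked by the session
    assert inspect(serdag).persistent   # present in the session's identity map
    session.expunge(serdag)             # detach it from the session entirely
    assert inspect(serdag).detached     # only local references keep it alive now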
 
 
