This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch backport-61273
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e6661ff680a24d1fa11519be264a95edf3832755
Author: Jed Cunningham <[email protected]>
AuthorDate: Sun Feb 1 07:18:46 2026 -0700

    Flatten grid structure endpoint memory consumption (#61273)

    The grid structure endpoint was loading all serdags for the shown
    dagruns into memory at once, before merging them together. Now, we
    load 5 at a time and also expunge so they can be gc'd more quickly.
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 60 ++++++++--------------
 1 file changed, 21 insertions(+), 39 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 0379bd957ca..f0d077b277e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -135,6 +135,8 @@ def get_dag_structure(
     """Return dag structure for grid view."""
     latest_serdag = _get_latest_serdag(dag_id, session)
     latest_dag = latest_serdag.dag
+    latest_serdag_id = latest_serdag.id
+    session.expunge(latest_serdag)  # allow GC of serdag; only latest_dag is needed from here
 
     # Retrieve, sort the previous DAG Runs
     base_query = select(DagRun.id).where(DagRun.dag_id == dag_id)
@@ -159,12 +161,22 @@
         nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
         return [GridNodeResponse(**n) for n in nodes]
-    serdags = session.scalars(
-        select(SerializedDagModel).where(
+    # Process and merge the latest serdag first
+    merged_nodes: list[dict[str, Any]] = []
+    nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)]
+    _merge_node_dicts(merged_nodes, nodes)
+    del latest_dag
+
+    # Process serdags one by one and merge immediately to reduce memory usage.
+    # Use yield_per() for streaming results and expunge each serdag after processing
+    # to allow garbage collection and prevent memory buildup in the session identity map.
+    serdags_query = (
+        select(SerializedDagModel)
+        .where(
             # Even though dag_id is filtered in base_query,
             # adding this line here can improve the performance of this endpoint
             SerializedDagModel.dag_id == dag_id,
-            SerializedDagModel.id != latest_serdag.id,
+            SerializedDagModel.id != latest_serdag_id,
             SerializedDagModel.dag_version_id.in_(
                 select(TaskInstance.dag_version_id)
                 .join(TaskInstance.dag_run)
                 .where(
@@ -174,45 +186,15 @@
                 .distinct()
             ),
         )
+        .execution_options(yield_per=5)  # balance between peak memory usage and round trips
     )
-
-    merged_nodes: list[dict[str, Any]] = []
-    dags = [latest_dag]
-    for serdag in serdags:
-        if serdag:
-            dags.append(serdag.dag)
-    for dag in dags:
-        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
+
+    for serdag in session.scalars(serdags_query):
+        # Merge immediately instead of collecting all DAGs in memory
+        nodes = [task_group_to_dict_grid(x) for x in task_group_sort(serdag.dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
-
-    # Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
-    def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
-        ids: set[str] = set()
-        for n in nodes:
-            nid = n.get("id")
-            if nid:
-                ids.add(nid)
-            children = n.get("children")
-            if children:
-                ids |= _collect_ids(children)  # recurse
-        return ids
-
-    existing_ids = _collect_ids(merged_nodes)
-    historical_task_ids = session.scalars(
-        select(TaskInstance.task_id)
-        .join(TaskInstance.dag_run)
-        .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
-        .distinct()
-    )
-    for task_id in historical_task_ids:
-        if task_id not in existing_ids:
-            merged_nodes.append(
-                {
-                    "id": task_id,
-                    "label": task_id,
-                    "is_mapped": None,
-                    "children": None,
-                }
-            )
+        session.expunge(serdag)  # to allow garbage collection
 
     return [GridNodeResponse(**n) for n in merged_nodes]
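
The patch combines two standard SQLAlchemy features: the yield_per execution
option streams ORM rows from the cursor in fixed-size batches instead of
buffering the entire result set, and Session.expunge() evicts an instance from
the session's identity map so the Python object can be garbage-collected once
the loop drops its reference. Below is a minimal, self-contained sketch of the
same pattern; the Snapshot model and its columns are hypothetical stand-ins
for illustration and are not part of the Airflow code above.

    from sqlalchemy import create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


    class Base(DeclarativeBase):
        pass


    class Snapshot(Base):  # hypothetical stand-in for a large serialized row
        __tablename__ = "snapshot"
        id: Mapped[int] = mapped_column(primary_key=True)
        payload: Mapped[str] = mapped_column(default="")


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add_all([Snapshot(payload=f"row {i}") for i in range(12)])
        session.commit()

        # yield_per=5 fetches rows in batches of five, bounding peak memory.
        stmt = select(Snapshot).execution_options(yield_per=5)
        merged: list[str] = []
        for snap in session.scalars(stmt):
            merged.append(snap.payload)  # process and merge immediately
            # Drop the instance from the identity map so it can be
            # garbage-collected as soon as this iteration ends.
            session.expunge(snap)

A larger yield_per value means fewer database round trips at the cost of
higher peak memory; the commit above settles on batches of five for this
endpoint.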
