This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch backport-pr-60937 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0d0c6018b513450b7e2e6f84d3f34e5933c2782d Author: Rahul Vats <[email protected]> AuthorDate: Fri Jan 23 09:54:29 2026 +0530 Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937) Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937) (cherry picked from commit 235595b6c299ccd705ff965704bb20ece9c49fd1) --- airflow-core/src/airflow/models/serialized_dag.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index 7137658825c..9a425585cff 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Any, Literal import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_, update +from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, exists, select, tuple_, update from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType @@ -43,6 +43,7 @@ from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion from airflow.models.dagcode import DagCode from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance from airflow.sdk.definitions.asset import AssetUniqueKey from airflow.serialization.dag_dependency import DagDependency from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG @@ -423,7 +424,13 @@ class SerializedDagModel(Base): log.debug("Serialized DAG (%s) is unchanged. 
Skipping writing to DB", dag.dag_id) return False - if dag_version and not dag_version.task_instances: + has_task_instances: bool = False + if dag_version: + has_task_instances = bool( + session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id))) + ) + + if dag_version and not has_task_instances: # This is for dynamic DAGs whose hashes change often. We should update # the serialized dag, the dag_version and the dag_code instead of creating a new version # if the dag_version is not associated with any task instances
