This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch backport-pr-60937
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0d0c6018b513450b7e2e6f84d3f34e5933c2782d
Author: Rahul Vats <[email protected]>
AuthorDate: Fri Jan 23 09:54:29 2026 +0530

    Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937)
    
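    Fix DAG processor OOM: write_dag previously tested `dag_version.task_instances`
    for emptiness, which loaded all TaskInstances of the DagVersion. It now checks
    DagVersion usage with an EXISTS query on TaskInstance.dag_version_id, so no
    TaskInstance rows are loaded (#60937)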
    
    (cherry picked from commit 235595b6c299ccd705ff965704bb20ece9c49fd1)
---
 airflow-core/src/airflow/models/serialized_dag.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py 
b/airflow-core/src/airflow/models/serialized_dag.py
index 7137658825c..9a425585cff 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Any, Literal
 
 import sqlalchemy_jsonfield
 import uuid6
-from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, 
tuple_, update
+from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, exists, 
select, tuple_, update
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.sql.expression import func, literal
 from sqlalchemy_utils import UUIDType
@@ -43,6 +43,7 @@ from airflow.models.dag import DagModel
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
 from airflow.sdk.definitions.asset import AssetUniqueKey
 from airflow.serialization.dag_dependency import DagDependency
 from airflow.serialization.serialized_objects import LazyDeserializedDAG, 
SerializedDAG
@@ -423,7 +424,13 @@ class SerializedDagModel(Base):
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to 
DB", dag.dag_id)
             return False
 
-        if dag_version and not dag_version.task_instances:
+        has_task_instances: bool = False
+        if dag_version:
+            has_task_instances = bool(
+                
session.scalar(select(exists().where(TaskInstance.dag_version_id == 
dag_version.id)))
+            )
+
+        if dag_version and not has_task_instances:
             # This is for dynamic DAGs whose hashes change often. We should update
             # the serialized dag, the dag_version and the dag_code instead of creating
             # a new version if the dag_version is not associated with any task instances

Reply via email to