This is an automated email from the ASF dual-hosted git repository.

turaga pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 95877c9c2e6 Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937) (#60962)
95877c9c2e6 is described below

commit 95877c9c2e60d23877920b02ed14f4f8505d680b
Author: Rahul Vats <[email protected]>
AuthorDate: Fri Jan 23 10:23:59 2026 +0530

    Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937) (#60962)
---
 airflow-core/src/airflow/models/serialized_dag.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index 7137658825c..9a425585cff 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Any, Literal
 
 import sqlalchemy_jsonfield
 import uuid6
-from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_, update
+from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, exists, select, tuple_, update
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.sql.expression import func, literal
 from sqlalchemy_utils import UUIDType
@@ -43,6 +43,7 @@ from airflow.models.dag import DagModel
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
 from airflow.sdk.definitions.asset import AssetUniqueKey
 from airflow.serialization.dag_dependency import DagDependency
 from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
@@ -423,7 +424,13 @@ class SerializedDagModel(Base):
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
             return False
 
-        if dag_version and not dag_version.task_instances:
+        has_task_instances: bool = False
+        if dag_version:
+            has_task_instances = bool(
+                session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id)))
+            )
+
+        if dag_version and not has_task_instances:
             # This is for dynamic DAGs that the hashes changes often. We should update
             # the serialized dag, the dag_version and the dag_code instead of a new version
             # if the dag_version is not associated with any task instances

Reply via email to