This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 235595b6c29 Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937)
235595b6c29 is described below

commit 235595b6c299ccd705ff965704bb20ece9c49fd1
Author: Rahul Vats <[email protected]>
AuthorDate: Fri Jan 23 09:54:29 2026 +0530

    Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937)
    
    Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937)
---
 airflow-core/src/airflow/models/serialized_dag.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index a019ed6377f..bd499a1a179 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,9 +27,9 @@ from typing import TYPE_CHECKING, Any, Literal
 
 import sqlalchemy_jsonfield
 import uuid6
-from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update
+from sqlalchemy import ForeignKey, LargeBinary, String, exists, select, 
tuple_, update
 from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
+from sqlalchemy.orm import Mapped, backref, foreign, relationship
 from sqlalchemy.sql.expression import func, literal
 from sqlalchemy_utils import UUIDType
 
@@ -45,6 +45,7 @@ from airflow.models.dag_version import DagVersion
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun
 from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
+from airflow.models.taskinstance import TaskInstance
 from airflow.serialization.dag_dependency import DagDependency
 from airflow.serialization.definitions.assets import SerializedAssetUniqueKey 
as UKey
 from airflow.serialization.definitions.deadline import DeadlineAlertFields
@@ -486,7 +487,6 @@ class SerializedDagModel(Base):
         dag_version = session.scalar(
             select(DagVersion)
             .where(DagVersion.dag_id == dag.dag_id)
-            .options(joinedload(DagVersion.task_instances))
             .order_by(DagVersion.created_at.desc())
             .limit(1)
         )
@@ -520,7 +520,13 @@ class SerializedDagModel(Base):
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to 
DB", dag.dag_id)
             return False
 
-        if dag_version and not dag_version.task_instances:
+        has_task_instances: bool = False
+        if dag_version:
+            has_task_instances = bool(
+                
session.scalar(select(exists().where(TaskInstance.dag_version_id == 
dag_version.id)))
+            )
+
+        if dag_version and not has_task_instances:
             # This is for dynamic DAGs that the hashes changes often. We should update
             # the serialized dag, the dag_version and the dag_code instead of a new version
             # if the dag_version is not associated with any task instances

Reply via email to