This is an automated email from the ASF dual-hosted git repository.
turaga pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 95877c9c2e6 Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937) (#60962)
95877c9c2e6 is described below
commit 95877c9c2e60d23877920b02ed14f4f8505d680b
Author: Rahul Vats <[email protected]>
AuthorDate: Fri Jan 23 10:23:59 2026 +0530
Fix DAG processor OOM || Avoid loading all TaskInstances when checking DagVersion in write_dag (#60937) (#60962)
---
airflow-core/src/airflow/models/serialized_dag.py | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index 7137658825c..9a425585cff 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Any, Literal
import sqlalchemy_jsonfield
import uuid6
-from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_, update
+from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, exists, select, tuple_, update
from sqlalchemy.orm import backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType
@@ -43,6 +43,7 @@ from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
from airflow.sdk.definitions.asset import AssetUniqueKey
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
@@ -423,7 +424,13 @@ class SerializedDagModel(Base):
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False
- if dag_version and not dag_version.task_instances:
+ has_task_instances: bool = False
+ if dag_version:
+ has_task_instances = bool(
+session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id)))
+ )
+
+ if dag_version and not has_task_instances:
# This is for dynamic DAGs that the hashes changes often. We should update
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances