This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 235595b6c29 Fix DAG processor OOM || Avoid loading all TaskInstances
when checking DagVersion in write_dag (#60937)
235595b6c29 is described below
commit 235595b6c299ccd705ff965704bb20ece9c49fd1
Author: Rahul Vats <[email protected]>
AuthorDate: Fri Jan 23 09:54:29 2026 +0530
Fix DAG processor OOM || Avoid loading all TaskInstances when checking
DagVersion in write_dag (#60937)
Fix DAG processor OOM || Avoid loading all TaskInstances when checking
DagVersion in write_dag (#60937)
---
airflow-core/src/airflow/models/serialized_dag.py | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index a019ed6377f..bd499a1a179 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,9 +27,9 @@ from typing import TYPE_CHECKING, Any, Literal
import sqlalchemy_jsonfield
import uuid6
-from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update
+from sqlalchemy import ForeignKey, LargeBinary, String, exists, select, tuple_, update
from sqlalchemy.dialects.postgresql import JSONB
-from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
+from sqlalchemy.orm import Mapped, backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType
@@ -45,6 +45,7 @@ from airflow.models.dag_version import DagVersion
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
+from airflow.models.taskinstance import TaskInstance
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey as UKey
from airflow.serialization.definitions.deadline import DeadlineAlertFields
@@ -486,7 +487,6 @@ class SerializedDagModel(Base):
dag_version = session.scalar(
select(DagVersion)
.where(DagVersion.dag_id == dag.dag_id)
- .options(joinedload(DagVersion.task_instances))
.order_by(DagVersion.created_at.desc())
.limit(1)
)
@@ -520,7 +520,13 @@ class SerializedDagModel(Base):
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False
- if dag_version and not dag_version.task_instances:
+ has_task_instances: bool = False
+ if dag_version:
+ has_task_instances = bool(
+ session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id)))
+ )
+
+ if dag_version and not has_task_instances:
# This is for dynamic DAGs that the hashes changes often. We should update
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances