This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 27c9b9406d6 Optimize dynamic DAG updates to avoid loading large
serialized DAGs (#57592)
27c9b9406d6 is described below
commit 27c9b9406d61aa4fad107da155c95ac39568e583
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Oct 31 11:00:53 2025 +0000
Optimize dynamic DAG updates to avoid loading large serialized DAGs (#57592)
When updating dynamic DAGs (those without task instances), Airflow was
loading the entire `SerializedDagModel` object from the database, which
could contain megabytes of JSON data, just to update a few fields. Loading
the full object for this was completely unnecessary.
This change replaces the object-loading approach with a direct SQL UPDATE
statement, significantly improving performance for deployments with large
or frequently-changing dynamic DAGs.
The optimization uses SQLAlchemy's update() construct to modify only the
necessary columns (_data, _data_compressed, dag_hash) without fetching
the existing row, reducing both database load and network transfer.
Additionally, removed the session.merge() call on the loaded serialized DAG
object, which is no longer needed because the row is now updated directly.
---
airflow-core/src/airflow/models/serialized_dag.py | 25 ++++---
.../tests/unit/models/test_serialized_dag.py | 84 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index 1e75e86aad8..e4b458084a3 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Any, Literal
import sqlalchemy_jsonfield
import uuid6
-from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_
+from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
from sqlalchemy.sql.expression import func, literal
@@ -434,14 +434,23 @@ class SerializedDagModel(Base):
# This is for dynamic DAGs that the hashes changes often. We
should update
# the serialized dag, the dag_version and the dag_code instead of
a new version
# if the dag_version is not associated with any task instances
- latest_ser_dag = cls.get(dag.dag_id, session=session)
- if not latest_ser_dag:
+
+ # Use direct UPDATE to avoid loading the full serialized DAG
+ result = session.execute(
+ update(cls)
+ .where(cls.dag_version_id == dag_version.id)
+ .values(
+ {
+ cls._data: new_serialized_dag._data,
+ cls._data_compressed:
new_serialized_dag._data_compressed,
+ cls.dag_hash: new_serialized_dag.dag_hash,
+ }
+ )
+ )
+
+ if result.rowcount == 0:
+ # No rows updated - serialized DAG doesn't exist
return False
- # Update the serialized DAG with the new_serialized_dag
- latest_ser_dag._data = new_serialized_dag._data
- latest_ser_dag._data_compressed =
new_serialized_dag._data_compressed
- latest_ser_dag.dag_hash = new_serialized_dag.dag_hash
- session.merge(latest_ser_dag)
# The dag_version and dag_code may not have changed, still we
should
# do the below actions:
# Update the latest dag version
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index e310836a391..ea9ff4cbfdd 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -544,3 +544,87 @@ class TestSerializedDagModel:
# Verify that the original data still has fileloc (method shouldn't
modify original)
assert "fileloc" in test_data["dag"]
assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
+
+ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
+ """
+ Test that dynamic DAG update gracefully handles case where
SerializedDagModel doesn't exist.
+ This preserves the null-check fix from PR #56422 and tests the direct
UPDATE path.
+ """
+ with dag_maker(dag_id="test_missing_serdag", serialized=True,
session=session) as dag:
+ EmptyOperator(task_id="task1")
+
+ # Write the DAG first
+ lazy_dag = LazyDeserializedDAG.from_dag(dag)
+ SDM.write_dag(
+ dag=lazy_dag,
+ bundle_name="test_bundle",
+ bundle_version=None,
+ session=session,
+ )
+ session.commit()
+
+ # Get the dag_version
+ dag_version = session.scalar(
+ select(DagVersion).where(DagVersion.dag_id ==
"test_missing_serdag").limit(1)
+ )
+ assert dag_version is not None
+
+ # Manually delete SerializedDagModel (simulates edge case)
+ session.query(SDM).filter(SDM.dag_id == "test_missing_serdag").delete()
+ session.commit()
+
+ # Verify no SerializedDagModel exists
+ assert SDM.get("test_missing_serdag", session=session) is None
+
+ # Try to update - should return False gracefully (not crash)
+ result = SDM.write_dag(
+ dag=lazy_dag,
+ bundle_name="test_bundle",
+ bundle_version=None,
+ min_update_interval=None,
+ session=session,
+ )
+
+ assert result is False # Should return False when SerializedDagModel
is missing
+
+ def test_dynamic_dag_update_success(self, dag_maker, session):
+ """
+ Test that dynamic DAG update successfully updates the serialized DAG
hash
+ when no task instances exist.
+ """
+ with dag_maker(dag_id="test_dynamic_success", session=session) as dag:
+ EmptyOperator(task_id="task1")
+
+ # Write the DAG first
+ lazy_dag = LazyDeserializedDAG.from_dag(dag)
+ result1 = SDM.write_dag(
+ dag=lazy_dag,
+ bundle_name="test_bundle",
+ bundle_version=None,
+ session=session,
+ )
+ session.commit()
+
+ assert result1 is True
+ initial_sdag = SDM.get("test_dynamic_success", session=session)
+ assert initial_sdag is not None
+ initial_hash = initial_sdag.dag_hash
+
+ # Modify the DAG (add a task)
+ EmptyOperator(task_id="task2", dag=dag)
+ lazy_dag_updated = LazyDeserializedDAG.from_dag(dag)
+
+ # Write again - should use UPDATE path (no task instances yet)
+ result2 = SDM.write_dag(
+ dag=lazy_dag_updated,
+ bundle_name="test_bundle",
+ bundle_version=None,
+ session=session,
+ )
+ session.commit()
+
+ # Verify update succeeded
+ assert result2 is True
+ updated_sdag = SDM.get("test_dynamic_success", session=session)
+ assert updated_sdag.dag_hash != initial_hash # Hash should change
+ assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now