This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1336cbc54cae73f79df54aaef24e17bc93c3be9f
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Oct 31 11:00:53 2025 +0000

    Optimize dynamic DAG updates to avoid loading large serialized DAGs (#57592)
    
    When updating dynamic DAGs (those without task instances), Airflow was
    loading the entire `SerializedDagModel` object from the database, which
    could contain megabytes of JSON data, just to update a few fields. This
    was completely unnecessary.
    
    This change replaces the object-loading approach with a direct SQL UPDATE
    statement, significantly improving performance for deployments with large
    or frequently-changing dynamic DAGs.
    
    The optimization uses SQLAlchemy's update() construct to modify only the
    necessary columns (_data, _data_compressed, dag_hash) without fetching
    the existing row, reducing both database load and network transfer.
    
    Additionally, this change removes an unnecessary `session.merge()` call on
    `dag_version`, since the object is already tracked by the session after being loaded.
    
    (cherry picked from commit 27c9b9406d61aa4fad107da155c95ac39568e583)
---
 airflow-core/src/airflow/models/serialized_dag.py  | 27 ++++---
 .../tests/unit/models/test_serialized_dag.py       | 84 ++++++++++++++++++++++
 2 files changed, 102 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py 
b/airflow-core/src/airflow/models/serialized_dag.py
index f0e9bd9c6da..7137658825c 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Any, Literal
 
 import sqlalchemy_jsonfield
 import uuid6
-from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, 
tuple_
+from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, 
tuple_, update
 from sqlalchemy.orm import backref, foreign, relationship
 from sqlalchemy.sql.expression import func, literal
 from sqlalchemy_utils import UUIDType
@@ -427,14 +427,23 @@ class SerializedDagModel(Base):
             # This is for dynamic DAGs that the hashes changes often. We 
should update
             # the serialized dag, the dag_version and the dag_code instead of 
a new version
             # if the dag_version is not associated with any task instances
-            latest_ser_dag = cls.get(dag.dag_id, session=session)
-            if TYPE_CHECKING:
-                assert latest_ser_dag is not None
-            # Update the serialized DAG with the new_serialized_dag
-            latest_ser_dag._data = new_serialized_dag._data
-            latest_ser_dag._data_compressed = 
new_serialized_dag._data_compressed
-            latest_ser_dag.dag_hash = new_serialized_dag.dag_hash
-            session.merge(latest_ser_dag)
+
+            # Use direct UPDATE to avoid loading the full serialized DAG
+            result = session.execute(
+                update(cls)
+                .where(cls.dag_version_id == dag_version.id)
+                .values(
+                    {
+                        cls._data: new_serialized_dag._data,
+                        cls._data_compressed: 
new_serialized_dag._data_compressed,
+                        cls.dag_hash: new_serialized_dag.dag_hash,
+                    }
+                )
+            )
+
+            if result.rowcount == 0:
+                # No rows updated - serialized DAG doesn't exist
+                return False
             # The dag_version and dag_code may not have changed, still we 
should
             # do the below actions:
             # Update the latest dag version
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py 
b/airflow-core/tests/unit/models/test_serialized_dag.py
index ba207cc39ec..ca1c20b5d8d 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -544,3 +544,87 @@ class TestSerializedDagModel:
         # Verify that the original data still has fileloc (method shouldn't 
modify original)
         assert "fileloc" in test_data["dag"]
         assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
+
+    def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
+        """
+        Test that dynamic DAG update gracefully handles case where 
SerializedDagModel doesn't exist.
+        This preserves the null-check fix from PR #56422 and tests the direct 
UPDATE path.
+        """
+        with dag_maker(dag_id="test_missing_serdag", serialized=True, 
session=session) as dag:
+            EmptyOperator(task_id="task1")
+
+        # Write the DAG first
+        lazy_dag = LazyDeserializedDAG.from_dag(dag)
+        SDM.write_dag(
+            dag=lazy_dag,
+            bundle_name="test_bundle",
+            bundle_version=None,
+            session=session,
+        )
+        session.commit()
+
+        # Get the dag_version
+        dag_version = session.scalar(
+            select(DagVersion).where(DagVersion.dag_id == 
"test_missing_serdag").limit(1)
+        )
+        assert dag_version is not None
+
+        # Manually delete SerializedDagModel (simulates edge case)
+        session.query(SDM).filter(SDM.dag_id == "test_missing_serdag").delete()
+        session.commit()
+
+        # Verify no SerializedDagModel exists
+        assert SDM.get("test_missing_serdag", session=session) is None
+
+        # Try to update - should return False gracefully (not crash)
+        result = SDM.write_dag(
+            dag=lazy_dag,
+            bundle_name="test_bundle",
+            bundle_version=None,
+            min_update_interval=None,
+            session=session,
+        )
+
+        assert result is False  # Should return False when SerializedDagModel 
is missing
+
+    def test_dynamic_dag_update_success(self, dag_maker, session):
+        """
+        Test that dynamic DAG update successfully updates the serialized DAG 
hash
+        when no task instances exist.
+        """
+        with dag_maker(dag_id="test_dynamic_success", session=session) as dag:
+            EmptyOperator(task_id="task1")
+
+        # Write the DAG first
+        lazy_dag = LazyDeserializedDAG.from_dag(dag)
+        result1 = SDM.write_dag(
+            dag=lazy_dag,
+            bundle_name="test_bundle",
+            bundle_version=None,
+            session=session,
+        )
+        session.commit()
+
+        assert result1 is True
+        initial_sdag = SDM.get("test_dynamic_success", session=session)
+        assert initial_sdag is not None
+        initial_hash = initial_sdag.dag_hash
+
+        # Modify the DAG (add a task)
+        EmptyOperator(task_id="task2", dag=dag)
+        lazy_dag_updated = LazyDeserializedDAG.from_dag(dag)
+
+        # Write again - should use UPDATE path (no task instances yet)
+        result2 = SDM.write_dag(
+            dag=lazy_dag_updated,
+            bundle_name="test_bundle",
+            bundle_version=None,
+            session=session,
+        )
+        session.commit()
+
+        # Verify update succeeded
+        assert result2 is True
+        updated_sdag = SDM.get("test_dynamic_success", session=session)
+        assert updated_sdag.dag_hash != initial_hash  # Hash should change
+        assert len(updated_sdag.dag.task_dict) == 2  # Should have 2 tasks now

Reply via email to