This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1336cbc54cae73f79df54aaef24e17bc93c3be9f Author: Kaxil Naik <[email protected]> AuthorDate: Fri Oct 31 11:00:53 2025 +0000 Optimize dynamic DAG updates to avoid loading large serialized DAGs (#57592) When updating dynamic DAGs (those without task instances), Airflow was loading the entire `SerializedDagModel` object from the database, which could contain megabytes of JSON data, just to update a few fields — which was completely unnecessary. This change replaces the object-loading approach with a direct SQL UPDATE statement, significantly improving performance for deployments with large or frequently-changing dynamic DAGs. The optimization uses SQLAlchemy's update() construct to modify only the necessary columns (_data, _data_compressed, dag_hash) without fetching the existing row, reducing both database load and network transfer. Additionally, removed an unnecessary session.merge() call on dag_version, as the object is already tracked by the session after being loaded. (cherry picked from commit 27c9b9406d61aa4fad107da155c95ac39568e583) --- airflow-core/src/airflow/models/serialized_dag.py | 27 ++++--- .../tests/unit/models/test_serialized_dag.py | 84 ++++++++++++++++++++++ 2 files changed, 102 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index f0e9bd9c6da..7137658825c 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Any, Literal import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_ +from sqlalchemy import Column, ForeignKey, LargeBinary, String, exc, select, tuple_, update from sqlalchemy.orm import backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType @@ -427,14 +427,23 @@ class SerializedDagModel(Base): 
# This is for dynamic DAGs that the hashes changes often. We should update # the serialized dag, the dag_version and the dag_code instead of a new version # if the dag_version is not associated with any task instances - latest_ser_dag = cls.get(dag.dag_id, session=session) - if TYPE_CHECKING: - assert latest_ser_dag is not None - # Update the serialized DAG with the new_serialized_dag - latest_ser_dag._data = new_serialized_dag._data - latest_ser_dag._data_compressed = new_serialized_dag._data_compressed - latest_ser_dag.dag_hash = new_serialized_dag.dag_hash - session.merge(latest_ser_dag) + + # Use direct UPDATE to avoid loading the full serialized DAG + result = session.execute( + update(cls) + .where(cls.dag_version_id == dag_version.id) + .values( + { + cls._data: new_serialized_dag._data, + cls._data_compressed: new_serialized_dag._data_compressed, + cls.dag_hash: new_serialized_dag.dag_hash, + } + ) + ) + + if result.rowcount == 0: + # No rows updated - serialized DAG doesn't exist + return False # The dag_version and dag_code may not have changed, still we should # do the below actions: # Update the latest dag version diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index ba207cc39ec..ca1c20b5d8d 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -544,3 +544,87 @@ class TestSerializedDagModel: # Verify that the original data still has fileloc (method shouldn't modify original) assert "fileloc" in test_data["dag"] assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py" + + def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): + """ + Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist. + This preserves the null-check fix from PR #56422 and tests the direct UPDATE path. 
+ """ + with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag: + EmptyOperator(task_id="task1") + + # Write the DAG first + lazy_dag = LazyDeserializedDAG.from_dag(dag) + SDM.write_dag( + dag=lazy_dag, + bundle_name="test_bundle", + bundle_version=None, + session=session, + ) + session.commit() + + # Get the dag_version + dag_version = session.scalar( + select(DagVersion).where(DagVersion.dag_id == "test_missing_serdag").limit(1) + ) + assert dag_version is not None + + # Manually delete SerializedDagModel (simulates edge case) + session.query(SDM).filter(SDM.dag_id == "test_missing_serdag").delete() + session.commit() + + # Verify no SerializedDagModel exists + assert SDM.get("test_missing_serdag", session=session) is None + + # Try to update - should return False gracefully (not crash) + result = SDM.write_dag( + dag=lazy_dag, + bundle_name="test_bundle", + bundle_version=None, + min_update_interval=None, + session=session, + ) + + assert result is False # Should return False when SerializedDagModel is missing + + def test_dynamic_dag_update_success(self, dag_maker, session): + """ + Test that dynamic DAG update successfully updates the serialized DAG hash + when no task instances exist. 
+ """ + with dag_maker(dag_id="test_dynamic_success", session=session) as dag: + EmptyOperator(task_id="task1") + + # Write the DAG first + lazy_dag = LazyDeserializedDAG.from_dag(dag) + result1 = SDM.write_dag( + dag=lazy_dag, + bundle_name="test_bundle", + bundle_version=None, + session=session, + ) + session.commit() + + assert result1 is True + initial_sdag = SDM.get("test_dynamic_success", session=session) + assert initial_sdag is not None + initial_hash = initial_sdag.dag_hash + + # Modify the DAG (add a task) + EmptyOperator(task_id="task2", dag=dag) + lazy_dag_updated = LazyDeserializedDAG.from_dag(dag) + + # Write again - should use UPDATE path (no task instances yet) + result2 = SDM.write_dag( + dag=lazy_dag_updated, + bundle_name="test_bundle", + bundle_version=None, + session=session, + ) + session.commit() + + # Verify update succeeded + assert result2 is True + updated_sdag = SDM.get("test_dynamic_success", session=session) + assert updated_sdag.dag_hash != initial_hash # Hash should change + assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now
