This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5398923f42a Fix atomicity issue in SerializedDagModel.write_dag preventing orphaned DagVersions (#58259)
5398923f42a is described below
commit 5398923f42ac75b8575ef7e8652a6693d7f33531
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Nov 13 20:29:15 2025 +0100
Fix atomicity issue in SerializedDagModel.write_dag preventing orphaned DagVersions (#58259)
---
airflow-core/src/airflow/models/dag_version.py | 1 -
airflow-core/tests/unit/jobs/test_scheduler_job.py | 2 +-
.../tests/unit/models/test_serialized_dag.py | 54 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/dag_version.py
b/airflow-core/src/airflow/models/dag_version.py
index b6e2811a8a9..d8ea6fb9b6d 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -137,7 +137,6 @@ class DagVersion(Base):
)
log.debug("Writing DagVersion %s to the DB", dag_version)
session.add(dag_version)
- session.commit()
log.debug("DagVersion %s written to the DB", dag_version)
return dag_version
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 177b173af5f..99989ca3e99 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -3795,7 +3795,7 @@ class TestSchedulerJob:
SerializedDagModel.write_dag(
LazyDeserializedDAG.from_dag(dag), bundle_name="testing",
session=session
)
-
+ session.commit()
dag_version_2 = DagVersion.get_latest_version(dr.dag_id,
session=session)
assert dag_version_2 != dag_version_1
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index 50b596c1ba0..80f3242cb3f 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -628,3 +628,57 @@ class TestSerializedDagModel:
updated_sdag = SDM.get("test_dynamic_success", session=session)
assert updated_sdag.dag_hash != initial_hash # Hash should change
assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now
+
+ def test_write_dag_atomicity_on_dagcode_failure(self, dag_maker, session):
+ """
+ Test that SerializedDagModel.write_dag maintains atomicity.
+
+ If DagCode.write_code fails, the entire transaction should rollback,
+ including the DagVersion. This test verifies that DagVersion is not
+ committed separately, which would leave orphaned records.
+
+ This test would fail if DagVersion.write_dag() was used (which commits
+ immediately), because the DagVersion would be persisted even though
+ the rest of the transaction failed.
+ """
+ from airflow.models.dagcode import DagCode
+
+ with dag_maker("test_atomicity_dag"):
+ EmptyOperator(task_id="task1")
+
+ dag = dag_maker.dag
+ initial_version_count =
session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count()
+ assert initial_version_count == 1, "Should have one DagVersion after
initial write"
+ dag_maker.create_dagrun() # ensure the second dag version is created
+
+ EmptyOperator(task_id="task2", dag=dag)
+ modified_lazy_dag = LazyDeserializedDAG.from_dag(dag)
+
+ # Mock DagCode.write_code to raise an exception
+ with mock.patch.object(
+ DagCode, "write_code", side_effect=RuntimeError("Simulated
DagCode.write_code failure")
+ ):
+ with pytest.raises(RuntimeError, match="Simulated
DagCode.write_code failure"):
+ SDM.write_dag(
+ dag=modified_lazy_dag,
+ bundle_name="testing",
+ bundle_version=None,
+ session=session,
+ )
+ session.rollback()
+
+ # Verify that no new DagVersion was committed
+ # Use a fresh session to ensure we're reading from committed data
+ with create_session() as fresh_session:
+ final_version_count = (
+ fresh_session.query(DagVersion).filter(DagVersion.dag_id
== dag.dag_id).count()
+ )
+ assert final_version_count == initial_version_count, (
+ "DagVersion should not be committed when
DagCode.write_code fails"
+ )
+
+ sdag = SDM.get(dag.dag_id, session=fresh_session)
+ assert sdag is not None, "Original SerializedDagModel should
still exist"
+ assert len(sdag.dag.task_dict) == 1, (
+ "SerializedDagModel should not be updated when write fails"
+ )