This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0a119e69c839469a73f08a7438433560be36ba81
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 14 10:45:45 2025 +0530
[v3-1-test] Fix atomicity issue in SerializedDagModel.write_dag preventing orphaned DagVersions (#58259) (#58281)
(cherry picked from commit 5398923f42ac75b8575ef7e8652a6693d7f33531)
Co-authored-by: Ephraim Anierobi <[email protected]>
---
airflow-core/src/airflow/models/dag_version.py | 1 -
airflow-core/tests/unit/jobs/test_scheduler_job.py | 2 +-
.../tests/unit/models/test_serialized_dag.py | 54 ++++++++++++++++++++++
3 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py
index 3f57333333b..1245f3c9b99 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -129,7 +129,6 @@ class DagVersion(Base):
)
log.debug("Writing DagVersion %s to the DB", dag_version)
session.add(dag_version)
- session.commit()
log.debug("DagVersion %s written to the DB", dag_version)
return dag_version
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 5f6e5fdfc61..4234fb27641 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -3793,7 +3793,7 @@ class TestSchedulerJob:
SerializedDagModel.write_dag(
LazyDeserializedDAG.from_dag(dag), bundle_name="testing",
session=session
)
-
+ session.commit()
dag_version_2 = DagVersion.get_latest_version(dr.dag_id,
session=session)
assert dag_version_2 != dag_version_1
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index ca1c20b5d8d..92446cee218 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -628,3 +628,57 @@ class TestSerializedDagModel:
updated_sdag = SDM.get("test_dynamic_success", session=session)
assert updated_sdag.dag_hash != initial_hash # Hash should change
assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now
+
+    def test_write_dag_atomicity_on_dagcode_failure(self, dag_maker, session):
+        """
+        Test that SerializedDagModel.write_dag maintains atomicity.
+
+        If DagCode.write_code fails, the entire transaction should roll back,
+        including the DagVersion. This test verifies that DagVersion is not
+        committed separately, which would leave orphaned records.
+
+        This test would fail if DagVersion.write_dag() committed on its own
+        (as it did before this fix), because the DagVersion would be persisted
+        even though the rest of the transaction failed.
+        """
+        from airflow.models.dagcode import DagCode
+
+        with dag_maker("test_atomicity_dag"):
+            EmptyOperator(task_id="task1")
+
+        dag = dag_maker.dag
+        initial_version_count = session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count()
+        assert initial_version_count == 1, "Should have one DagVersion after initial write"
+        dag_maker.create_dagrun()  # ensure the write below creates a second DagVersion
+
+        EmptyOperator(task_id="task2", dag=dag)
+        modified_lazy_dag = LazyDeserializedDAG.from_dag(dag)
+
+        # Mock DagCode.write_code to raise an exception
+        with mock.patch.object(
+            DagCode, "write_code", side_effect=RuntimeError("Simulated DagCode.write_code failure")
+        ):
+            with pytest.raises(RuntimeError, match="Simulated DagCode.write_code failure"):
+                SDM.write_dag(
+                    dag=modified_lazy_dag,
+                    bundle_name="testing",
+                    bundle_version=None,
+                    session=session,
+                )
+        session.rollback()
+
+        # Verify that no new DagVersion was committed
+        # Use a fresh session to ensure we're reading from committed data
+        with create_session() as fresh_session:
+            final_version_count = (
+                fresh_session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count()
+            )
+            assert final_version_count == initial_version_count, (
+                "DagVersion should not be committed when DagCode.write_code fails"
+            )
+
+            sdag = SDM.get(dag.dag_id, session=fresh_session)
+            assert sdag is not None, "Original SerializedDagModel should still exist"
+            assert len(sdag.dag.task_dict) == 1, (
+                "SerializedDagModel should not be updated when write fails"
+            )