This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0a119e69c839469a73f08a7438433560be36ba81
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 14 10:45:45 2025 +0530

    [v3-1-test] Fix atomicity issue in SerializedDagModel.write_dag preventing orphaned DagVersions (#58259) (#58281)
    
    (cherry picked from commit 5398923f42ac75b8575ef7e8652a6693d7f33531)
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 airflow-core/src/airflow/models/dag_version.py     |  1 -
 airflow-core/tests/unit/jobs/test_scheduler_job.py |  2 +-
 .../tests/unit/models/test_serialized_dag.py       | 54 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py
index 3f57333333b..1245f3c9b99 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -129,7 +129,6 @@ class DagVersion(Base):
         )
         log.debug("Writing DagVersion %s to the DB", dag_version)
         session.add(dag_version)
-        session.commit()
         log.debug("DagVersion %s written to the DB", dag_version)
         return dag_version
 
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 5f6e5fdfc61..4234fb27641 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -3793,7 +3793,7 @@ class TestSchedulerJob:
         SerializedDagModel.write_dag(
             LazyDeserializedDAG.from_dag(dag), bundle_name="testing", 
session=session
         )
-
+        session.commit()
         dag_version_2 = DagVersion.get_latest_version(dr.dag_id, 
session=session)
         assert dag_version_2 != dag_version_1
 
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index ca1c20b5d8d..92446cee218 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -628,3 +628,57 @@ class TestSerializedDagModel:
         updated_sdag = SDM.get("test_dynamic_success", session=session)
         assert updated_sdag.dag_hash != initial_hash  # Hash should change
         assert len(updated_sdag.dag.task_dict) == 2  # Should have 2 tasks now
+
+    def test_write_dag_atomicity_on_dagcode_failure(self, dag_maker, session):
+        """
+        Test that SerializedDagModel.write_dag maintains atomicity.
+
+        If DagCode.write_code fails, the entire transaction should rollback,
+        including the DagVersion. This test verifies that DagVersion is not
+        committed separately, which would leave orphaned records.
+
+        This test would fail if DagVersion.write_dag() was used (which commits
+        immediately), because the DagVersion would be persisted even though
+        the rest of the transaction failed.
+        """
+        from airflow.models.dagcode import DagCode
+
+        with dag_maker("test_atomicity_dag"):
+            EmptyOperator(task_id="task1")
+
+        dag = dag_maker.dag
+        initial_version_count = 
session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count()
+        assert initial_version_count == 1, "Should have one DagVersion after 
initial write"
+        dag_maker.create_dagrun()  # ensure the second dag version is created
+
+        EmptyOperator(task_id="task2", dag=dag)
+        modified_lazy_dag = LazyDeserializedDAG.from_dag(dag)
+
+        # Mock DagCode.write_code to raise an exception
+        with mock.patch.object(
+            DagCode, "write_code", side_effect=RuntimeError("Simulated 
DagCode.write_code failure")
+        ):
+            with pytest.raises(RuntimeError, match="Simulated 
DagCode.write_code failure"):
+                SDM.write_dag(
+                    dag=modified_lazy_dag,
+                    bundle_name="testing",
+                    bundle_version=None,
+                    session=session,
+                )
+            session.rollback()
+
+            # Verify that no new DagVersion was committed
+            # Use a fresh session to ensure we're reading from committed data
+            with create_session() as fresh_session:
+                final_version_count = (
+                    fresh_session.query(DagVersion).filter(DagVersion.dag_id 
== dag.dag_id).count()
+                )
+                assert final_version_count == initial_version_count, (
+                    "DagVersion should not be committed when 
DagCode.write_code fails"
+                )
+
+                sdag = SDM.get(dag.dag_id, session=fresh_session)
+                assert sdag is not None, "Original SerializedDagModel should 
still exist"
+                assert len(sdag.dag.task_dict) == 1, (
+                    "SerializedDagModel should not be updated when write fails"
+                )

Reply via email to