This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e09ad45c785 Refresh bundle version in place when DAG serialization is unchanged (#68336)
e09ad45c785 is described below

commit e09ad45c7852460a2c65f7baaa66869be31b3cdb
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jun 15 12:26:21 2026 +0100

    Refresh bundle version in place when DAG serialization is unchanged (#68336)
    
    * Refresh bundle version in place when DAG serialization is unchanged
    
    When a DAG bundle advances to a new version/commit but the DAG's
    serialized content is byte-identical, write_dag took the 'unchanged'
    fast path and left the latest DagVersion's bundle_version pinned to the
    old commit. Tasks resolve their code from ti.dag_version.bundle_version
    at run time, so new runs and reruns 'with the latest bundle version'
    executed an outdated commit.
    
    Refresh the latest version's bundle_version/version_data (and dag code)
    in place instead. This does not create a new DagVersion, avoiding version
    inflation on every bundle commit, while keeping the recorded bundle
    pointer current.
    
    closes: #67657
    
    * Potential fix for pull request finding
    
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
 airflow-core/src/airflow/models/serialized_dag.py  | 18 +++++--
 .../tests/unit/models/test_serialized_dag.py       | 63 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index 4f20c3a180e..a27cad68908 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -687,9 +687,21 @@ class SerializedDagModel(Base):
             and dag_version
             and dag_version.bundle_name == bundle_name
         ):
-            if name_updated:
-                # The serialized DAG itself is unchanged, but deadline alert 
name(s) were
-                # updated in the DB, so report True so callers know a write 
did occur.
+            # Serialized content is unchanged, so we don't create a new 
DagVersion.
+            # But if the bundle advanced, refresh the latest version's pointer 
in place — tasks resolve
+            # their code from ``ti.dag_version.bundle_version`` at run time, 
so a stale
+            # pointer makes runs execute an outdated commit.
+            bundle_metadata_changed = (
+                dag_version.bundle_version != bundle_version or 
dag_version.version_data != version_data
+            )
+            if bundle_metadata_changed:
+                dag_version.bundle_version = bundle_version
+                dag_version.version_data = version_data
+                session.merge(dag_version)
+                DagCode.update_source_code(dag_id=dag.dag_id, 
fileloc=dag.fileloc, session=session)
+            if name_updated or bundle_metadata_changed:
+                # A write occurred — a deadline alert name update and/or a 
bundle
+                # metadata refresh — so report True so callers know the DB 
changed.
                 return True
             log.debug("Serialized DAG (%s) is unchanged. Skipping writing to 
DB", dag.dag_id)
             return False
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index dec06a346ec..5fa7d155f8e 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -585,6 +585,69 @@ class TestSerializedDagModel:
         # There should now be two versions of the DAG
         assert session.scalar(select(func.count()).select_from(DagVersion)) == 
2
 
+    def test_bundle_version_refreshed_in_place_when_hash_unchanged(self, 
dag_maker, session):
+        """When the bundle advances to a new version/commit but the DAG's 
serialized
+        content is unchanged, ``write_dag`` must refresh the latest 
DagVersion's
+        ``bundle_version`` in place (so tasks resolve the current commit) 
without
+        creating a new DagVersion (which would inflate versions on every 
commit).
+        """
+        with dag_maker("test_dag_bundle_version_refresh", 
bundle_name="bundleA") as dag:
+            EmptyOperator(task_id="task1")
+        # Pin the version with task instances so the in-place "no TI" branch 
is NOT taken.
+        dag_maker.create_dagrun(run_id="test_run")
+
+        did_write = SDM.write_dag(
+            LazyDeserializedDAG.from_dag(dag),
+            bundle_name="bundleA",
+            bundle_version="commit_A",
+            version_data={"manifest": "A"},
+            session=session,
+        )
+        assert did_write is True
+        assert session.scalar(select(func.count()).select_from(DagVersion)) == 
1
+        latest = DagVersion.get_latest_version(dag.dag_id, session=session)
+        assert latest.bundle_version == "commit_A"
+        assert latest.version_data == {"manifest": "A"}
+
+        # Same content, same bundle_name, but the bundle moved to a new commit.
+        did_write = SDM.write_dag(
+            LazyDeserializedDAG.from_dag(dag),
+            bundle_name="bundleA",
+            bundle_version="commit_B",
+            version_data={"manifest": "B"},
+            session=session,
+        )
+
+        # No new version was created, but the latest version's bundle pointer 
advanced.
+        assert did_write is True
+        assert session.scalar(select(func.count()).select_from(DagVersion)) == 
1
+        latest = DagVersion.get_latest_version(dag.dag_id, session=session)
+        assert latest.bundle_version == "commit_B"
+        assert latest.version_data == {"manifest": "B"}
+
+    def test_write_dag_unchanged_with_same_bundle_version_skips_write(self, 
dag_maker, session):
+        """A re-parse with identical content and identical bundle metadata is 
a no-op."""
+        with dag_maker("test_dag_unchanged_noop", bundle_name="bundleA") as 
dag:
+            EmptyOperator(task_id="task1")
+        dag_maker.create_dagrun(run_id="test_run")
+
+        SDM.write_dag(
+            LazyDeserializedDAG.from_dag(dag),
+            bundle_name="bundleA",
+            bundle_version="commit_A",
+            session=session,
+        )
+
+        did_write = SDM.write_dag(
+            LazyDeserializedDAG.from_dag(dag),
+            bundle_name="bundleA",
+            bundle_version="commit_A",
+            session=session,
+        )
+
+        assert did_write is False
+        assert session.scalar(select(func.count()).select_from(DagVersion)) == 
1
+
     def test_hash_method_removes_fileloc_and_remains_consistent(self):
         """Test that the hash method removes fileloc before hashing."""
         test_data = {

Reply via email to