This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e09ad45c785 Refresh bundle version in place when DAG serialization is
unchanged (#68336)
e09ad45c785 is described below
commit e09ad45c7852460a2c65f7baaa66869be31b3cdb
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jun 15 12:26:21 2026 +0100
Refresh bundle version in place when DAG serialization is unchanged (#68336)
* Refresh bundle version in place when DAG serialization is unchanged
When a DAG bundle advances to a new version/commit but the DAG's
serialized content is byte-identical, write_dag took the 'unchanged'
fast path and left the latest DagVersion's bundle_version pinned to the
old commit. Tasks resolve their code from ti.dag_version.bundle_version
at run time, so new runs and reruns 'with the latest bundle version'
executed an outdated commit.
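Instead, refresh the latest DagVersion's bundle_version and version_data
(and the DAG's source code) in place. This avoids creating a new
DagVersion on every bundle commit, which would inflate the version count,
while keeping the recorded bundle pointer current. Because the DagVersion
is updated in place, task instances already attached to it (including
those of earlier runs) will also resolve to the new bundle version.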
closes: #67657
* Apply Copilot Autofix suggestion for a pull request finding
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---------
Co-authored-by: Copilot Autofix powered by AI
<[email protected]>
---
airflow-core/src/airflow/models/serialized_dag.py | 18 +++++--
.../tests/unit/models/test_serialized_dag.py | 63 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index 4f20c3a180e..a27cad68908 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -687,9 +687,21 @@ class SerializedDagModel(Base):
and dag_version
and dag_version.bundle_name == bundle_name
):
- if name_updated:
- # The serialized DAG itself is unchanged, but deadline alert
name(s) were
- # updated in the DB, so report True so callers know a write
did occur.
+ # Serialized content is unchanged, so we don't create a new
DagVersion.
+ # But if the bundle advanced, refresh the latest version's pointer
in place — tasks resolve
+ # their code from ``ti.dag_version.bundle_version`` at run time,
so a stale
+ # pointer makes runs execute an outdated commit.
+ bundle_metadata_changed = (
+ dag_version.bundle_version != bundle_version or
dag_version.version_data != version_data
+ )
+ if bundle_metadata_changed:
+ dag_version.bundle_version = bundle_version
+ dag_version.version_data = version_data
+ session.merge(dag_version)
+ DagCode.update_source_code(dag_id=dag.dag_id,
fileloc=dag.fileloc, session=session)
+ if name_updated or bundle_metadata_changed:
+ # A write occurred — a deadline alert name update and/or a
bundle
+ # metadata refresh — so report True so callers know the DB
changed.
return True
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to
DB", dag.dag_id)
return False
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index dec06a346ec..5fa7d155f8e 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -585,6 +585,69 @@ class TestSerializedDagModel:
# There should now be two versions of the DAG
assert session.scalar(select(func.count()).select_from(DagVersion)) ==
2
+ def test_bundle_version_refreshed_in_place_when_hash_unchanged(self,
dag_maker, session):
+ """When the bundle advances to a new version/commit but the DAG's
serialized
+ content is unchanged, ``write_dag`` must refresh the latest
DagVersion's
+ ``bundle_version`` in place (so tasks resolve the current commit)
without
+ creating a new DagVersion (which would inflate versions on every
commit).
+ """
+ with dag_maker("test_dag_bundle_version_refresh",
bundle_name="bundleA") as dag:
+ EmptyOperator(task_id="task1")
+ # Pin the version with task instances so the in-place "no TI" branch
is NOT taken.
+ dag_maker.create_dagrun(run_id="test_run")
+
+ did_write = SDM.write_dag(
+ LazyDeserializedDAG.from_dag(dag),
+ bundle_name="bundleA",
+ bundle_version="commit_A",
+ version_data={"manifest": "A"},
+ session=session,
+ )
+ assert did_write is True
+ assert session.scalar(select(func.count()).select_from(DagVersion)) ==
1
+ latest = DagVersion.get_latest_version(dag.dag_id, session=session)
+ assert latest.bundle_version == "commit_A"
+ assert latest.version_data == {"manifest": "A"}
+
+ # Same content, same bundle_name, but the bundle moved to a new commit.
+ did_write = SDM.write_dag(
+ LazyDeserializedDAG.from_dag(dag),
+ bundle_name="bundleA",
+ bundle_version="commit_B",
+ version_data={"manifest": "B"},
+ session=session,
+ )
+
+ # No new version was created, but the latest version's bundle pointer
advanced.
+ assert did_write is True
+ assert session.scalar(select(func.count()).select_from(DagVersion)) ==
1
+ latest = DagVersion.get_latest_version(dag.dag_id, session=session)
+ assert latest.bundle_version == "commit_B"
+ assert latest.version_data == {"manifest": "B"}
+
+ def test_write_dag_unchanged_with_same_bundle_version_skips_write(self,
dag_maker, session):
+ """A re-parse with identical content and identical bundle metadata is
a no-op."""
+ with dag_maker("test_dag_unchanged_noop", bundle_name="bundleA") as
dag:
+ EmptyOperator(task_id="task1")
+ dag_maker.create_dagrun(run_id="test_run")
+
+ SDM.write_dag(
+ LazyDeserializedDAG.from_dag(dag),
+ bundle_name="bundleA",
+ bundle_version="commit_A",
+ session=session,
+ )
+
+ did_write = SDM.write_dag(
+ LazyDeserializedDAG.from_dag(dag),
+ bundle_name="bundleA",
+ bundle_version="commit_A",
+ session=session,
+ )
+
+ assert did_write is False
+ assert session.scalar(select(func.count()).select_from(DagVersion)) ==
1
+
def test_hash_method_removes_fileloc_and_remains_consistent(self):
"""Test that the hash method removes fileloc before hashing."""
test_data = {