This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 41df5a980c8 Add BundleVersion dataclass and version_data persistence
to DagVersion (#66491)
41df5a980c8 is described below
commit 41df5a980c8d92fabc0fb98e5b4c808d90292463
Author: Niko Oliveira <[email protected]>
AuthorDate: Tue May 19 12:24:36 2026 -0700
Add BundleVersion dataclass and version_data persistence to DagVersion
(#66491)
* Add BundleVersion dataclass and version_data persistence to DagVersion
Add infrastructure for bundles to return structured version metadata
alongside their version string. This enables block-storage-based
bundles to be versioned (e.g. manifest-based S3 Dag Bundle versioning)
in follow-up PRs without requiring additional core changes.
Changes:
- Add BundleVersion frozen dataclass (version + optional data dict)
- Widen BaseDagBundle.get_current_version() return type to include
BundleVersion, with backwards-compatible handling of legacy str returns
- Add nullable version_data (JSON) column to dag_version table via
Alembic migration 0115
- Thread version_data through the full persistence chain:
DagFileProcessorManager -> update_dag_parsing_results_in_db ->
SerializedDagModel.write_dag -> DagVersion.write_dag
- Emit DeprecationWarning for versioned bundles returning bare strings
- Fix dag_command.py to unpack BundleVersion before passing to
sync_bag_to_db
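All changes are additive and backwards compatible. Existing bundles
continue to work unchanged with version_data=NULL; versioned bundles
that still return plain strings from get_current_version() now emit a
DeprecationWarning.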
* Address review feedback: fix data loss, extract shared helper, update
GitDagBundle
Review feedback from potiuk on PR #66491:
1. Fix data loss in dag_reserialize: pass version_data through
sync_bag_to_db so reserialization preserves BundleVersion.data on
DagVersion rows.
2. Extract _unpack_bundle_version from DagFileProcessorManager to a
module-level unpack_bundle_version() in bundles/base.py. Both the CLI and
the manager now use the same helper, which emits a DeprecationWarning when
a versioned bundle returns a plain string.
3. Migrate GitDagBundle to return BundleVersion on Airflow 3.3+ using
AIRFLOW_V_3_3_PLUS version guard for backwards compatibility.
4. Tighten BundleVersion.data type to dict[str, Any] | None, and document
that mutating data after construction is undefined behavior.
5. Fix sa.JSON -> sa.JSON() in dag_version.py for style consistency.
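6. Add a comment in manager.py explaining why the cached version is checked
with `is None` rather than for falsiness (an empty-string version is a
valid cached value).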
* Fix: add '# use next version' comment for common-compat in git provider
The CI 'Build info' check requires this comment when common.compat
changes alongside another provider in the same PR.
* Fix: RST docstring formatting in get_current_version to fix the Sphinx
doc build
Move the trailing paragraph before the bullet list and add a blank
line before the list items so that Sphinx does not produce 'Unexpected
indentation' errors in auto-generated API docs for the git provider.
* Fix: git bundle tests use _version_str helper for compat with Airflow <
3.3
The compat tests (3.0.6, 3.1.8) fail because get_current_version()
returns a plain str when AIRFLOW_V_3_3_PLUS is False, but the tests
unconditionally call .version on the result. Add a _version_str()
helper that extracts the version string from either BundleVersion or
str.
---
airflow-core/docs/migrations-ref.rst | 4 +-
.../src/airflow/cli/commands/dag_command.py | 6 +-
.../src/airflow/dag_processing/bundles/base.py | 57 +++++++++++++++-
.../src/airflow/dag_processing/collection.py | 4 ++
airflow-core/src/airflow/dag_processing/dagbag.py | 4 +-
airflow-core/src/airflow/dag_processing/manager.py | 20 +++++-
.../0115_3_3_0_add_version_data_to_dag_version.py | 53 +++++++++++++++
airflow-core/src/airflow/models/dag_version.py | 6 ++
airflow-core/src/airflow/models/serialized_dag.py | 4 ++
airflow-core/src/airflow/utils/db.py | 2 +-
.../tests/unit/dag_processing/bundles/test_base.py | 29 ++++++++
.../tests/unit/dag_processing/test_collection.py | 1 +
.../tests/unit/dag_processing/test_manager.py | 66 +++++++++++++++++++
.../tests/unit/dag_processing/test_processor.py | 1 +
airflow-core/tests/unit/models/test_dag_version.py | 38 +++++++++++
.../providers/common/compat/version_compat.py | 2 +
providers/git/pyproject.toml | 2 +-
.../git/src/airflow/providers/git/bundles/git.py | 16 +++--
providers/git/tests/unit/git/bundles/test_git.py | 77 ++++++++++++----------
19 files changed, 342 insertions(+), 50 deletions(-)
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index a9502d28fe9..9d80414b64b 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``a7f3b2c1d4e5`` (head) | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add
allow_producer_teams column to |
+| ``a1b2c3d4e5f6`` (head) | ``a7f3b2c1d4e5`` | ``3.3.0`` | Add
version_data to dag_version. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``a7f3b2c1d4e5`` | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add
allow_producer_teams column to |
| | | |
dag_schedule_asset_reference table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``b8f3e4a1d2c9`` | ``fde9ed84d07b`` | ``3.3.0`` | Add
retry_delay_override and retry_reason to task_instance. |
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py
b/airflow-core/src/airflow/cli/commands/dag_command.py
index d0162a49fc5..13a4ad90596 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -37,6 +37,7 @@ from airflow.api.client import get_current_api_client
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
from airflow.cli.simple_table import AirflowConsole
from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
+from airflow.dag_processing.bundles.base import unpack_bundle_version
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.dag_processing.dagbag import BundleDagBag, DagBag, sync_bag_to_db
from airflow.exceptions import AirflowConfigException, AirflowException
@@ -740,4 +741,7 @@ def dag_reserialize(args, session: Session = NEW_SESSION)
-> None:
continue
bundle.initialize()
dag_bag = BundleDagBag(bundle.path, bundle_path=bundle.path,
bundle_name=bundle.name)
- sync_bag_to_db(dag_bag, bundle.name,
bundle_version=bundle.get_current_version(), session=session)
+ version, version_data =
unpack_bundle_version(bundle.get_current_version(), bundle)
+ sync_bag_to_db(
+ dag_bag, bundle.name, bundle_version=version,
version_data=version_data, session=session
+ )
diff --git a/airflow-core/src/airflow/dag_processing/bundles/base.py
b/airflow-core/src/airflow/dag_processing/bundles/base.py
index 2f4654382cb..b6b55f9251c 100644
--- a/airflow-core/src/airflow/dag_processing/bundles/base.py
+++ b/airflow-core/src/airflow/dag_processing/bundles/base.py
@@ -30,7 +30,7 @@ from datetime import timedelta
from fcntl import LOCK_SH, LOCK_UN, flock
from operator import attrgetter
from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
import pendulum
from pendulum.parsing import ParserError
@@ -229,6 +229,50 @@ class BundleUsageTrackingManager:
self._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)
+@dataclass(frozen=True)
+class BundleVersion:
+ """
+ Version information returned by a DAG bundle.
+
+ Bundles return this from ``get_current_version()`` to provide both a
version
+ identifier and optional structured data (e.g., a manifest) atomically.
+
+ :param version: A string identifier for this bundle version (e.g., git
SHA, content hash).
+ :param data: Optional structured data associated with this version (e.g.,
S3 manifest).
+ Mutating ``data`` after construction is undefined behavior.
+ """
+
+ version: str
+ data: dict[str, Any] | None = None
+
+
+def unpack_bundle_version(
+ result: str | BundleVersion | None, bundle: BaseDagBundle
+) -> tuple[str | None, dict[str, Any] | None]:
+ """
+ Unpack the return value of get_current_version().
+
+ Handles both the new BundleVersion dataclass and legacy str | None returns.
+ Emits a deprecation warning for bare string returns from versioned bundles.
+
+ :return: Tuple of (version_string, version_data)
+ """
+ if result is None:
+ return None, None
+ if isinstance(result, BundleVersion):
+ return result.version, result.data
+ # Legacy path: bare string return
+ if bundle.supports_versioning:
+ warnings.warn(
+ f"Bundle '{bundle.name}' returned a plain string from
get_current_version(). "
+ f"Return a BundleVersion instance instead. "
+ f"Plain string returns are deprecated and will be removed in a
future version.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return result, None
+
+
class BaseDagBundle(ABC):
"""
Base class for DAG bundles.
@@ -313,11 +357,18 @@ class BaseDagBundle(ABC):
"""
@abstractmethod
- def get_current_version(self) -> str | None:
+ def get_current_version(self) -> str | BundleVersion | None:
"""
- Retrieve a string that represents the version of the DAG bundle.
+ Retrieve the version of the DAG bundle.
Airflow can use this value to retrieve this same bundle version later.
+
+ May return:
+
+ - A ``BundleVersion`` instance (preferred) containing both a version
string and
+ optional structured data (e.g., a manifest).
+ - A plain string (deprecated; will emit a warning in a future release).
+ - None if the bundle does not support versioning.
"""
@abstractmethod
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 6f8bc752bbc..361f09871c8 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -262,6 +262,7 @@ def _serialize_dag_capturing_errors(
bundle_name,
session: Session,
bundle_version: str | None,
+ version_data: dict | None = None,
_prefetched: DagWriteMetadata | None = None,
):
"""
@@ -282,6 +283,7 @@ def _serialize_dag_capturing_errors(
dag,
bundle_name=bundle_name,
bundle_version=bundle_version,
+ version_data=version_data,
min_update_interval=MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
session=session,
_prefetched=_prefetched,
@@ -436,6 +438,7 @@ def update_dag_parsing_results_in_db(
warnings: set[DagWarning],
session: Session,
*,
+ version_data: dict | None = None,
warning_types: tuple[DagWarningType, ...] = (
DagWarningType.NONEXISTENT_POOL,
DagWarningType.RUNTIME_VARYING_VALUE,
@@ -493,6 +496,7 @@ def update_dag_parsing_results_in_db(
dag=dag,
bundle_name=bundle_name,
bundle_version=bundle_version,
+ version_data=version_data,
session=session,
_prefetched=prefetched_metadata.get(dag.dag_id),
)
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py
b/airflow-core/src/airflow/dag_processing/dagbag.py
index 5062a47bc7e..bd0a6dacb86 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -25,7 +25,7 @@ import warnings
from collections.abc import Generator
from datetime import datetime, timedelta
from pathlib import Path
-from typing import TYPE_CHECKING, NamedTuple
+from typing import TYPE_CHECKING, Any, NamedTuple
from tabulate import tabulate
@@ -574,6 +574,7 @@ def sync_bag_to_db(
bundle_name: str,
bundle_version: str | None,
*,
+ version_data: dict[str, Any] | None = None,
session: Session = NEW_SESSION,
) -> None:
"""Save attributes about list of DAG to the DB."""
@@ -598,5 +599,6 @@ def sync_bag_to_db(
None, # file parsing duration is not well defined when parsing
multiple files / multiple DAGs.
dagbag.dag_warnings,
session=session,
+ version_data=version_data,
files_parsed=files_parsed,
)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 72e51e48743..ed5c61c604c 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -50,7 +50,10 @@ from airflow._shared.observability.metrics.stats import
normalize_name_for_stats
from airflow._shared.timezones import timezone
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
from airflow.configuration import conf
-from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
+from airflow.dag_processing.bundles.base import (
+ BundleUsageTrackingManager,
+ unpack_bundle_version,
+)
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.dag_processing.collection import update_dag_parsing_results_in_db
from airflow.dag_processing.processor import DagFileParsingResult,
DagFileProcessorProcess
@@ -241,6 +244,7 @@ class DagFileProcessorManager(LoggingMixin):
_dag_bundles: list[BaseDagBundle] = attrs.field(factory=list, init=False)
_bundle_versions: dict[str, str | None] = attrs.field(factory=dict,
init=False)
+ _bundle_version_data: dict[str, dict | None] = attrs.field(factory=dict,
init=False)
_processors: dict[DagFileInfo, DagFileProcessorProcess] =
attrs.field(factory=dict, init=False)
@@ -770,7 +774,10 @@ class DagFileProcessorManager(LoggingMixin):
if bundle.supports_versioning:
# we will also check the version of the bundle to see if
another DAG processor has seen
# a new version
- pre_refresh_version = self._bundle_versions.get(bundle.name)
or bundle.get_current_version()
+ pre_refresh_version = self._bundle_versions.get(bundle.name)
+ # Use `is None` (not falsy) so an empty-string version is
treated as a valid cached value.
+ if pre_refresh_version is None:
+ pre_refresh_version, _ =
unpack_bundle_version(bundle.get_current_version(), bundle)
current_version_matches_db = pre_refresh_version ==
bundle_state.version
else:
# With no versioning, it always "matches"
@@ -801,7 +808,9 @@ class DagFileProcessorManager(LoggingMixin):
# We can short-circuit the rest of this if (1) bundle was seen
before by
# this dag processor and (2) the version of the bundle did not
change
# after refreshing it
- version_after_refresh = bundle.get_current_version()
+ version_after_refresh, version_data_after_refresh =
unpack_bundle_version(
+ bundle.get_current_version(), bundle
+ )
if previously_seen and pre_refresh_version ==
version_after_refresh:
self.log.debug(
"Bundle %s version not changed after refresh: %s",
@@ -817,6 +826,7 @@ class DagFileProcessorManager(LoggingMixin):
self.log.info("Version changed for %s, new version: %s",
bundle.name, version_after_refresh)
else:
version_after_refresh = None
+ version_data_after_refresh = None
# Persistence failure must not skip file scanning (bundle is
already refreshed locally).
# _bundle_versions is only advanced on success to stay consistent
with the DB.
@@ -826,6 +836,7 @@ class DagFileProcessorManager(LoggingMixin):
self.log.exception("Error persisting state for bundle %s",
bundle.name)
else:
self._bundle_versions[bundle.name] = version_after_refresh
+ self._bundle_version_data[bundle.name] =
version_data_after_refresh
found_files = {
DagFileInfo(rel_path=p, bundle_name=bundle.name,
bundle_path=bundle.path)
@@ -1126,6 +1137,7 @@ class DagFileProcessorManager(LoggingMixin):
self.persist_parsing_result(
bundle_name=file.bundle_name,
bundle_version=self._bundle_versions[file.bundle_name],
+
version_data=self._bundle_version_data.get(file.bundle_name),
parsing_result=proc.parsing_result,
run_duration=run_duration,
relative_fileloc=str(file.rel_path),
@@ -1157,6 +1169,7 @@ class DagFileProcessorManager(LoggingMixin):
*,
bundle_name: str,
bundle_version: str | None,
+ version_data: dict | None,
parsing_result: DagFileParsingResult,
run_duration: float,
relative_fileloc: str | None,
@@ -1183,6 +1196,7 @@ class DagFileProcessorManager(LoggingMixin):
update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=bundle_version,
+ version_data=version_data,
dags=parsing_result.serialized_dags,
import_errors=import_errors,
parse_duration=run_duration,
diff --git
a/airflow-core/src/airflow/migrations/versions/0115_3_3_0_add_version_data_to_dag_version.py
b/airflow-core/src/airflow/migrations/versions/0115_3_3_0_add_version_data_to_dag_version.py
new file mode 100644
index 00000000000..deca97ceacb
--- /dev/null
+++
b/airflow-core/src/airflow/migrations/versions/0115_3_3_0_add_version_data_to_dag_version.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add version_data to dag_version.
+
+Adds a nullable JSON column to the dag_version table for storing
+structured version metadata (e.g., S3 object version manifests).
+Bundles that do not use version_data leave this column NULL.
+
+On PostgreSQL and MySQL 8+, adding a nullable column without a
+default is a metadata-only operation (no table rewrite).
+
+Revision ID: a1b2c3d4e5f6
+Revises: a7f3b2c1d4e5
+Create Date: 2026-05-05 23:30:00.000000
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+revision = "a1b2c3d4e5f6"
+down_revision = "a7f3b2c1d4e5"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Add version_data column to dag_version table."""
+ op.add_column("dag_version", sa.Column("version_data", sa.JSON(),
nullable=True))
+
+
+def downgrade():
+ """Remove version_data column from dag_version table."""
+ op.drop_column("dag_version", "version_data")
diff --git a/airflow-core/src/airflow/models/dag_version.py
b/airflow-core/src/airflow/models/dag_version.py
index cfdb301d041..45ea40c091b 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -53,6 +53,7 @@ class DagVersion(Base):
dag_model = relationship("DagModel", back_populates="dag_versions")
bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
bundle_version: Mapped[str | None] = mapped_column(StringID(),
nullable=True)
+ version_data: Mapped[dict | None] = mapped_column(sa.JSON(), nullable=True)
bundle = relationship(
"DagBundleModel",
primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name",
@@ -111,6 +112,7 @@ class DagVersion(Base):
dag_id: str,
bundle_name: str,
bundle_version: str | None = None,
+ version_data: dict | None = None,
version_number: int = 1,
session: Session = NEW_SESSION,
) -> DagVersion:
@@ -120,6 +122,9 @@ class DagVersion(Base):
Checks if a version of the DAG exists and increments the version
number if it does.
:param dag_id: The DAG ID.
+ :param bundle_name: The bundle name.
+ :param bundle_version: The bundle version string.
+ :param version_data: Optional structured data associated with this
version (e.g., S3 manifest).
:param version_number: The version number.
:param session: The database session.
:return: The DagVersion object.
@@ -135,6 +140,7 @@ class DagVersion(Base):
version_number=version_number,
bundle_name=bundle_name,
bundle_version=bundle_version,
+ version_data=version_data,
)
log.debug("Writing DagVersion %s to the DB", dag_version)
session.add(dag_version)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index be55c3e0908..d2d9d47d61c 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -597,6 +597,7 @@ class SerializedDagModel(Base):
dag: LazyDeserializedDAG,
bundle_name: str,
bundle_version: str | None = None,
+ version_data: dict | None = None,
min_update_interval: int | None = None,
session: Session = NEW_SESSION,
_prefetched: DagWriteMetadata | None = None,
@@ -610,6 +611,7 @@ class SerializedDagModel(Base):
:param dag: a DAG to be written into database
:param bundle_name: bundle name of the DAG
:param bundle_version: bundle version of the DAG
+ :param version_data: optional structured data associated with this
version
:param min_update_interval: minimal interval in seconds to update
serialized DAG
:param session: ORM Session
:param _prefetched: Pre-fetched metadata to skip per-DAG queries; used
by bulk callers
@@ -740,6 +742,7 @@ class SerializedDagModel(Base):
# Update the latest dag version
dag_version.bundle_name = bundle_name
dag_version.bundle_version = bundle_version
+ dag_version.version_data = version_data
session.merge(dag_version)
# Update the latest DagCode
DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc,
session=session)
@@ -749,6 +752,7 @@ class SerializedDagModel(Base):
dag_id=dag.dag_id,
bundle_name=bundle_name,
bundle_version=bundle_version,
+ version_data=version_data,
session=session,
)
log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 4caa9901bfb..9b747d7fc3f 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
"3.2.0": "1d6611b6ab7c",
- "3.3.0": "a7f3b2c1d4e5",
+ "3.3.0": "a1b2c3d4e5f6",
}
# Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_base.py
b/airflow-core/tests/unit/dag_processing/bundles/test_base.py
index bd6360de944..6fc7ba39a0a 100644
--- a/airflow-core/tests/unit/dag_processing/bundles/test_base.py
+++ b/airflow-core/tests/unit/dag_processing/bundles/test_base.py
@@ -33,6 +33,7 @@ from airflow._shared.timezones import timezone as tz
from airflow.dag_processing.bundles.base import (
BaseDagBundle,
BundleUsageTrackingManager,
+ BundleVersion,
BundleVersionLock,
get_bundle_storage_root_path,
)
@@ -294,3 +295,31 @@ class TestBundleUsageTrackingManager:
assert len(lock_files) == expected_remaining
bundle_folders = list(b.versions_dir.iterdir())
assert len(bundle_folders) == expected_remaining
+
+
+class TestBundleVersion:
+ def test_bundle_version_with_version_only(self):
+ bv = BundleVersion(version="abc123")
+ assert bv.version == "abc123"
+ assert bv.data is None
+
+ def test_bundle_version_with_data(self):
+ data = {"schema_version": 1, "files": {"dag.py": "v1"}}
+ bv = BundleVersion(version="sha256hex", data=data)
+ assert bv.version == "sha256hex"
+ assert bv.data == data
+
+ def test_bundle_version_is_frozen(self):
+ bv = BundleVersion(version="abc")
+ with pytest.raises(AttributeError):
+ bv.version = "xyz"
+
+ def test_bundle_version_equality(self):
+ bv1 = BundleVersion(version="abc", data={"key": "val"})
+ bv2 = BundleVersion(version="abc", data={"key": "val"})
+ assert bv1 == bv2
+
+ def test_bundle_version_inequality(self):
+ bv1 = BundleVersion(version="abc", data={"key": "val"})
+ bv2 = BundleVersion(version="abc", data={"key": "other"})
+ assert bv1 != bv2
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py
b/airflow-core/tests/unit/dag_processing/test_collection.py
index 40224253961..b0a65f3cd9c 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -570,6 +570,7 @@ class TestUpdateDagParsingResults:
mock_dag,
bundle_name="testing",
bundle_version=None,
+ version_data=None,
min_update_interval=mock.ANY,
session=mock_session,
_prefetched=mock.ANY,
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 40b819f146e..33aaa828888 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1184,6 +1184,7 @@ class TestDagFileProcessorManager:
mock_persist.assert_called_once_with(
bundle_name="testing",
bundle_version="v1",
+ version_data=None,
parsing_result=processor.parsing_result,
run_duration=mock.ANY,
relative_fileloc="abc.txt",
@@ -2724,3 +2725,68 @@ class TestDagFileProcessorManager:
# _bundle_versions must NOT advance — DB still holds the old version,
so the next
# iteration will see a version mismatch and re-refresh rather than
skip incorrectly
assert "mock_bundle" not in manager._bundle_versions
+
+ def test_unpack_bundle_version_with_bundle_version_dataclass(self):
+ from airflow.dag_processing.bundles.base import BundleVersion,
unpack_bundle_version
+
+ bundle = mock.MagicMock()
+ bundle.supports_versioning = True
+ result = BundleVersion(version="sha256abc", data={"schema_version": 1,
"files": {}})
+ version, data = unpack_bundle_version(result, bundle)
+ assert version == "sha256abc"
+ assert data == {"schema_version": 1, "files": {}}
+
+ def test_unpack_bundle_version_with_none(self):
+ from airflow.dag_processing.bundles.base import unpack_bundle_version
+
+ bundle = mock.MagicMock()
+ bundle.supports_versioning = True
+ version, data = unpack_bundle_version(None, bundle)
+ assert version is None
+ assert data is None
+
+ def test_unpack_bundle_version_with_legacy_string(self):
+ from airflow.dag_processing.bundles.base import unpack_bundle_version
+
+ bundle = mock.MagicMock()
+ bundle.name = "test_bundle"
+ bundle.supports_versioning = True
+ with pytest.warns(DeprecationWarning, match="plain string"):
+ version, data = unpack_bundle_version("v1.0", bundle)
+ assert version == "v1.0"
+ assert data is None
+
+ def test_unpack_bundle_version_with_string_non_versioned_bundle(self):
+ """Non-versioned bundles returning a string should not emit a
warning."""
+ from airflow.dag_processing.bundles.base import unpack_bundle_version
+
+ bundle = mock.MagicMock()
+ bundle.name = "local_bundle"
+ bundle.supports_versioning = False
+ import warnings as w
+
+ with w.catch_warnings():
+ w.simplefilter("error")
+ version, data = unpack_bundle_version("v1.0", bundle)
+ assert version == "v1.0"
+ assert data is None
+
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_bundle_version_data_stored_after_refresh(self, session):
+ """Test that version_data from BundleVersion is stored in
_bundle_version_data."""
+ from airflow.dag_processing.bundles.base import BundleVersion
+
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle = self._make_refresh_bundle(supports_versioning=True,
current_version="v1")
+ # Override get_current_version to return BundleVersion with data
+ test_data = {"schema_version": 1, "files": {"dag.py": "ver123"}}
+ bundle.get_current_version = mock.MagicMock(
+ return_value=BundleVersion(version="newhash", data=test_data)
+ )
+ # Pre-populate so previously_seen=True and version differs
+ manager._bundle_versions["mock_bundle"] = "oldhash"
+
+ self._refresh_with_mocked_state(manager, bundle,
BundleState(last_refreshed=None, version="oldhash"))
+
+ assert manager._bundle_versions["mock_bundle"] == "newhash"
+ assert manager._bundle_version_data["mock_bundle"] == test_data
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index d2ac085b0b7..7fe7204e68e 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -725,6 +725,7 @@ def test_persist_parsing_result_calls_update_db():
manager,
bundle_name="test-bundle",
bundle_version="v1",
+ version_data=None,
parsing_result=parsing_result,
run_duration=1.5,
relative_fileloc="dags/test.py",
diff --git a/airflow-core/tests/unit/models/test_dag_version.py
b/airflow-core/tests/unit/models/test_dag_version.py
index 6d8e2404954..e7899422093 100644
--- a/airflow-core/tests/unit/models/test_dag_version.py
+++ b/airflow-core/tests/unit/models/test_dag_version.py
@@ -84,3 +84,41 @@ class TestDagVersion:
latest_version = DagVersion.get_latest_version(dag.dag_id)
assert latest_version.version == f"{dag.dag_id}-1"
+
+ @pytest.mark.db_test
+ def test_write_dag_with_version_data(self, dag_maker, session):
+ """Test that version_data is stored and retrievable."""
+ with dag_maker("test_version_data"):
+ pass
+
+ manifest = {"schema_version": 1, "files": {"dags/my_dag.py":
"S3VersionId123"}}
+ DagVersion.write_dag(
+ dag_id="test_version_data",
+ bundle_name="testing",
+ bundle_version="sha256abc",
+ version_data=manifest,
+ session=session,
+ )
+ session.flush()
+
+ retrieved = DagVersion.get_latest_version("test_version_data",
session=session)
+ assert retrieved.version_data == manifest
+ assert retrieved.bundle_version == "sha256abc"
+
+ @pytest.mark.db_test
+ def test_write_dag_without_version_data(self, dag_maker, session):
+ """Test that version_data defaults to None for bundles that don't use
it."""
+ with dag_maker("test_no_version_data"):
+ pass
+
+ DagVersion.write_dag(
+ dag_id="test_no_version_data",
+ bundle_name="testing",
+ bundle_version="abc123",
+ session=session,
+ )
+ session.flush()
+
+ retrieved = DagVersion.get_latest_version("test_no_version_data",
session=session)
+ assert retrieved.version_data is None
+ assert retrieved.bundle_version == "abc123"
diff --git
a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
index e3fd1e55f14..c42f56c0a39 100644
---
a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
+++
b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
@@ -35,6 +35,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS: bool = get_base_airflow_version_tuple() >= (3, 3, 0)
# BaseOperator removed from version_compat to avoid circular imports
# Import it directly in files that need it instead
@@ -43,4 +44,5 @@ __all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
"AIRFLOW_V_3_2_PLUS",
+ "AIRFLOW_V_3_3_PLUS",
]
diff --git a/providers/git/pyproject.toml b/providers/git/pyproject.toml
index e49f48d1e73..8d5f7de4802 100644
--- a/providers/git/pyproject.toml
+++ b/providers/git/pyproject.toml
@@ -60,7 +60,7 @@ requires-python = ">=3.10"
# After you modify the dependencies, and rebuild your Breeze CI image with
``breeze ci-image build``
dependencies = [
"apache-airflow>=3.0.0",
- "apache-airflow-providers-common-compat>=1.12.0",
+ "apache-airflow-providers-common-compat>=1.12.0", # use next version
"GitPython>=3.1.44",
]
diff --git a/providers/git/src/airflow/providers/git/bundles/git.py
b/providers/git/src/airflow/providers/git/bundles/git.py
index 2f03ed48cf1..275aa6b3f63 100644
--- a/providers/git/src/airflow/providers/git/bundles/git.py
+++ b/providers/git/src/airflow/providers/git/bundles/git.py
@@ -29,8 +29,12 @@ from tenacity import retry, retry_if_exception_type,
stop_after_attempt
from airflow.dag_processing.bundles.base import BaseDagBundle
from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_3_PLUS
from airflow.providers.git.hooks.git import GitHook
+if AIRFLOW_V_3_3_PLUS:
+ from airflow.dag_processing.bundles.base import BundleVersion
+
log = structlog.get_logger(__name__)
@@ -329,11 +333,15 @@ class GitDagBundle(BaseDagBundle):
f")>"
)
- def get_current_version(self) -> str:
+ def get_current_version(self) -> str | BundleVersion:
if self.version is not None and getattr(self, "repo", None) is None:
- return self.version
- with self.repo as repo:
- return repo.head.commit.hexsha
+ hexsha = self.version
+ else:
+ with self.repo as repo:
+ hexsha = repo.head.commit.hexsha
+ if AIRFLOW_V_3_3_PLUS:
+ return BundleVersion(version=hexsha)
+ return hexsha
@property
def path(self) -> Path:
diff --git a/providers/git/tests/unit/git/bundles/test_git.py
b/providers/git/tests/unit/git/bundles/test_git.py
index 43808ffdf09..1df439f58cf 100644
--- a/providers/git/tests/unit/git/bundles/test_git.py
+++ b/providers/git/tests/unit/git/bundles/test_git.py
@@ -36,7 +36,14 @@ from airflow.providers.git.bundles.git import GitDagBundle
from airflow.providers.git.hooks.git import GitHook
from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS,
AIRFLOW_V_3_3_PLUS
+
+
+def _version_str(version_result):
+ """Extract version string from either a BundleVersion (3.3+) or plain str
(older)."""
+ if AIRFLOW_V_3_3_PLUS:
+ return version_result.version
+ return version_result
@pytest.fixture(autouse=True)
@@ -145,7 +152,7 @@ class TestGitDagBundle:
bundle.initialize()
- assert bundle.get_current_version() == repo.head.commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
repo.head.commit.hexsha
assert_repo_is_closed(bundle)
@@ -171,7 +178,7 @@ class TestGitDagBundle:
)
bundle.initialize()
- assert bundle.get_current_version() == starting_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
starting_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py"} == files_in_repo
@@ -202,7 +209,7 @@ class TestGitDagBundle:
prune_dotgit_folder=False,
)
bundle.initialize()
- assert bundle.get_current_version() == starting_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
starting_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py"} == files_in_repo
@@ -222,7 +229,7 @@ class TestGitDagBundle:
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref=GIT_DEFAULT_BRANCH)
bundle.initialize()
- assert bundle.get_current_version() != starting_commit.hexsha
+ assert _version_str(bundle.get_current_version()) !=
starting_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py", "new_test.py"} == files_in_repo
@@ -266,7 +273,7 @@ class TestGitDagBundle:
bundle.initialize()
assert (bundle.repo_path / ".git").exists()
- assert bundle.get_current_version() == starting_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
starting_commit.hexsha
assert_repo_is_closed(bundle)
@@ -289,7 +296,7 @@ class TestGitDagBundle:
)
bundle1.initialize()
assert not (bundle1.repo_path / ".git").exists()
- assert bundle1.get_current_version() == version
+ assert _version_str(bundle1.get_current_version()) == version
version_path = bundle1.repo_path
# Second init: same name and version; should detect pruned worktree
and skip clone
@@ -305,7 +312,7 @@ class TestGitDagBundle:
mock_clone.assert_not_called()
assert bundle2.repo_path == version_path
- assert bundle2.get_current_version() == version
+ assert _version_str(bundle2.get_current_version()) == version
files_in_repo = {f.name for f in bundle2.path.iterdir() if f.is_file()}
assert {"test_dag.py"} == files_in_repo
@@ -327,7 +334,7 @@ class TestGitDagBundle:
)
bundle1.initialize()
assert (bundle1.repo_path / ".git").exists()
- assert bundle1.get_current_version() == version
+ assert _version_str(bundle1.get_current_version()) == version
# Should detect local repo has correct version and skip clone
with (
@@ -371,7 +378,7 @@ class TestGitDagBundle:
prune_dotgit_folder=False,
)
bundle1.initialize()
- assert bundle1.get_current_version() == second_commit
+ assert _version_str(bundle1.get_current_version()) == second_commit
# Second init with first_commit: different version means different
repo_path
bundle2 = GitDagBundle(
@@ -382,7 +389,7 @@ class TestGitDagBundle:
prune_dotgit_folder=False,
)
bundle2.initialize()
- assert bundle2.get_current_version() == first_commit
+ assert _version_str(bundle2.get_current_version()) == first_commit
assert bundle1.repo_path != bundle2.repo_path
@mock.patch("airflow.providers.git.bundles.git.GitHook")
@@ -411,7 +418,7 @@ class TestGitDagBundle:
prune_dotgit_folder=False,
)
bundle.initialize()
- assert bundle.get_current_version() == second_commit
+ assert _version_str(bundle.get_current_version()) == second_commit
assert bundle._local_repo_has_version() is True
# Mutate the cloned repo's HEAD to point at first_commit so HEAD !=
version
@@ -440,7 +447,7 @@ class TestGitDagBundle:
prune_dotgit_folder=False,
)
bundle1.initialize()
- assert bundle1.get_current_version() == hexsha
+ assert _version_str(bundle1.get_current_version()) == hexsha
# Re-initialize: should take the skip path but still resolve to hexsha
bundle2 = GitDagBundle(
@@ -457,7 +464,7 @@ class TestGitDagBundle:
bundle2.initialize()
mock_bare_clone.assert_not_called()
mock_clone.assert_not_called()
- assert bundle2.get_current_version() == hexsha
+ assert _version_str(bundle2.get_current_version()) == hexsha
@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_skip_path_cleans_dirty_working_tree(self, mock_githook, git_repo):
@@ -546,7 +553,7 @@ class TestGitDagBundle:
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref=GIT_DEFAULT_BRANCH)
bundle.initialize()
- assert bundle.get_current_version() == starting_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
starting_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py"} == files_in_repo
@@ -559,7 +566,7 @@ class TestGitDagBundle:
bundle.refresh()
- assert bundle.get_current_version()[:6] in commit
+ assert _version_str(bundle.get_current_version())[:6] in commit
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py", "new_test.py"} == files_in_repo
@@ -578,7 +585,7 @@ class TestGitDagBundle:
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="test123")
bundle.initialize()
- assert bundle.get_current_version() == starting_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
starting_commit.hexsha
# Add new file to the repo
file_path = repo_path / "new_test.py"
@@ -592,7 +599,7 @@ class TestGitDagBundle:
bundle.refresh()
- assert bundle.get_current_version() == commit.hexsha
+ assert _version_str(bundle.get_current_version()) == commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py", "new_test.py"} == files_in_repo
@@ -608,7 +615,7 @@ class TestGitDagBundle:
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="release")
bundle.initialize()
- assert bundle.get_current_version() == initial_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
initial_commit.hexsha
repo.git.checkout("--orphan", "new-branch")
repo.git.rm("-rf", ".")
@@ -622,7 +629,7 @@ class TestGitDagBundle:
bundle.refresh()
- assert bundle.get_current_version() == unrelated_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
unrelated_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"new_file.py"} == files_in_repo
@@ -639,7 +646,7 @@ class TestGitDagBundle:
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="release")
bundle.initialize()
- assert bundle.get_current_version() == initial_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
initial_commit.hexsha
# Force-push "release" to an orphan commit
repo.git.checkout("--orphan", "temp-orphan")
@@ -655,7 +662,7 @@ class TestGitDagBundle:
bundle.refresh()
- assert bundle.get_current_version() == unrelated_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
unrelated_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"branch_new.py"} == files_in_repo
@@ -671,7 +678,7 @@ class TestGitDagBundle:
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="moving")
bundle.initialize()
- assert bundle.get_current_version() == commit_a.hexsha
+ assert _version_str(bundle.get_current_version()) == commit_a.hexsha
# Move tag forward to commit B
file_b = repo_path / "file_b.py"
@@ -681,7 +688,7 @@ class TestGitDagBundle:
repo.create_tag("moving", force=True)
bundle.refresh()
- assert bundle.get_current_version() == commit_b.hexsha
+ assert _version_str(bundle.get_current_version()) == commit_b.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py", "file_b.py"} == files_in_repo
@@ -689,7 +696,7 @@ class TestGitDagBundle:
repo.create_tag("moving", ref=commit_a, force=True)
bundle.refresh()
- assert bundle.get_current_version() == commit_a.hexsha
+ assert _version_str(bundle.get_current_version()) == commit_a.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py"} == files_in_repo
@@ -722,7 +729,7 @@ class TestGitDagBundle:
# Repos were reused, not recreated
assert bare_repo_path.exists()
assert working_repo_path.exists()
- assert bundle.get_current_version() == unrelated_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
unrelated_commit.hexsha
@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_repeated_refreshes_after_force_push_stable(self, mock_githook,
git_repo):
@@ -745,13 +752,13 @@ class TestGitDagBundle:
# First refresh
bundle.refresh()
- assert bundle.get_current_version() == new_commit.hexsha
+ assert _version_str(bundle.get_current_version()) == new_commit.hexsha
files_after_first = {f.name for f in bundle.path.iterdir() if
f.is_file()}
assert {"stable_file.py"} == files_after_first
# Second refresh (no upstream changes)
bundle.refresh()
- assert bundle.get_current_version() == new_commit.hexsha
+ assert _version_str(bundle.get_current_version()) == new_commit.hexsha
files_after_second = {f.name for f in bundle.path.iterdir() if
f.is_file()}
assert {"stable_file.py"} == files_after_second
@@ -775,7 +782,7 @@ class TestGitDagBundle:
repo.create_tag("release", force=True)
bundle1.refresh()
- assert bundle1.get_current_version() == new_commit.hexsha
+ assert _version_str(bundle1.get_current_version()) == new_commit.hexsha
# Simulate DAG processor restart: new bundle object, same name
bundle2 = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="release")
@@ -783,7 +790,7 @@ class TestGitDagBundle:
bundle2.initialize()
mock_clone.assert_not_called()
- assert bundle2.get_current_version() == new_commit.hexsha
+ assert _version_str(bundle2.get_current_version()) == new_commit.hexsha
files_in_repo = {f.name for f in bundle2.path.iterdir() if f.is_file()}
assert {"reinit_file.py"} == files_in_repo
@@ -1403,14 +1410,14 @@ class TestGitDagBundle:
repo.create_tag("ephemeral")
bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS,
tracking_ref="ephemeral")
bundle.initialize()
- assert bundle.get_current_version() == initial_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
initial_commit.hexsha
# Delete the tag from the upstream repo
repo.delete_tag("ephemeral")
# Refresh still succeeds because git fetch refspecs don't prune
deleted tags
bundle.refresh()
- assert bundle.get_current_version() == initial_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
initial_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert {"test_dag.py"} == files_in_repo
@@ -1653,7 +1660,7 @@ class TestGitDagBundle:
submodules=True,
)
bundle.initialize()
- assert bundle.get_current_version() == initial_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
initial_commit.hexsha
# Verify submodule content is checked out
sub_content = (bundle.repo_path / "mysub" / "sub_file.py").read_text()
@@ -1668,7 +1675,7 @@ class TestGitDagBundle:
bundle.refresh()
- assert bundle.get_current_version() == new_commit.hexsha
+ assert _version_str(bundle.get_current_version()) == new_commit.hexsha
files_in_repo = {f.name for f in bundle.repo_path.iterdir() if
f.is_file()}
assert "extra.py" in files_in_repo
@@ -1707,7 +1714,7 @@ class TestGitDagBundle:
# refresh() prefers origin/ambiguous (branch) over the ambiguous tag
bundle.refresh()
- assert bundle.get_current_version() == branch_commit.hexsha
+ assert _version_str(bundle.get_current_version()) ==
branch_commit.hexsha
files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
assert "branch_file.py" in files_in_repo