This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41df5a980c8 Add BundleVersion dataclass and version_data persistence 
to DagVersion (#66491)
41df5a980c8 is described below

commit 41df5a980c8d92fabc0fb98e5b4c808d90292463
Author: Niko Oliveira <[email protected]>
AuthorDate: Tue May 19 12:24:36 2026 -0700

    Add BundleVersion dataclass and version_data persistence to DagVersion 
(#66491)
    
    * Add BundleVersion dataclass and version_data persistence to DagVersion
    
    Add infrastructure for bundles to return structured version metadata
    alongside their version string. This enables block storage based
    bundles to be versioned (e.g. manifest based S3 Dag Bundle versioning)
    in follow-up PRs without requiring additional core changes.
    
    Changes:
    - Add BundleVersion frozen dataclass (version + optional data dict)
    - Widen BaseDagBundle.get_current_version() return type to accept
      BundleVersion, with backwards-compatible handling of legacy str returns
    - Add nullable version_data (JSON) column to dag_version table via
      Alembic migration 0115
    - Thread version_data through the full persistence chain:
      DagFileProcessorManager -> update_dag_parsing_results_in_db ->
      SerializedDagModel.write_dag -> DagVersion.write_dag
    - Emit DeprecationWarning for versioned bundles returning bare strings
    - Fix dag_command.py to unpack BundleVersion before passing to
      sync_bag_to_db
    
    All changes are additive and backwards compatible. Existing bundles
    continue to work unchanged with version_data=NULL.
    
    * Address review feedback: fix data loss, extract shared helper, update 
GitDagBundle
    
    Review feedback from potiuk on PR #66491:
    
    1. Fix data loss in dag_reserialize: pass version_data through 
sync_bag_to_db
       so reserialization preserves BundleVersion.data on DagVersion rows.
    
    2. Extract _unpack_bundle_version from DagFileProcessorManager to 
module-level
       unpack_bundle_version() in bundles/base.py. Both CLI and manager now use
       the same helper (with deprecation warning for legacy string returns).
    
    3. Migrate GitDagBundle to return BundleVersion on Airflow 3.3+ using
       AIRFLOW_V_3_3_PLUS version guard for backwards compatibility.
    
    4. Tighten BundleVersion.data type to dict[str, Any] | None with mutation 
note.
    
    5. Fix sa.JSON -> sa.JSON() in dag_version.py for style consistency.
    
    6. Add comment explaining is None vs falsy check in manager.py.
    
    * Fix: add '# use next version' comment for common-compat in git provider
    
    The CI 'Build info' check requires this comment when common.compat
    changes alongside another provider in the same PR.
    
    * Fix: RST docstring formatting in get_current_version to fix sphinx doc 
build
    
    Move the trailing paragraph before the bullet list and add a blank
    line before the list items so that sphinx does not produce 'Unexpected
    indentation' errors in auto-generated API docs for the git provider.
    
    * Fix: git bundle tests use _version_str helper for compat with Airflow < 
3.3
    
    The compat tests (3.0.6, 3.1.8) fail because get_current_version()
    returns a plain str when AIRFLOW_V_3_3_PLUS is False, but the tests
    unconditionally call .version on the result. Add a _version_str()
    helper that extracts the version string from either BundleVersion or
    str.
---
 airflow-core/docs/migrations-ref.rst               |  4 +-
 .../src/airflow/cli/commands/dag_command.py        |  6 +-
 .../src/airflow/dag_processing/bundles/base.py     | 57 +++++++++++++++-
 .../src/airflow/dag_processing/collection.py       |  4 ++
 airflow-core/src/airflow/dag_processing/dagbag.py  |  4 +-
 airflow-core/src/airflow/dag_processing/manager.py | 20 +++++-
 .../0115_3_3_0_add_version_data_to_dag_version.py  | 53 +++++++++++++++
 airflow-core/src/airflow/models/dag_version.py     |  6 ++
 airflow-core/src/airflow/models/serialized_dag.py  |  4 ++
 airflow-core/src/airflow/utils/db.py               |  2 +-
 .../tests/unit/dag_processing/bundles/test_base.py | 29 ++++++++
 .../tests/unit/dag_processing/test_collection.py   |  1 +
 .../tests/unit/dag_processing/test_manager.py      | 66 +++++++++++++++++++
 .../tests/unit/dag_processing/test_processor.py    |  1 +
 airflow-core/tests/unit/models/test_dag_version.py | 38 +++++++++++
 .../providers/common/compat/version_compat.py      |  2 +
 providers/git/pyproject.toml                       |  2 +-
 .../git/src/airflow/providers/git/bundles/git.py   | 16 +++--
 providers/git/tests/unit/git/bundles/test_git.py   | 77 ++++++++++++----------
 19 files changed, 342 insertions(+), 50 deletions(-)

diff --git a/airflow-core/docs/migrations-ref.rst 
b/airflow-core/docs/migrations-ref.rst
index a9502d28fe9..9d80414b64b 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``a7f3b2c1d4e5`` (head) | ``b8f3e4a1d2c9`` | ``3.3.0``         | Add 
allow_producer_teams column to                           |
+| ``a1b2c3d4e5f6`` (head) | ``a7f3b2c1d4e5`` | ``3.3.0``         | Add 
version_data to dag_version.                             |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``a7f3b2c1d4e5``        | ``b8f3e4a1d2c9`` | ``3.3.0``         | Add 
allow_producer_teams column to                           |
 |                         |                  |                   | 
dag_schedule_asset_reference table.                          |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``b8f3e4a1d2c9``        | ``fde9ed84d07b`` | ``3.3.0``         | Add 
retry_delay_override and retry_reason to task_instance.  |
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py 
b/airflow-core/src/airflow/cli/commands/dag_command.py
index d0162a49fc5..13a4ad90596 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -37,6 +37,7 @@ from airflow.api.client import get_current_api_client
 from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
 from airflow.cli.simple_table import AirflowConsole
 from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
+from airflow.dag_processing.bundles.base import unpack_bundle_version
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.dag_processing.dagbag import BundleDagBag, DagBag, sync_bag_to_db
 from airflow.exceptions import AirflowConfigException, AirflowException
@@ -740,4 +741,7 @@ def dag_reserialize(args, session: Session = NEW_SESSION) 
-> None:
             continue
         bundle.initialize()
         dag_bag = BundleDagBag(bundle.path, bundle_path=bundle.path, 
bundle_name=bundle.name)
-        sync_bag_to_db(dag_bag, bundle.name, 
bundle_version=bundle.get_current_version(), session=session)
+        version, version_data = 
unpack_bundle_version(bundle.get_current_version(), bundle)
+        sync_bag_to_db(
+            dag_bag, bundle.name, bundle_version=version, 
version_data=version_data, session=session
+        )
diff --git a/airflow-core/src/airflow/dag_processing/bundles/base.py 
b/airflow-core/src/airflow/dag_processing/bundles/base.py
index 2f4654382cb..b6b55f9251c 100644
--- a/airflow-core/src/airflow/dag_processing/bundles/base.py
+++ b/airflow-core/src/airflow/dag_processing/bundles/base.py
@@ -30,7 +30,7 @@ from datetime import timedelta
 from fcntl import LOCK_SH, LOCK_UN, flock
 from operator import attrgetter
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import pendulum
 from pendulum.parsing import ParserError
@@ -229,6 +229,50 @@ class BundleUsageTrackingManager:
             
self._remove_stale_bundle_versions_for_bundle(bundle_name=bundle.name)
 
 
+@dataclass(frozen=True)
+class BundleVersion:
+    """
+    Version information returned by a DAG bundle.
+
+    Bundles return this from ``get_current_version()`` to provide both a 
version
+    identifier and optional structured data (e.g., a manifest) atomically.
+
+    :param version: A string identifier for this bundle version (e.g., git 
SHA, content hash).
+    :param data: Optional structured data associated with this version (e.g., 
S3 manifest).
+        Mutating ``data`` after construction is undefined behavior.
+    """
+
+    version: str
+    data: dict[str, Any] | None = None
+
+
+def unpack_bundle_version(
+    result: str | BundleVersion | None, bundle: BaseDagBundle
+) -> tuple[str | None, dict[str, Any] | None]:
+    """
+    Unpack the return value of get_current_version().
+
+    Handles both the new BundleVersion dataclass and legacy str | None returns.
+    Emits a deprecation warning for bare string returns from versioned bundles.
+
+    :return: Tuple of (version_string, version_data)
+    """
+    if result is None:
+        return None, None
+    if isinstance(result, BundleVersion):
+        return result.version, result.data
+    # Legacy path: bare string return
+    if bundle.supports_versioning:
+        warnings.warn(
+            f"Bundle '{bundle.name}' returned a plain string from 
get_current_version(). "
+            f"Return a BundleVersion instance instead. "
+            f"Plain string returns are deprecated and will be removed in a 
future version.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+    return result, None
+
+
 class BaseDagBundle(ABC):
     """
     Base class for DAG bundles.
@@ -313,11 +357,18 @@ class BaseDagBundle(ABC):
         """
 
     @abstractmethod
-    def get_current_version(self) -> str | None:
+    def get_current_version(self) -> str | BundleVersion | None:
         """
-        Retrieve a string that represents the version of the DAG bundle.
+        Retrieve the version of the DAG bundle.
 
         Airflow can use this value to retrieve this same bundle version later.
+
+        May return:
+
+        - A ``BundleVersion`` instance (preferred) containing both a version 
string and
+          optional structured data (e.g., a manifest).
+        - A plain string (deprecated; triggers a ``DeprecationWarning`` for versioned bundles).
+        - None if the bundle does not support versioning.
         """
 
     @abstractmethod
diff --git a/airflow-core/src/airflow/dag_processing/collection.py 
b/airflow-core/src/airflow/dag_processing/collection.py
index 6f8bc752bbc..361f09871c8 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -262,6 +262,7 @@ def _serialize_dag_capturing_errors(
     bundle_name,
     session: Session,
     bundle_version: str | None,
+    version_data: dict | None = None,
     _prefetched: DagWriteMetadata | None = None,
 ):
     """
@@ -282,6 +283,7 @@ def _serialize_dag_capturing_errors(
             dag,
             bundle_name=bundle_name,
             bundle_version=bundle_version,
+            version_data=version_data,
             min_update_interval=MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
             session=session,
             _prefetched=_prefetched,
@@ -436,6 +438,7 @@ def update_dag_parsing_results_in_db(
     warnings: set[DagWarning],
     session: Session,
     *,
+    version_data: dict | None = None,
     warning_types: tuple[DagWarningType, ...] = (
         DagWarningType.NONEXISTENT_POOL,
         DagWarningType.RUNTIME_VARYING_VALUE,
@@ -493,6 +496,7 @@ def update_dag_parsing_results_in_db(
                             dag=dag,
                             bundle_name=bundle_name,
                             bundle_version=bundle_version,
+                            version_data=version_data,
                             session=session,
                             _prefetched=prefetched_metadata.get(dag.dag_id),
                         )
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py 
b/airflow-core/src/airflow/dag_processing/dagbag.py
index 5062a47bc7e..bd0a6dacb86 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -25,7 +25,7 @@ import warnings
 from collections.abc import Generator
 from datetime import datetime, timedelta
 from pathlib import Path
-from typing import TYPE_CHECKING, NamedTuple
+from typing import TYPE_CHECKING, Any, NamedTuple
 
 from tabulate import tabulate
 
@@ -574,6 +574,7 @@ def sync_bag_to_db(
     bundle_name: str,
     bundle_version: str | None,
     *,
+    version_data: dict[str, Any] | None = None,
     session: Session = NEW_SESSION,
 ) -> None:
     """Save attributes about list of DAG to the DB."""
@@ -598,5 +599,6 @@ def sync_bag_to_db(
         None,  # file parsing duration is not well defined when parsing 
multiple files / multiple DAGs.
         dagbag.dag_warnings,
         session=session,
+        version_data=version_data,
         files_parsed=files_parsed,
     )
diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index 72e51e48743..ed5c61c604c 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -50,7 +50,10 @@ from airflow._shared.observability.metrics.stats import 
normalize_name_for_stats
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
 from airflow.configuration import conf
-from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
+from airflow.dag_processing.bundles.base import (
+    BundleUsageTrackingManager,
+    unpack_bundle_version,
+)
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.dag_processing.collection import update_dag_parsing_results_in_db
 from airflow.dag_processing.processor import DagFileParsingResult, 
DagFileProcessorProcess
@@ -241,6 +244,7 @@ class DagFileProcessorManager(LoggingMixin):
 
     _dag_bundles: list[BaseDagBundle] = attrs.field(factory=list, init=False)
     _bundle_versions: dict[str, str | None] = attrs.field(factory=dict, 
init=False)
+    _bundle_version_data: dict[str, dict | None] = attrs.field(factory=dict, 
init=False)
 
     _processors: dict[DagFileInfo, DagFileProcessorProcess] = 
attrs.field(factory=dict, init=False)
 
@@ -770,7 +774,10 @@ class DagFileProcessorManager(LoggingMixin):
             if bundle.supports_versioning:
                 # we will also check the version of the bundle to see if 
another DAG processor has seen
                 # a new version
-                pre_refresh_version = self._bundle_versions.get(bundle.name) 
or bundle.get_current_version()
+                pre_refresh_version = self._bundle_versions.get(bundle.name)
+                # Use `is None` (not falsy) so an empty-string version is 
treated as a valid cached value.
+                if pre_refresh_version is None:
+                    pre_refresh_version, _ = 
unpack_bundle_version(bundle.get_current_version(), bundle)
                 current_version_matches_db = pre_refresh_version == 
bundle_state.version
             else:
                 # With no versioning, it always "matches"
@@ -801,7 +808,9 @@ class DagFileProcessorManager(LoggingMixin):
                 # We can short-circuit the rest of this if (1) bundle was seen 
before by
                 # this dag processor and (2) the version of the bundle did not 
change
                 # after refreshing it
-                version_after_refresh = bundle.get_current_version()
+                version_after_refresh, version_data_after_refresh = 
unpack_bundle_version(
+                    bundle.get_current_version(), bundle
+                )
                 if previously_seen and pre_refresh_version == 
version_after_refresh:
                     self.log.debug(
                         "Bundle %s version not changed after refresh: %s",
@@ -817,6 +826,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self.log.info("Version changed for %s, new version: %s", 
bundle.name, version_after_refresh)
             else:
                 version_after_refresh = None
+                version_data_after_refresh = None
 
             # Persistence failure must not skip file scanning (bundle is 
already refreshed locally).
             # _bundle_versions is only advanced on success to stay consistent 
with the DB.
@@ -826,6 +836,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self.log.exception("Error persisting state for bundle %s", 
bundle.name)
             else:
                 self._bundle_versions[bundle.name] = version_after_refresh
+                self._bundle_version_data[bundle.name] = 
version_data_after_refresh
 
             found_files = {
                 DagFileInfo(rel_path=p, bundle_name=bundle.name, 
bundle_path=bundle.path)
@@ -1126,6 +1137,7 @@ class DagFileProcessorManager(LoggingMixin):
                 self.persist_parsing_result(
                     bundle_name=file.bundle_name,
                     bundle_version=self._bundle_versions[file.bundle_name],
+                    
version_data=self._bundle_version_data.get(file.bundle_name),
                     parsing_result=proc.parsing_result,
                     run_duration=run_duration,
                     relative_fileloc=str(file.rel_path),
@@ -1157,6 +1169,7 @@ class DagFileProcessorManager(LoggingMixin):
         *,
         bundle_name: str,
         bundle_version: str | None,
+        version_data: dict | None,
         parsing_result: DagFileParsingResult,
         run_duration: float,
         relative_fileloc: str | None,
@@ -1183,6 +1196,7 @@ class DagFileProcessorManager(LoggingMixin):
         update_dag_parsing_results_in_db(
             bundle_name=bundle_name,
             bundle_version=bundle_version,
+            version_data=version_data,
             dags=parsing_result.serialized_dags,
             import_errors=import_errors,
             parse_duration=run_duration,
diff --git 
a/airflow-core/src/airflow/migrations/versions/0115_3_3_0_add_version_data_to_dag_version.py
 
b/airflow-core/src/airflow/migrations/versions/0115_3_3_0_add_version_data_to_dag_version.py
new file mode 100644
index 00000000000..deca97ceacb
--- /dev/null
+++ 
b/airflow-core/src/airflow/migrations/versions/0115_3_3_0_add_version_data_to_dag_version.py
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add version_data to dag_version.
+
+Adds a nullable JSON column to the dag_version table for storing
+structured version metadata (e.g., S3 object version manifests).
+Bundles that do not use version_data leave this column NULL.
+
+On PostgreSQL and MySQL 8+, adding a nullable column without a
+default is a metadata-only operation (no table rewrite).
+
+Revision ID: a1b2c3d4e5f6
+Revises: a7f3b2c1d4e5
+Create Date: 2026-05-05 23:30:00.000000
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+revision = "a1b2c3d4e5f6"
+down_revision = "a7f3b2c1d4e5"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+    """Add version_data column to dag_version table."""
+    op.add_column("dag_version", sa.Column("version_data", sa.JSON(), 
nullable=True))
+
+
+def downgrade():
+    """Remove version_data column from dag_version table."""
+    op.drop_column("dag_version", "version_data")
diff --git a/airflow-core/src/airflow/models/dag_version.py 
b/airflow-core/src/airflow/models/dag_version.py
index cfdb301d041..45ea40c091b 100644
--- a/airflow-core/src/airflow/models/dag_version.py
+++ b/airflow-core/src/airflow/models/dag_version.py
@@ -53,6 +53,7 @@ class DagVersion(Base):
     dag_model = relationship("DagModel", back_populates="dag_versions")
     bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
     bundle_version: Mapped[str | None] = mapped_column(StringID(), 
nullable=True)
+    version_data: Mapped[dict | None] = mapped_column(sa.JSON(), nullable=True)
     bundle = relationship(
         "DagBundleModel",
         primaryjoin="foreign(DagVersion.bundle_name) == DagBundleModel.name",
@@ -111,6 +112,7 @@ class DagVersion(Base):
         dag_id: str,
         bundle_name: str,
         bundle_version: str | None = None,
+        version_data: dict | None = None,
         version_number: int = 1,
         session: Session = NEW_SESSION,
     ) -> DagVersion:
@@ -120,6 +122,9 @@ class DagVersion(Base):
         Checks if a version of the DAG exists and increments the version 
number if it does.
 
         :param dag_id: The DAG ID.
+        :param bundle_name: The bundle name.
+        :param bundle_version: The bundle version string.
+        :param version_data: Optional structured data associated with this 
version (e.g., S3 manifest).
         :param version_number: The version number.
         :param session: The database session.
         :return: The DagVersion object.
@@ -135,6 +140,7 @@ class DagVersion(Base):
             version_number=version_number,
             bundle_name=bundle_name,
             bundle_version=bundle_version,
+            version_data=version_data,
         )
         log.debug("Writing DagVersion %s to the DB", dag_version)
         session.add(dag_version)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py 
b/airflow-core/src/airflow/models/serialized_dag.py
index be55c3e0908..d2d9d47d61c 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -597,6 +597,7 @@ class SerializedDagModel(Base):
         dag: LazyDeserializedDAG,
         bundle_name: str,
         bundle_version: str | None = None,
+        version_data: dict | None = None,
         min_update_interval: int | None = None,
         session: Session = NEW_SESSION,
         _prefetched: DagWriteMetadata | None = None,
@@ -610,6 +611,7 @@ class SerializedDagModel(Base):
         :param dag: a DAG to be written into database
         :param bundle_name: bundle name of the DAG
         :param bundle_version: bundle version of the DAG
+        :param version_data: optional structured data associated with this 
version
         :param min_update_interval: minimal interval in seconds to update 
serialized DAG
         :param session: ORM Session
         :param _prefetched: Pre-fetched metadata to skip per-DAG queries; used 
by bulk callers
@@ -740,6 +742,7 @@ class SerializedDagModel(Base):
             # Update the latest dag version
             dag_version.bundle_name = bundle_name
             dag_version.bundle_version = bundle_version
+            dag_version.version_data = version_data
             session.merge(dag_version)
             # Update the latest DagCode
             DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, 
session=session)
@@ -749,6 +752,7 @@ class SerializedDagModel(Base):
             dag_id=dag.dag_id,
             bundle_name=bundle_name,
             bundle_version=bundle_version,
+            version_data=version_data,
             session=session,
         )
         log.debug("Writing Serialized DAG: %s to the DB", dag.dag_id)
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index 4caa9901bfb..9b747d7fc3f 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "3.1.0": "cc92b33c6709",
     "3.1.8": "509b94a1042d",
     "3.2.0": "1d6611b6ab7c",
-    "3.3.0": "a7f3b2c1d4e5",
+    "3.3.0": "a1b2c3d4e5f6",
 }
 
 # Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/dag_processing/bundles/test_base.py 
b/airflow-core/tests/unit/dag_processing/bundles/test_base.py
index bd6360de944..6fc7ba39a0a 100644
--- a/airflow-core/tests/unit/dag_processing/bundles/test_base.py
+++ b/airflow-core/tests/unit/dag_processing/bundles/test_base.py
@@ -33,6 +33,7 @@ from airflow._shared.timezones import timezone as tz
 from airflow.dag_processing.bundles.base import (
     BaseDagBundle,
     BundleUsageTrackingManager,
+    BundleVersion,
     BundleVersionLock,
     get_bundle_storage_root_path,
 )
@@ -294,3 +295,31 @@ class TestBundleUsageTrackingManager:
                 assert len(lock_files) == expected_remaining
                 bundle_folders = list(b.versions_dir.iterdir())
                 assert len(bundle_folders) == expected_remaining
+
+
+class TestBundleVersion:
+    def test_bundle_version_with_version_only(self):
+        bv = BundleVersion(version="abc123")
+        assert bv.version == "abc123"
+        assert bv.data is None
+
+    def test_bundle_version_with_data(self):
+        data = {"schema_version": 1, "files": {"dag.py": "v1"}}
+        bv = BundleVersion(version="sha256hex", data=data)
+        assert bv.version == "sha256hex"
+        assert bv.data == data
+
+    def test_bundle_version_is_frozen(self):
+        bv = BundleVersion(version="abc")
+        with pytest.raises(AttributeError):
+            bv.version = "xyz"
+
+    def test_bundle_version_equality(self):
+        bv1 = BundleVersion(version="abc", data={"key": "val"})
+        bv2 = BundleVersion(version="abc", data={"key": "val"})
+        assert bv1 == bv2
+
+    def test_bundle_version_inequality(self):
+        bv1 = BundleVersion(version="abc", data={"key": "val"})
+        bv2 = BundleVersion(version="abc", data={"key": "other"})
+        assert bv1 != bv2
diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py 
b/airflow-core/tests/unit/dag_processing/test_collection.py
index 40224253961..b0a65f3cd9c 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -570,6 +570,7 @@ class TestUpdateDagParsingResults:
                     mock_dag,
                     bundle_name="testing",
                     bundle_version=None,
+                    version_data=None,
                     min_update_interval=mock.ANY,
                     session=mock_session,
                     _prefetched=mock.ANY,
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 40b819f146e..33aaa828888 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1184,6 +1184,7 @@ class TestDagFileProcessorManager:
         mock_persist.assert_called_once_with(
             bundle_name="testing",
             bundle_version="v1",
+            version_data=None,
             parsing_result=processor.parsing_result,
             run_duration=mock.ANY,
             relative_fileloc="abc.txt",
@@ -2724,3 +2725,68 @@ class TestDagFileProcessorManager:
         # _bundle_versions must NOT advance — DB still holds the old version, 
so the next
         # iteration will see a version mismatch and re-refresh rather than 
skip incorrectly
         assert "mock_bundle" not in manager._bundle_versions
+
+    def test_unpack_bundle_version_with_bundle_version_dataclass(self):
+        from airflow.dag_processing.bundles.base import BundleVersion, 
unpack_bundle_version
+
+        bundle = mock.MagicMock()
+        bundle.supports_versioning = True
+        result = BundleVersion(version="sha256abc", data={"schema_version": 1, 
"files": {}})
+        version, data = unpack_bundle_version(result, bundle)
+        assert version == "sha256abc"
+        assert data == {"schema_version": 1, "files": {}}
+
+    def test_unpack_bundle_version_with_none(self):
+        from airflow.dag_processing.bundles.base import unpack_bundle_version
+
+        bundle = mock.MagicMock()
+        bundle.supports_versioning = True
+        version, data = unpack_bundle_version(None, bundle)
+        assert version is None
+        assert data is None
+
+    def test_unpack_bundle_version_with_legacy_string(self):
+        from airflow.dag_processing.bundles.base import unpack_bundle_version
+
+        bundle = mock.MagicMock()
+        bundle.name = "test_bundle"
+        bundle.supports_versioning = True
+        with pytest.warns(DeprecationWarning, match="plain string"):
+            version, data = unpack_bundle_version("v1.0", bundle)
+        assert version == "v1.0"
+        assert data is None
+
+    def test_unpack_bundle_version_with_string_non_versioned_bundle(self):
+        """Non-versioned bundles returning a string should not emit a 
warning."""
+        from airflow.dag_processing.bundles.base import unpack_bundle_version
+
+        bundle = mock.MagicMock()
+        bundle.name = "local_bundle"
+        bundle.supports_versioning = False
+        import warnings as w
+
+        with w.catch_warnings():
+            w.simplefilter("error")
+            version, data = unpack_bundle_version("v1.0", bundle)
+        assert version == "v1.0"
+        assert data is None
+
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_bundle_version_data_stored_after_refresh(self, session):
+        """Test that version_data from BundleVersion is stored in 
_bundle_version_data."""
+        from airflow.dag_processing.bundles.base import BundleVersion
+
+        manager = DagFileProcessorManager(max_runs=1)
+        bundle = self._make_refresh_bundle(supports_versioning=True, 
current_version="v1")
+        # Override get_current_version to return BundleVersion with data
+        test_data = {"schema_version": 1, "files": {"dag.py": "ver123"}}
+        bundle.get_current_version = mock.MagicMock(
+            return_value=BundleVersion(version="newhash", data=test_data)
+        )
+        # Pre-populate so previously_seen=True and version differs
+        manager._bundle_versions["mock_bundle"] = "oldhash"
+
+        self._refresh_with_mocked_state(manager, bundle, 
BundleState(last_refreshed=None, version="oldhash"))
+
+        assert manager._bundle_versions["mock_bundle"] == "newhash"
+        assert manager._bundle_version_data["mock_bundle"] == test_data
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py 
b/airflow-core/tests/unit/dag_processing/test_processor.py
index d2ac085b0b7..7fe7204e68e 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -725,6 +725,7 @@ def test_persist_parsing_result_calls_update_db():
             manager,
             bundle_name="test-bundle",
             bundle_version="v1",
+            version_data=None,
             parsing_result=parsing_result,
             run_duration=1.5,
             relative_fileloc="dags/test.py",
diff --git a/airflow-core/tests/unit/models/test_dag_version.py b/airflow-core/tests/unit/models/test_dag_version.py
index 6d8e2404954..e7899422093 100644
--- a/airflow-core/tests/unit/models/test_dag_version.py
+++ b/airflow-core/tests/unit/models/test_dag_version.py
@@ -84,3 +84,41 @@ class TestDagVersion:
 
         latest_version = DagVersion.get_latest_version(dag.dag_id)
         assert latest_version.version == f"{dag.dag_id}-1"
+
+    @pytest.mark.db_test
+    def test_write_dag_with_version_data(self, dag_maker, session):
+        """Test that version_data is stored and retrievable."""
+        with dag_maker("test_version_data"):
+            pass
+
+        manifest = {"schema_version": 1, "files": {"dags/my_dag.py": "S3VersionId123"}}
+        DagVersion.write_dag(
+            dag_id="test_version_data",
+            bundle_name="testing",
+            bundle_version="sha256abc",
+            version_data=manifest,
+            session=session,
+        )
+        session.flush()
+
+        retrieved = DagVersion.get_latest_version("test_version_data", session=session)
+        assert retrieved.version_data == manifest
+        assert retrieved.bundle_version == "sha256abc"
+
+    @pytest.mark.db_test
+    def test_write_dag_without_version_data(self, dag_maker, session):
+        """Test that version_data defaults to None for bundles that don't use it."""
+        with dag_maker("test_no_version_data"):
+            pass
+
+        DagVersion.write_dag(
+            dag_id="test_no_version_data",
+            bundle_name="testing",
+            bundle_version="abc123",
+            session=session,
+        )
+        session.flush()
+
+        retrieved = DagVersion.get_latest_version("test_no_version_data", session=session)
+        assert retrieved.version_data is None
+        assert retrieved.bundle_version == "abc123"
diff --git a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
index e3fd1e55f14..c42f56c0a39 100644
--- a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
+++ b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
@@ -35,6 +35,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
 AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
 AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
+AIRFLOW_V_3_3_PLUS: bool = get_base_airflow_version_tuple() >= (3, 3, 0)
 
 # BaseOperator removed from version_compat to avoid circular imports
 # Import it directly in files that need it instead
@@ -43,4 +44,5 @@ __all__ = [
     "AIRFLOW_V_3_0_PLUS",
     "AIRFLOW_V_3_1_PLUS",
     "AIRFLOW_V_3_2_PLUS",
+    "AIRFLOW_V_3_3_PLUS",
 ]
diff --git a/providers/git/pyproject.toml b/providers/git/pyproject.toml
index e49f48d1e73..8d5f7de4802 100644
--- a/providers/git/pyproject.toml
+++ b/providers/git/pyproject.toml
@@ -60,7 +60,7 @@ requires-python = ">=3.10"
 # After you modify the dependencies, and rebuild your Breeze CI image with ``breeze ci-image build``
 dependencies = [
     "apache-airflow>=3.0.0",
-    "apache-airflow-providers-common-compat>=1.12.0",
+    "apache-airflow-providers-common-compat>=1.12.0",  # use next version
     "GitPython>=3.1.44",
 ]
 
diff --git a/providers/git/src/airflow/providers/git/bundles/git.py b/providers/git/src/airflow/providers/git/bundles/git.py
index 2f03ed48cf1..275aa6b3f63 100644
--- a/providers/git/src/airflow/providers/git/bundles/git.py
+++ b/providers/git/src/airflow/providers/git/bundles/git.py
@@ -29,8 +29,12 @@ from tenacity import retry, retry_if_exception_type, stop_after_attempt
 
 from airflow.dag_processing.bundles.base import BaseDagBundle
 from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_3_PLUS
 from airflow.providers.git.hooks.git import GitHook
 
+if AIRFLOW_V_3_3_PLUS:
+    from airflow.dag_processing.bundles.base import BundleVersion
+
 log = structlog.get_logger(__name__)
 
 
@@ -329,11 +333,15 @@ class GitDagBundle(BaseDagBundle):
             f")>"
         )
 
-    def get_current_version(self) -> str:
+    def get_current_version(self) -> str | BundleVersion:
         if self.version is not None and getattr(self, "repo", None) is None:
-            return self.version
-        with self.repo as repo:
-            return repo.head.commit.hexsha
+            hexsha = self.version
+        else:
+            with self.repo as repo:
+                hexsha = repo.head.commit.hexsha
+        if AIRFLOW_V_3_3_PLUS:
+            return BundleVersion(version=hexsha)
+        return hexsha
 
     @property
     def path(self) -> Path:
diff --git a/providers/git/tests/unit/git/bundles/test_git.py b/providers/git/tests/unit/git/bundles/test_git.py
index 43808ffdf09..1df439f58cf 100644
--- a/providers/git/tests/unit/git/bundles/test_git.py
+++ b/providers/git/tests/unit/git/bundles/test_git.py
@@ -36,7 +36,14 @@ from airflow.providers.git.bundles.git import GitDagBundle
 from airflow.providers.git.hooks.git import GitHook
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_3_PLUS
+
+
+def _version_str(version_result):
+    """Extract version string from either a BundleVersion (3.3+) or plain str (older)."""
+    if AIRFLOW_V_3_3_PLUS:
+        return version_result.version
+    return version_result
 
 
 @pytest.fixture(autouse=True)
@@ -145,7 +152,7 @@ class TestGitDagBundle:
 
         bundle.initialize()
 
-        assert bundle.get_current_version() == repo.head.commit.hexsha
+        assert _version_str(bundle.get_current_version()) == repo.head.commit.hexsha
 
         assert_repo_is_closed(bundle)
 
@@ -171,7 +178,7 @@ class TestGitDagBundle:
         )
         bundle.initialize()
 
-        assert bundle.get_current_version() == starting_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == starting_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py"} == files_in_repo
@@ -202,7 +209,7 @@ class TestGitDagBundle:
             prune_dotgit_folder=False,
         )
         bundle.initialize()
-        assert bundle.get_current_version() == starting_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == starting_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py"} == files_in_repo
@@ -222,7 +229,7 @@ class TestGitDagBundle:
         bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref=GIT_DEFAULT_BRANCH)
         bundle.initialize()
 
-        assert bundle.get_current_version() != starting_commit.hexsha
+        assert _version_str(bundle.get_current_version()) != starting_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py", "new_test.py"} == files_in_repo
@@ -266,7 +273,7 @@ class TestGitDagBundle:
         bundle.initialize()
 
         assert (bundle.repo_path / ".git").exists()
-        assert bundle.get_current_version() == starting_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == starting_commit.hexsha
 
         assert_repo_is_closed(bundle)
 
@@ -289,7 +296,7 @@ class TestGitDagBundle:
         )
         bundle1.initialize()
         assert not (bundle1.repo_path / ".git").exists()
-        assert bundle1.get_current_version() == version
+        assert _version_str(bundle1.get_current_version()) == version
         version_path = bundle1.repo_path
 
         # Second init: same name and version; should detect pruned worktree and skip clone
@@ -305,7 +312,7 @@ class TestGitDagBundle:
             mock_clone.assert_not_called()
 
         assert bundle2.repo_path == version_path
-        assert bundle2.get_current_version() == version
+        assert _version_str(bundle2.get_current_version()) == version
         files_in_repo = {f.name for f in bundle2.path.iterdir() if f.is_file()}
         assert {"test_dag.py"} == files_in_repo
 
@@ -327,7 +334,7 @@ class TestGitDagBundle:
         )
         bundle1.initialize()
         assert (bundle1.repo_path / ".git").exists()
-        assert bundle1.get_current_version() == version
+        assert _version_str(bundle1.get_current_version()) == version
 
         # Should detect local repo has correct version and skip clone
         with (
@@ -371,7 +378,7 @@ class TestGitDagBundle:
             prune_dotgit_folder=False,
         )
         bundle1.initialize()
-        assert bundle1.get_current_version() == second_commit
+        assert _version_str(bundle1.get_current_version()) == second_commit
 
         # Second init with first_commit: different version means different repo_path
         bundle2 = GitDagBundle(
@@ -382,7 +389,7 @@ class TestGitDagBundle:
             prune_dotgit_folder=False,
         )
         bundle2.initialize()
-        assert bundle2.get_current_version() == first_commit
+        assert _version_str(bundle2.get_current_version()) == first_commit
         assert bundle1.repo_path != bundle2.repo_path
 
     @mock.patch("airflow.providers.git.bundles.git.GitHook")
@@ -411,7 +418,7 @@ class TestGitDagBundle:
             prune_dotgit_folder=False,
         )
         bundle.initialize()
-        assert bundle.get_current_version() == second_commit
+        assert _version_str(bundle.get_current_version()) == second_commit
         assert bundle._local_repo_has_version() is True
 
         # Mutate the cloned repo's HEAD to point at first_commit so HEAD != version
@@ -440,7 +447,7 @@ class TestGitDagBundle:
             prune_dotgit_folder=False,
         )
         bundle1.initialize()
-        assert bundle1.get_current_version() == hexsha
+        assert _version_str(bundle1.get_current_version()) == hexsha
 
         # Re-initialize: should take the skip path but still resolve to hexsha
         bundle2 = GitDagBundle(
@@ -457,7 +464,7 @@ class TestGitDagBundle:
             bundle2.initialize()
             mock_bare_clone.assert_not_called()
             mock_clone.assert_not_called()
-        assert bundle2.get_current_version() == hexsha
+        assert _version_str(bundle2.get_current_version()) == hexsha
 
     @mock.patch("airflow.providers.git.bundles.git.GitHook")
     def test_skip_path_cleans_dirty_working_tree(self, mock_githook, git_repo):
@@ -546,7 +553,7 @@ class TestGitDagBundle:
         bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref=GIT_DEFAULT_BRANCH)
         bundle.initialize()
 
-        assert bundle.get_current_version() == starting_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == starting_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py"} == files_in_repo
@@ -559,7 +566,7 @@ class TestGitDagBundle:
 
         bundle.refresh()
 
-        assert bundle.get_current_version()[:6] in commit
+        assert _version_str(bundle.get_current_version())[:6] in commit
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py", "new_test.py"} == files_in_repo
@@ -578,7 +585,7 @@ class TestGitDagBundle:
 
         bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="test123")
         bundle.initialize()
-        assert bundle.get_current_version() == starting_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == starting_commit.hexsha
 
         # Add new file to the repo
         file_path = repo_path / "new_test.py"
@@ -592,7 +599,7 @@ class TestGitDagBundle:
 
         bundle.refresh()
 
-        assert bundle.get_current_version() == commit.hexsha
+        assert _version_str(bundle.get_current_version()) == commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py", "new_test.py"} == files_in_repo
@@ -608,7 +615,7 @@ class TestGitDagBundle:
 
         bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="release")
         bundle.initialize()
-        assert bundle.get_current_version() == initial_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == initial_commit.hexsha
 
         repo.git.checkout("--orphan", "new-branch")
         repo.git.rm("-rf", ".")
@@ -622,7 +629,7 @@ class TestGitDagBundle:
 
         bundle.refresh()
 
-        assert bundle.get_current_version() == unrelated_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == unrelated_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"new_file.py"} == files_in_repo
@@ -639,7 +646,7 @@ class TestGitDagBundle:
 
         bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="release")
         bundle.initialize()
-        assert bundle.get_current_version() == initial_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == initial_commit.hexsha
 
         # Force-push "release" to an orphan commit
         repo.git.checkout("--orphan", "temp-orphan")
@@ -655,7 +662,7 @@ class TestGitDagBundle:
 
         bundle.refresh()
 
-        assert bundle.get_current_version() == unrelated_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == unrelated_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"branch_new.py"} == files_in_repo
@@ -671,7 +678,7 @@ class TestGitDagBundle:
 
         bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="moving")
         bundle.initialize()
-        assert bundle.get_current_version() == commit_a.hexsha
+        assert _version_str(bundle.get_current_version()) == commit_a.hexsha
 
         # Move tag forward to commit B
         file_b = repo_path / "file_b.py"
@@ -681,7 +688,7 @@ class TestGitDagBundle:
         repo.create_tag("moving", force=True)
 
         bundle.refresh()
-        assert bundle.get_current_version() == commit_b.hexsha
+        assert _version_str(bundle.get_current_version()) == commit_b.hexsha
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py", "file_b.py"} == files_in_repo
 
@@ -689,7 +696,7 @@ class TestGitDagBundle:
         repo.create_tag("moving", ref=commit_a, force=True)
 
         bundle.refresh()
-        assert bundle.get_current_version() == commit_a.hexsha
+        assert _version_str(bundle.get_current_version()) == commit_a.hexsha
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py"} == files_in_repo
 
@@ -722,7 +729,7 @@ class TestGitDagBundle:
         # Repos were reused, not recreated
         assert bare_repo_path.exists()
         assert working_repo_path.exists()
-        assert bundle.get_current_version() == unrelated_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == unrelated_commit.hexsha
 
     @mock.patch("airflow.providers.git.bundles.git.GitHook")
     def test_repeated_refreshes_after_force_push_stable(self, mock_githook, git_repo):
@@ -745,13 +752,13 @@ class TestGitDagBundle:
 
         # First refresh
         bundle.refresh()
-        assert bundle.get_current_version() == new_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == new_commit.hexsha
         files_after_first = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"stable_file.py"} == files_after_first
 
         # Second refresh (no upstream changes)
         bundle.refresh()
-        assert bundle.get_current_version() == new_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == new_commit.hexsha
         files_after_second = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"stable_file.py"} == files_after_second
 
@@ -775,7 +782,7 @@ class TestGitDagBundle:
         repo.create_tag("release", force=True)
 
         bundle1.refresh()
-        assert bundle1.get_current_version() == new_commit.hexsha
+        assert _version_str(bundle1.get_current_version()) == new_commit.hexsha
 
         # Simulate DAG processor restart: new bundle object, same name
         bundle2 = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="release")
@@ -783,7 +790,7 @@ class TestGitDagBundle:
             bundle2.initialize()
             mock_clone.assert_not_called()
 
-        assert bundle2.get_current_version() == new_commit.hexsha
+        assert _version_str(bundle2.get_current_version()) == new_commit.hexsha
         files_in_repo = {f.name for f in bundle2.path.iterdir() if f.is_file()}
         assert {"reinit_file.py"} == files_in_repo
 
@@ -1403,14 +1410,14 @@ class TestGitDagBundle:
         repo.create_tag("ephemeral")
         bundle = GitDagBundle(name="test", git_conn_id=CONN_HTTPS, tracking_ref="ephemeral")
         bundle.initialize()
-        assert bundle.get_current_version() == initial_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == initial_commit.hexsha
 
         # Delete the tag from the upstream repo
         repo.delete_tag("ephemeral")
 
         # Refresh still succeeds because git fetch refspecs don't prune deleted tags
         bundle.refresh()
-        assert bundle.get_current_version() == initial_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == initial_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert {"test_dag.py"} == files_in_repo
@@ -1653,7 +1660,7 @@ class TestGitDagBundle:
             submodules=True,
         )
         bundle.initialize()
-        assert bundle.get_current_version() == initial_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == initial_commit.hexsha
 
         # Verify submodule content is checked out
         sub_content = (bundle.repo_path / "mysub" / "sub_file.py").read_text()
@@ -1668,7 +1675,7 @@ class TestGitDagBundle:
 
         bundle.refresh()
 
-        assert bundle.get_current_version() == new_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == new_commit.hexsha
         files_in_repo = {f.name for f in bundle.repo_path.iterdir() if f.is_file()}
         assert "extra.py" in files_in_repo
 
@@ -1707,7 +1714,7 @@ class TestGitDagBundle:
 
         # refresh() prefers origin/ambiguous (branch) over the ambiguous tag
         bundle.refresh()
-        assert bundle.get_current_version() == branch_commit.hexsha
+        assert _version_str(bundle.get_current_version()) == branch_commit.hexsha
 
         files_in_repo = {f.name for f in bundle.path.iterdir() if f.is_file()}
         assert "branch_file.py" in files_in_repo

Reply via email to