Copilot commented on code in PR #61448:
URL: https://github.com/apache/airflow/pull/61448#discussion_r2896279761


##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1245,6 +1343,112 @@ def test_dag_add_task_sets_default_task_group(self):
         assert task_group.get_child_by_label("task_with_task_group") == 
task_with_task_group
         assert dag.get_task("task_group.task_with_task_group") == 
task_with_task_group
 
+    def test_create_dagrun_system_default_uses_original_version(self, 
dag_maker, session):
+        """Test system default: when DAG=None and global=False, uses original 
bundle version."""
+        with conf_vars({("core", "run_on_latest_version"): "False"}):
+            with dag_maker("test_system_default", run_on_latest_version=None):
+                ...
+
+            self._setup_bundle(session, "test_system_default", 
"test_bundle_system_default")
+
+            dr = self._create_test_dagrun(dag_maker, 
run_id="test_system_default")
+            assert dr.bundle_version == "v1.0.0"
+
+    def test_create_dagrun_disable_bundle_versioning_bypasses_logic(self, 
dag_maker, session):
+        """Test that disable_bundle_versioning=True bypasses all bundle 
version logic."""
+        with conf_vars({("core", "run_on_latest_version"): "True"}):
+            with dag_maker("test_bypass", disable_bundle_versioning=True):
+                ...
+
+            self._setup_bundle(session, "test_bypass", "test_bundle_bypass")
+
+            dr = self._create_test_dagrun(dag_maker, run_id="test_bypass")
+            assert dr.bundle_version is None
+
+    def test_create_dagrun_race_condition_fails_fast(self, dag_maker, session):
+        """Test that race condition (bundle updated but not serialized) raises 
clear error."""
+        from airflow.models.dag_version import DagVersion
+        from airflow.models.dagbundle import DagBundleModel

Review Comment:
   These imports are inside the test function body. Please move them to the 
module-level imports (DagBundleModel is already imported at the top) unless 
there is a specific need for a local import.



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1245,6 +1343,112 @@ def test_dag_add_task_sets_default_task_group(self):
         assert task_group.get_child_by_label("task_with_task_group") == 
task_with_task_group
         assert dag.get_task("task_group.task_with_task_group") == 
task_with_task_group
 
+    def test_create_dagrun_system_default_uses_original_version(self, 
dag_maker, session):
+        """Test system default: when DAG=None and global=False, uses original 
bundle version."""
+        with conf_vars({("core", "run_on_latest_version"): "False"}):
+            with dag_maker("test_system_default", run_on_latest_version=None):
+                ...
+
+            self._setup_bundle(session, "test_system_default", 
"test_bundle_system_default")
+
+            dr = self._create_test_dagrun(dag_maker, 
run_id="test_system_default")
+            assert dr.bundle_version == "v1.0.0"
+
+    def test_create_dagrun_disable_bundle_versioning_bypasses_logic(self, 
dag_maker, session):
+        """Test that disable_bundle_versioning=True bypasses all bundle 
version logic."""
+        with conf_vars({("core", "run_on_latest_version"): "True"}):
+            with dag_maker("test_bypass", disable_bundle_versioning=True):
+                ...
+
+            self._setup_bundle(session, "test_bypass", "test_bundle_bypass")
+
+            dr = self._create_test_dagrun(dag_maker, run_id="test_bypass")
+            assert dr.bundle_version is None
+
+    def test_create_dagrun_race_condition_fails_fast(self, dag_maker, session):
+        """Test that race condition (bundle updated but not serialized) raises 
clear error."""
+        from airflow.models.dag_version import DagVersion
+        from airflow.models.dagbundle import DagBundleModel
+
+        with dag_maker("test_race_condition", run_on_latest_version=True):
+            ...
+
+        # Create DagBundleModel with v2.0.0 (simulating bundle refresh)
+        dag_bundle = DagBundleModel(name="test_bundle_race", version="v2.0.0")
+        session.add(dag_bundle)
+        session.flush()
+
+        # Update DagModel to point to this bundle
+        dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == 
"test_race_condition"))
+        dag_model.bundle_name = "test_bundle_race"
+        dag_model.bundle_version = "v1.0.0"  # Original version
+        session.flush()
+
+        # Create only ONE DagVersion for v1.0.0 (v2.0.0 not serialized yet - 
simulating race)
+        existing_dag_version = session.scalar(
+            select(DagVersion).where(DagVersion.dag_id == 
"test_race_condition").limit(1)
+        )
+        if existing_dag_version:
+            existing_dag_version.bundle_name = "test_bundle_race"
+            existing_dag_version.bundle_version = "v1.0.0"
+            session.flush()
+
+        # Request "run on latest" - should get v2.0.0 from DagBundleModel
+        # But DagVersion for v2.0.0 doesn't exist yet (race condition)
+        # Should raise exception with clear message about temporary condition
+        with pytest.raises(
+            BundleVersionUnavailable,
+            match="Cannot create DagRun.*bundle version v2.0.0.*not been 
parsed yet.*retry",
+        ):
+            self._create_test_dagrun(dag_maker, run_id="test_race_fail")
+
+    @mock.patch("airflow.configuration.conf.getboolean")
+    def test_create_dagrun_dag_none_uses_global_true(self, mock_getboolean, 
dag_maker, session):
+        """Test DAG=None with Global=True uses latest version (not 
original)."""
+        mock_getboolean.side_effect = lambda section, key, fallback=None: (
+            True if section == "core" and key == "run_on_latest_version" else 
fallback
+        )
+
+        # DAG level is None (not explicitly set) - should use global True
+        with dag_maker("test_dag_none_global_true", 
run_on_latest_version=None):
+            ...
+
+        self._setup_bundle(session, "test_dag_none_global_true", 
"test_bundle_none_global")
+
+        dr = self._create_test_dagrun(dag_maker)
+        # Should use v2.0.0 because global=True and DAG=None falls through
+        assert dr.bundle_version == "v2.0.0"
+
+    def test_create_dagrun_non_versioned_bundle_uses_original_version(self, 
dag_maker, session):
+        """Test that non-versioned bundles (e.g. LocalDagBundle) fall back to 
original version."""
+        from airflow.models.dag_version import DagVersion
+        from airflow.models.dagbundle import DagBundleModel

Review Comment:
   These imports are inside the test function body. Please move them to the 
module-level imports (DagBundleModel is already imported at the top) unless 
there is a specific need for a local import.



##########
airflow-core/tests/unit/models/test_backfill.py:
##########
@@ -162,6 +162,8 @@ def 
test_create_backfill_clear_existing_bundle_version(dag_maker, session, run_o
     """
     Verify that when backfill clears an existing dag run, bundle version is 
cleared.
     """
+    from airflow.models.dag_version import DagVersion
+

Review Comment:
   This local import inside the test body is avoidable and makes dependencies 
less discoverable. Please move DagVersion to the module-level imports unless 
the local import is needed for circular import avoidance or test isolation.



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -3542,6 +3746,7 @@ def test_validate_setup_teardown_trigger_rule(self):
 )
 def test_disable_bundle_versioning(disable, bundle_version, expected, 
dag_maker, session, clear_dags):
     """When bundle versioning is disabled for a dag, the dag run should not 
have a bundle version."""
+    from airflow.models.dag_version import DagVersion
 

Review Comment:
   Importing DagVersion inside the test function body makes imports harder to 
track and is inconsistent with the module-level import style used elsewhere in 
this file. Please move this to the top-level imports unless there's a specific 
reason for a local import.



##########
airflow-core/tests/unit/models/test_dag.py:
##########
@@ -1232,6 +1232,104 @@ def test_create_dagrun_partition_key(self, 
partition_key, dag_maker):
             )
             assert dr.partition_key == partition_key
 
+    def _setup_bundle(self, session, dag_id, bundle_name, 
original_version="v1.0.0", latest_version="v2.0.0"):
+        """
+        Set up bundle configuration for testing bundle version resolution.
+
+        Creates a DagBundleModel with the latest version, updates the DagModel 
with bundle info,
+        and creates DagVersion entries for both original and latest versions 
to simulate
+        what happens when different bundle versions are parsed.
+        """
+        from airflow.models.dag_version import DagVersion
+        from airflow.models.dagbundle import DagBundleModel

Review Comment:
   These imports are inside a helper method body. To keep imports consistent 
and avoid re-running the import statements on every helper call, please move 
them to the module import section unless there is a specific need for a local 
import (e.g. circular import avoidance).



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py:
##########
@@ -143,6 +143,18 @@ def trigger_dag_run(
                 "message": f"A run already exists for Dag '{dag_id}' with 
run_id '{run_id}'",
             },
         )
+    except BundleVersionUnavailable as e:
+        log.warning(
+            "Bundle version unavailable when triggering DAG run",

Review Comment:
   The warning log for BundleVersionUnavailable omits the exception detail, 
which makes production troubleshooting harder. Consider including the exception 
message in the log context (e.g. add it to `extra` or include it in the 
formatted message).
   ```suggestion
               "Bundle version unavailable when triggering DAG run: %s",
               e,
   ```



##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1097,6 +1099,116 @@ def get_edge_info(self, upstream_task_id: str, 
downstream_task_id: str) -> EdgeI
         return empty
 
 
+def _resolve_bundle_version(
+    dag: SerializedDAG,
+    session: Session,
+) -> str | None:
+    """
+    Resolve the bundle version for a DAG run based on the precedence hierarchy.
+
+    Precedence (highest to lowest):
+    1. DAG-level configuration (dag.run_on_latest_version)
+    2. Global configuration (core.run_on_latest_version)
+    3. System default (use original version from DagModel)
+
+    :param dag: The serialized DAG
+    :param session: Database session
+    :return: The resolved bundle version, or None if versioning is disabled
+    """
+    if dag.disable_bundle_versioning:
+        return None
+
+    # Determine whether to use latest version based on precedence hierarchy
+    use_latest = _should_use_latest_version(dag)
+    source = _get_config_source(dag)
+
+    if use_latest:
+        log.debug("Using latest bundle version", dag_id=dag.dag_id, 
source=source)
+        return _get_latest_bundle_version(dag.dag_id, session)
+
+    log.debug("Using original bundle version", dag_id=dag.dag_id, 
source=source)
+    return _get_original_bundle_version(dag.dag_id, session)
+
+
+def _should_use_latest_version(
+    dag: SerializedDAG,
+) -> bool:
+    """
+    Determine whether to use latest bundle version based on precedence 
hierarchy.
+
+    Returns True if latest version should be used, False otherwise.
+    """
+    # Level 1: DAG-level configuration (explicit True or False)
+    if dag.run_on_latest_version is not None:
+        return dag.run_on_latest_version
+
+    # Level 2: Global configuration (fallback to False)
+    return airflow_conf.getboolean("core", "run_on_latest_version", 
fallback=False)
+
+
+def _get_config_source(
+    dag: SerializedDAG,
+) -> str:
+    """Return descriptive source of the bundle version configuration for 
logging."""
+    if dag.run_on_latest_version is not None:
+        return "DAG configuration"
+    if airflow_conf.has_option("core", "run_on_latest_version"):
+        return "global configuration"
+    return "system default"
+
+
+def _get_latest_bundle_version(dag_id: str, session: Session) -> str | None:
+    """
+    Get the latest bundle version from DagBundleModel, falling back to 
DagModel.
+
+    :param dag_id: The DAG ID
+    :param session: Database session
+    """
+    from airflow.models.dagbundle import DagBundleModel
+

Review Comment:
   Importing DagBundleModel inside _get_latest_bundle_version() violates the 
project rule to keep imports at module scope. Please move this import to the 
top of the file (or add a short comment explaining why a local import is 
required, e.g. circular import avoidance).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to