nathadfield commented on code in PR #61448:
URL: https://github.com/apache/airflow/pull/61448#discussion_r2871552093


##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1094,6 +1096,116 @@ def get_edge_info(self, upstream_task_id: str, 
downstream_task_id: str) -> EdgeI
         return empty
 
 
+def _resolve_bundle_version(
+    dag: SerializedDAG,
+    session: Session,
+) -> str | None:
+    """
+    Resolve the bundle version for a DAG run based on the precedence hierarchy.
+
+    Precedence (highest to lowest):
+    1. DAG-level configuration (dag.run_on_latest_version)
+    2. Global configuration (core.run_on_latest_version)
+    3. System default (use original version from DagModel)
+
+    :param dag: The serialized DAG
+    :param session: Database session
+    :return: The resolved bundle version, or None if versioning is disabled
+    """
+    if dag.disable_bundle_versioning:
+        return None
+
+    # Determine whether to use latest version based on precedence hierarchy
+    use_latest = _should_use_latest_version(dag)
+    source = _get_config_source(dag)
+
+    if use_latest:
+        log.debug("Using latest bundle version", dag_id=dag.dag_id, 
source=source)
+        return _get_latest_bundle_version(dag.dag_id, session)
+
+    log.debug("Using original bundle version", dag_id=dag.dag_id, 
source=source)
+    return _get_original_bundle_version(dag.dag_id, session)
+
+
+def _should_use_latest_version(
+    dag: SerializedDAG,
+) -> bool:
+    """
+    Determine whether to use latest bundle version based on precedence 
hierarchy.
+
+    Returns True if latest version should be used, False otherwise.
+    """
+    # Level 1: DAG-level configuration (explicit True or False)
+    if dag.run_on_latest_version is not None:
+        return dag.run_on_latest_version
+
+    # Level 2: Global configuration (fallback to False)
+    return airflow_conf.getboolean("core", "run_on_latest_version", 
fallback=False)
+
+
+def _get_config_source(
+    dag: SerializedDAG,
+) -> str:
+    """Return descriptive source of the bundle version configuration for 
logging."""
+    if dag.run_on_latest_version is not None:
+        return "DAG configuration"
+    if airflow_conf.getboolean("core", "run_on_latest_version", 
fallback=False):
+        return "global configuration"
+    return "system default"
+
+
+def _get_latest_bundle_version(dag_id: str, session: Session) -> str | None:
+    """
+    Get the latest bundle version from DagBundleModel, falling back to 
DagModel.
+
+    :param dag_id: The DAG ID
+    :param session: Database session
+    """
+    from airflow.models.dagbundle import DagBundleModel
+
+    dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == 
dag_id))
+    if not dag_model:
+        log.warning(
+            "Cannot resolve latest bundle version: DagModel not found",
+            dag_id=dag_id,
+        )
+        return None
+
+    # Non-versioned bundle (e.g., LocalDagBundle) - use original version
+    if not dag_model.bundle_name:

Review Comment:
   Thanks! Removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to