ephraimbuddy commented on code in PR #61550:
URL: https://github.com/apache/airflow/pull/61550#discussion_r3375709173


##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1325,14 +1367,34 @@ def _create_orm_dagrun(
     partition_key: str | None = None,
     partition_date: datetime.datetime | None = None,
     note: str | None = None,
+    bundle_version: str | None = None,
+    dag_version: DagVersion | None = None,
     session: Session = NEW_SESSION,
 ) -> DagRun:
-    bundle_version = None
-    if not dag.disable_bundle_versioning:
-        bundle_version = session.scalar(
-            select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id),
+    resolved_bundle_version: str | None = None
+    use_resolved_dag = False
+    if dag_version is not None:
+        resolved_bundle_version = bundle_version
+        use_resolved_dag = True
+    elif bundle_version is not None:
+        if dag.disable_bundle_versioning:
+            raise AirflowBadRequest(f"DAG with dag_id: '{dag.dag_id}' does not support bundle versioning")

Review Comment:
   We cannot raise this here. These are HTTP semantics, and they don't belong here because this method is also called outside the API.
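
   A minimal sketch of the split being asked for, assuming hypothetical domain exceptions (`BundleVersionError`, `BundleVersioningDisabled`, and a local stand-in for `DagVersionNotFound`): the shared code raises plain domain errors, and only the route translates them into HTTP status codes. The stdlib `http.HTTPStatus` stands in for FastAPI's `status`/`HTTPException` so the sketch runs on its own; `create_dagrun` and `api_trigger` here are illustrative, not Airflow's signatures.

```python
from __future__ import annotations

from http import HTTPStatus


class BundleVersionError(Exception):
    """Hypothetical base class for bundle-version failures; carries no HTTP semantics."""


class BundleVersioningDisabled(BundleVersionError):
    """Hypothetical: the DAG has bundle versioning disabled."""


class DagVersionNotFound(BundleVersionError):
    """Local stand-in for Airflow's exception of the same name."""


def create_dagrun(dag_id: str, bundle_version: str | None, *, versioning_disabled: bool) -> str:
    """Domain-layer sketch: safe to call from the scheduler, CLI, or API alike."""
    if bundle_version is not None and versioning_disabled:
        raise BundleVersioningDisabled(f"DAG with dag_id: '{dag_id}' does not support bundle versioning")
    return f"run for {dag_id}@{bundle_version or 'latest'}"


# Only the API boundary knows about HTTP. DagVersionNotFound would come from the real lookup.
_HTTP_STATUS_FOR = {
    BundleVersioningDisabled: HTTPStatus.BAD_REQUEST,
    DagVersionNotFound: HTTPStatus.NOT_FOUND,
}


def api_trigger(dag_id: str, bundle_version: str | None, *, versioning_disabled: bool) -> tuple[int, str]:
    """Hypothetical route handler returning (status_code, body) instead of raising HTTPException."""
    try:
        return HTTPStatus.OK, create_dagrun(dag_id, bundle_version, versioning_disabled=versioning_disabled)
    except BundleVersionError as err:
        # Translate the domain error into an HTTP status only at the API edge.
        return _HTTP_STATUS_FOR[type(err)], str(err)
```

   In a real FastAPI route the `except` branch would raise `HTTPException(status_code, detail)` instead of returning a tuple; non-API callers simply see the domain exception.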



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -451,8 +452,27 @@ def materialize_asset(
             f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs",
         )
 
+    resolved_body = body or MaterializeAssetBody()
+
+    resolved_dag_version = None
+    if resolved_body.bundle_version is not None:
+        if dag.disable_bundle_versioning:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                f"DAG with dag_id: '{dag_id}' does not support bundle versioning",

Review Comment:
   This resolution and validation logic is duplicated across three layers: `materialize_asset`, `create_dagrun`, and `_create_orm_dagrun`. We should have one source of truth for this resolution; all three should give the same error, but currently they differ.
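
   One way to get that single source of truth, sketched with hypothetical names (`resolve_dag_version`, `DagVersionStub`, and a `lookup` callable standing in for `DagVersion.get_latest_version`): one function owns both checks and both error messages, and every caller delegates to it instead of re-implementing them.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


class BundleVersioningDisabled(Exception):
    """Hypothetical domain error: bundle versioning is turned off for this DAG."""


class DagVersionNotFound(Exception):
    """Local stand-in for Airflow's exception of the same name."""


@dataclass(frozen=True)
class DagVersionStub:
    """Hypothetical stand-in for a DagVersion row."""

    dag_id: str
    bundle_version: str


def resolve_dag_version(
    dag_id: str,
    bundle_version: Optional[str],
    *,
    versioning_disabled: bool,
    lookup: Callable[[str, str], Optional[DagVersionStub]],
) -> Optional[DagVersionStub]:
    """The one place that validates and resolves a requested bundle version.

    Returns None when no specific version was requested, so the caller uses the live DAG.
    """
    if bundle_version is None:
        return None
    if versioning_disabled:
        raise BundleVersioningDisabled(f"DAG with dag_id: '{dag_id}' does not support bundle versioning")
    dag_version = lookup(dag_id, bundle_version)
    if dag_version is None:
        raise DagVersionNotFound(
            f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version '{bundle_version}'"
        )
    return dag_version


# Hypothetical in-memory lookup used to exercise the resolver.
_VERSIONS = {("my_dag", "v1"): DagVersionStub("my_dag", "v1")}


def lookup(dag_id: str, bundle_version: str) -> Optional[DagVersionStub]:
    return _VERSIONS.get((dag_id, bundle_version))
```

   Because the route, `create_dagrun`, and `_create_orm_dagrun` would all call the same function, they can no longer disagree on which case is an error or on the wording of the message.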



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -197,6 +198,7 @@ def validate_context(self, dag: SerializedDAG) -> dict:
             "conf": self.conf,
             "note": self.note,
             "partition_key": self.partition_key,
+            "bundle_version": self.bundle_version,

Review Comment:
   `bundle_version` here is dead: both routes pass `body.bundle_version` straight to `create_dagrun`, so `params['bundle_version']` is never read.



##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -609,7 +633,23 @@ def create_dagrun(
         self.validate_partition_key(partition_key)
 
         # todo: AIP-78 add verification that if run type is backfill then we have a backfill id
-        copied_params = self.params.deep_merge(conf)
+
+        # When triggering against a specific bundle version, validate conf against that
+        # version's param schema (not the live dag's), so callers get the right errors.
+        if bundle_version is not None and not self.disable_bundle_versioning:
+            if dag_version is None:
+                dag_version = DagVersion.get_latest_version(
+                    self.dag_id, bundle_version=bundle_version, load_serialized_dag=True, session=session
+                )
+                if not dag_version:
+                    raise DagVersionNotFound(
+                        f"DAG with dag_id: '{self.dag_id}' does not have a version for bundle_version '{bundle_version}'"
+                    )
+            params_dag = dag_version.serialized_dag.dag

Review Comment:
   `serialized_dag.dag` rebuilds the DAG via `from_dict` on every access, and it's accessed again as `resolved_dag` in `_create_orm_dagrun`, which means two full deserializations per trigger. Resolve it once and thread the object through.
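
   A self-contained illustration of the cost and the fix, using hypothetical stand-ins (`SerializedDagRow`, a counting `from_dict`, `validate_conf`, `create_orm_dagrun_sketch`) rather than the real Airflow classes: the `dag` property rebuilds the object on every access, so the current shape pays for two deserializations while the resolve-once shape pays for one.

```python
from __future__ import annotations

from dataclasses import dataclass

# Counts calls to the hypothetical from_dict below.
CALLS = {"from_dict": 0}


@dataclass
class Dag:
    dag_id: str
    params: dict
    task_ids: list


def from_dict(data: dict) -> Dag:
    """Hypothetical stand-in for a full DAG deserialization."""
    CALLS["from_dict"] += 1
    return Dag(dag_id=data["dag_id"], params=dict(data["params"]), task_ids=list(data["task_ids"]))


@dataclass
class SerializedDagRow:
    """Hypothetical stand-in for the serialized_dag row."""

    data: dict

    @property
    def dag(self) -> Dag:
        # Mirrors the behavior under review: rebuilds the DAG on every access.
        return from_dict(self.data)


def validate_conf(dag: Dag, conf: dict) -> dict:
    """Hypothetical param check: reject keys this version does not declare."""
    unknown = set(conf) - set(dag.params)
    if unknown:
        raise ValueError(f"unknown params for {dag.dag_id}: {sorted(unknown)}")
    return {**dag.params, **conf}


def create_orm_dagrun_sketch(dag: Dag) -> list:
    # Hypothetical ORM step: builds the run's task list from the DAG it is handed.
    return list(dag.task_ids)


def create_dagrun_current(row: SerializedDagRow, conf: dict) -> list:
    validate_conf(row.dag, conf)  # deserialization #1
    return create_orm_dagrun_sketch(row.dag)  # deserialization #2


def create_dagrun_resolved_once(row: SerializedDagRow, conf: dict) -> list:
    resolved_dag = row.dag  # the only deserialization
    validate_conf(resolved_dag, conf)
    return create_orm_dagrun_sketch(resolved_dag)
```

   Caching on the row (for example with `functools.cached_property`) would also avoid the second rebuild, but passing the resolved object down makes the single resolution explicit at the call site, which is what the comment asks for.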



##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -2725,6 +2725,114 @@ def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, te
         run = session.scalars(select(DagRun).where(DagRun.run_id == run_id_without_logical_date)).one()
         assert run.dag_id == custom_dag_id
 
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_trigger_dag_run_with_bundle_version(self, test_client, session, dag_maker):
+        """Test triggering a DAG run with a specific bundle version."""
+        from tests_common.test_utils.dag import sync_dag_to_db
+
+        dag_id = "test_bundle_version_dag"
+        bundle_name = "testing_bundle"
+
+        with dag_maker(
+            dag_id=dag_id,
+            bundle_name=bundle_name,
+            bundle_version="v1",
+            session=session,
+        ) as dag1:
+            EmptyOperator(task_id="task_1")
+        sync_dag_to_db(dag1, bundle_name=bundle_name)
+
+        with dag_maker(
+            dag_id=dag_id,
+            bundle_name=bundle_name,
+            bundle_version="v2",
+            session=session,
+        ) as dag2:
+            EmptyOperator(task_id="task_1")
+            EmptyOperator(task_id="task_2")
+        sync_dag_to_db(dag2, bundle_name=bundle_name)
+
+        response = test_client.post(
+            f"/dags/{dag_id}/dagRuns", json={"logical_date": "2024-01-01T00:00:00Z", "bundle_version": "v1"}
+        )
+        assert response.status_code == 200
+        assert response.json()["dag_versions"][0]["bundle_version"] == "v1"

Review Comment:
   This only asserts the recorded `bundle_version`, never the resolved task set, which is the actual feature. `v1` has `task_1` and `v2` has `task_1` + `task_2`; if resolution fell back to the live dag, this test would still pass. Please assert that the run's TIs are exactly `{task_1}`.
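
   Why the stronger assertion matters, shown as a hypothetical in-memory model with no Airflow imports (`VERSIONS`, `LIVE_VERSION`, `trigger`, and the two check functions are illustrative names): a resolver that wrongly falls back to the live DAG still records `bundle_version="v1"`, so only a check on the created task IDs catches it.

```python
from __future__ import annotations

# Task sets per bundle version, matching the test's v1/v2 setup.
VERSIONS = {"v1": {"task_1"}, "v2": {"task_1", "task_2"}}
LIVE_VERSION = "v2"


def trigger(bundle_version: str, *, falls_back_to_live: bool = False) -> dict:
    """Model a trigger: record the requested version, create TIs from the resolved DAG."""
    resolved = LIVE_VERSION if falls_back_to_live else bundle_version
    return {"bundle_version": bundle_version, "ti_task_ids": set(VERSIONS[resolved])}


def weak_check(run: dict) -> bool:
    # What the current test asserts: only the recorded version.
    return run["bundle_version"] == "v1"


def strong_check(run: dict) -> bool:
    # What the review asks for: the run's TIs are exactly {task_1}.
    return run["bundle_version"] == "v1" and run["ti_task_ids"] == {"task_1"}


buggy_run = trigger("v1", falls_back_to_live=True)
assert weak_check(buggy_run)  # the fallback bug slips through
assert not strong_check(buggy_run)  # the fallback bug is caught
assert strong_check(trigger("v1"))  # correct resolution passes
```

   In the real test, the equivalent would be collecting the task IDs of the created run's task instances and asserting that the set equals `{"task_1"}`.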



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -451,8 +452,27 @@ def materialize_asset(
             f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs",
         )
 
+    resolved_body = body or MaterializeAssetBody()
+
+    resolved_dag_version = None
+    if resolved_body.bundle_version is not None:
+        if dag.disable_bundle_versioning:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                f"DAG with dag_id: '{dag_id}' does not support bundle versioning",
+            )
+        resolved_dag_version = DagVersion.get_latest_version(
+            dag.dag_id, bundle_version=resolved_body.bundle_version, load_serialized_dag=True, session=session
+        )
+        if resolved_dag_version is None:
+            raise HTTPException(
+                status.HTTP_404_NOT_FOUND,
+                f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version '{resolved_body.bundle_version}'",
+            )
+        dag = resolved_dag_version.serialized_dag.dag
+
     try:
-        params = (body or MaterializeAssetBody()).validate_context(dag)
+        params = (resolved_body or MaterializeAssetBody()).validate_context(dag)

Review Comment:
   `resolved_body` is already non-None (line 455), so `resolved_body or MaterializeAssetBody()` is redundant; just use `resolved_body.validate_context(dag)`.
   
   
   



##########
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -338,6 +338,7 @@ class DagRun(StrictBaseModel):
     consumed_asset_events: list[AssetEventDagRunReference]
     partition_key: str | None
     note: str | None = None
+    bundle_version: str | None = None

Review Comment:
   Is the worker actually consuming `bundle_version` from its run context? The version gating in v2026_06_30 is correct, but if nothing on the worker reads it, this adds a field to a hot context for no reason.



##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -71,6 +78,21 @@
 
 log = structlog.get_logger(__name__)
 
+# Callbacks are not serialized, so when running an older bundle version we copy them
+# from the live dag to the resolved dag so dag-level event hooks still fire correctly.
+# Other DAG-level attrs (max_active_runs, catchup, params, …) intentionally come from
+# the old serialized version, since the caller asked for that specific historical snapshot.
+_DAG_CALLBACK_ATTRS = (
+    "sla_miss_callback",
+    "on_success_callback",
+    "on_failure_callback",
+    "on_retry_callback",
+    "on_execute_callback",
+    "on_skipped_callback",
+    "has_on_success_callback",
+    "has_on_failure_callback",
+)

Review Comment:
   This looks like a no-op on the real path.
   Callables aren't serialized. On the REST path, both the live dag (`get_latest_version_of_dag` → from serialization) and `dag_version.serialized_dag.dag` come from `from_dict`, so the `on_*_callback` attributes are None on both, and copying them just copies None → None. Only the `has_on_*_callback` booleans carry anything, and runtime callbacks fire from what the Dag processor loads keyed by `dag_version_id`, not from this transient `run.dag`. The passing test sets real callables on a `dag_maker` DAG, a path the REST API never produces, so the mechanism is verified only against a scenario that can't occur in production. What concrete runtime behavior does this block fix?
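
   The argument can be reproduced with a toy round-trip, assuming a hypothetical `to_dict`/`from_dict` pair that, like DAG serialization, drops callables and keeps only presence flags (`ToyDag`, `copy_callbacks`, and the `notify` callback are also illustrative). Once both DAGs have gone through `from_dict`, copying the callback attributes moves None onto None; only the booleans change.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional

# A subset of the attributes listed in _DAG_CALLBACK_ATTRS.
CALLBACK_ATTRS = (
    "on_success_callback",
    "on_failure_callback",
    "has_on_success_callback",
    "has_on_failure_callback",
)


@dataclass
class ToyDag:
    dag_id: str
    on_success_callback: Optional[Callable[..., None]] = None
    on_failure_callback: Optional[Callable[..., None]] = None
    has_on_success_callback: bool = False
    has_on_failure_callback: bool = False


def to_dict(dag: ToyDag) -> dict:
    """Hypothetical serializer: callables are dropped, only presence flags survive."""
    return {
        "dag_id": dag.dag_id,
        "has_on_success_callback": dag.on_success_callback is not None,
        "has_on_failure_callback": dag.on_failure_callback is not None,
    }


def from_dict(data: dict) -> ToyDag:
    """Hypothetical deserializer: callback attributes come back as None."""
    return ToyDag(**data)


def copy_callbacks(src: ToyDag, dst: ToyDag) -> None:
    """What the block under review does: copy each attribute from live dag to resolved dag."""
    for attr in CALLBACK_ATTRS:
        setattr(dst, attr, getattr(src, attr))


def notify(context: dict) -> None:
    print("success", context)


# The authored live version has a real callback; the old version has none.
live = from_dict(to_dict(ToyDag("example", on_success_callback=notify)))
old = from_dict(to_dict(ToyDag("example")))

copy_callbacks(live, old)
print(old.on_success_callback)  # None: nothing callable was copied
print(old.has_on_success_callback)  # True: only the boolean flag changed
```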



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
