pierrejeambrun commented on code in PR #61550:
URL: https://github.com/apache/airflow/pull/61550#discussion_r3281945925


##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py:
##########
@@ -202,6 +202,26 @@ class AddDagEndpoint(VersionChange):
     instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}", ["GET"]).didnt_exist,)
 
 
+class AddBundleVersionField(VersionChange):

Review Comment:
   **Blocker.** `v2026_04_06.py` shipped in 3.2.0 and 3.2.1. Per 
[`contributing-docs/19_execution_api_versioning.rst`](https://github.com/apache/airflow/blob/main/contributing-docs/19_execution_api_versioning.rst),
 new migrations only go into **unreleased** versions. Move 
`AddBundleVersionField` to a new CalVer file dated to the planned release, and 
update `versions/__init__.py`.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -615,16 +633,25 @@ def trigger_dag_run(
             triggering_user_name=user.get_name(),
             state=DagRunState.QUEUED,
             partition_key=params["partition_key"],
+            bundle_version=body.bundle_version,
+            dag_version=resolved_dag_version,
             session=session,
         )
+
+        dag_run_note = body.note
+        if dag_run_note:
+            current_user_id = user.get_id()
+            dag_run.note = (dag_run_note, current_user_id)
+        return dag_run
+
     except (ParamValidationError, ValueError) as e:
         raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
-
-    dag_run_note = body.note
-    if dag_run_note:
-        current_user_id = user.get_id()
-        dag_run.note = (dag_run_note, current_user_id)
-    return dag_run
+    except AirflowNotFoundException as e:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, str(e))
+    except AirflowException as e:

Review Comment:
   Carry-over from my Mar 6 comment. The `except AirflowException` branch 
returns a generic 400 but still forwards `str(e)`, so internal messages like 
"Cannot create DagRun ... because the dag is not serialized" reach the client. 
Either narrow the catch to the specific exceptions `create_dagrun` raises as 
user errors, or replace the message with a generic one before raising.
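
A minimal sketch of the second option (generic message, original kept as the cause). The exception classes below are local stand-ins so the snippet runs on its own; `DagNotSerializedError`, `GENERIC_DETAIL`, and `trigger` are hypothetical names, not part of Airflow or of this PR.

```python
import logging
from http import HTTPStatus

log = logging.getLogger(__name__)


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


class DagNotSerializedError(AirflowException):
    """Hypothetical internal failure whose message should stay server-side."""


class HTTPException(Exception):
    """Local stand-in for fastapi.HTTPException (status_code + detail)."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


# Hypothetical wording; the point is that it carries no internal details.
GENERIC_DETAIL = "Unable to create the DAG run with the supplied parameters."


def trigger(create_dagrun):
    """Call create_dagrun and translate failures without echoing str(e)."""
    try:
        return create_dagrun()
    except AirflowException as e:
        # Keep the real message in the server log only.
        log.warning("create_dagrun failed: %s", e)
        raise HTTPException(HTTPStatus.BAD_REQUEST, GENERIC_DETAIL) from e
```

The other option, narrowing the `except` to the exceptions that represent user errors, avoids the generic message but depends on knowing exactly which exceptions `create_dagrun` raises.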



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -600,10 +600,28 @@ def trigger_dag_run(
     else:
         triggered_by = DagRunTriggeredByType.REST_API
 
-    dag = get_latest_version_of_dag(dag_bag, dag_id, session)
-    params = body.validate_context(dag)
-
     try:
+        dag = get_latest_version_of_dag(dag_bag, dag_id, session)
+
+        resolved_dag_version = None
+        if body.bundle_version is not None:

Review Comment:
   Carry-over from my Feb 17 / 20 comment. The DB re-fetch is now gone (good), 
but the `disable_bundle_versioning` check, the `get_latest_version` call, and 
the 404 still live in both the route AND `_create_orm_dagrun`. Move all three 
fully into `_create_orm_dagrun` so the route just passes `bundle_version` and 
catches a specific exception.
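
A rough sketch of the shape being asked for, under stated assumptions: `BundleVersionNotFound`, `create_orm_dagrun_sketch`, `trigger_route_sketch`, and the in-memory `_VERSIONS` lookup are all hypothetical stand-ins, and raising `ValueError` when bundle versioning is disabled is an assumed behavior, not what the PR does.

```python
from dataclasses import dataclass


class BundleVersionNotFound(Exception):
    """Hypothetical exception name; the real one is up to the PR."""


@dataclass
class DagVersion:
    dag_id: str
    bundle_version: str


# Stand-in for the database lookup of known DAG versions.
_VERSIONS = {("example_dag", "abc123"): DagVersion("example_dag", "abc123")}


def create_orm_dagrun_sketch(dag_id, bundle_version=None, *, disable_bundle_versioning=False):
    """Stand-in for _create_orm_dagrun: owns the whole resolution step."""
    dag_version = None
    if bundle_version is not None:
        if disable_bundle_versioning:
            # Assumed behavior for illustration only.
            raise ValueError("bundle_version cannot be used when bundle versioning is disabled")
        dag_version = _VERSIONS.get((dag_id, bundle_version))
        if dag_version is None:
            raise BundleVersionNotFound(f"No version {bundle_version!r} for DAG {dag_id!r}")
    return {"dag_id": dag_id, "dag_version": dag_version}


def trigger_route_sketch(dag_id, bundle_version=None):
    """The route only forwards the value and maps one exception to a status."""
    try:
        return 200, create_orm_dagrun_sketch(dag_id, bundle_version)
    except BundleVersionNotFound as e:
        return 404, str(e)
```

With this split the route no longer needs to know how a bundle version is resolved, only which exception means "not found".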



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -615,16 +633,25 @@ def trigger_dag_run(
             triggering_user_name=user.get_name(),
             state=DagRunState.QUEUED,
             partition_key=params["partition_key"],
+            bundle_version=body.bundle_version,
+            dag_version=resolved_dag_version,
             session=session,
         )
+
+        dag_run_note = body.note
+        if dag_run_note:
+            current_user_id = user.get_id()
+            dag_run.note = (dag_run_note, current_user_id)
+        return dag_run
+
     except (ParamValidationError, ValueError) as e:
         raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
-
-    dag_run_note = body.note
-    if dag_run_note:
-        current_user_id = user.get_id()
-        dag_run.note = (dag_run_note, current_user_id)
-    return dag_run
+    except AirflowNotFoundException as e:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, str(e))
+    except AirflowException as e:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
+    except ValueError as e:

Review Comment:
   Dead branch: `ValueError` is already caught by the first `except` tuple 
above, so this handler is never reached. Drop it.
   
   ```python
   except (ParamValidationError, ValueError) as e:
       raise HTTPException(...) from e
   ...
   except ValueError as e:    # never reached
       raise HTTPException(...)
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -615,16 +633,25 @@ def trigger_dag_run(
             triggering_user_name=user.get_name(),
             state=DagRunState.QUEUED,
             partition_key=params["partition_key"],
+            bundle_version=body.bundle_version,
+            dag_version=resolved_dag_version,
             session=session,
         )
+
+        dag_run_note = body.note
+        if dag_run_note:
+            current_user_id = user.get_id()
+            dag_run.note = (dag_run_note, current_user_id)
+        return dag_run
+
     except (ParamValidationError, ValueError) as e:
         raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e
-
-    dag_run_note = body.note
-    if dag_run_note:
-        current_user_id = user.get_id()
-        dag_run.note = (dag_run_note, current_user_id)
-    return dag_run
+    except AirflowNotFoundException as e:

Review Comment:
   Three of the four except blocks are missing `from e`. Add it so the original 
exception is chained as the explicit cause (`__cause__`) and the traceback stays 
readable.
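
A small self-contained illustration of what `from e` changes. `NotFound` and `HTTPError` are local stand-ins for `AirflowNotFoundException` and `HTTPException`; `capture` is a helper added only for this demo.

```python
class NotFound(Exception):
    """Local stand-in for AirflowNotFoundException."""


class HTTPError(Exception):
    """Local stand-in for fastapi.HTTPException."""


def without_chain():
    try:
        raise NotFound("dag version missing")
    except NotFound:
        # No explicit cause: Python only records the implicit __context__.
        raise HTTPError("404")


def with_chain():
    try:
        raise NotFound("dag version missing")
    except NotFound as e:
        # Explicit cause: __cause__ is set and the traceback reads
        # "The above exception was the direct cause of the following exception".
        raise HTTPError("404") from e


def capture(fn):
    """Run fn and return the HTTPError it raises."""
    try:
        fn()
    except HTTPError as exc:
        return exc
    return None
```

Here `capture(without_chain).__cause__` is `None`, while `capture(with_chain).__cause__` is the original `NotFound` instance, which is what linters such as the flake8-bugbear B904 check look for.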



##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -69,6 +69,21 @@
 
 log = structlog.get_logger(__name__)
 
+# Callbacks are not serialized, so when running an older bundle version we copy them
+# from the live dag to the resolved dag so dag-level event hooks still fire correctly.
+# Other DAG-level attrs (max_active_runs, catchup, params, …) intentionally come from
+# the old serialized version, since the caller asked for that specific historical snapshot.
+_DAG_CALLBACK_ATTRS = (

Review Comment:
   `_DAG_CALLBACK_ATTRS` lists 8 attrs but 
`test_create_dagrun_callbacks_copied_to_resolved_bundle_version` only asserts 
on 2 (`on_failure_callback`, `on_success_callback`). Parametrize the test over 
the tuple, or add at least one of `sla_miss_callback` / `on_retry_callback`.
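
A sketch of covering every attribute instead of two. `CALLBACK_ATTRS` below is an illustrative stand-in holding only the four names mentioned in this comment, not the real 8-entry `_DAG_CALLBACK_ATTRS`; `copy_callbacks` and `check_all_callbacks_copied` are hypothetical helpers. In the actual test suite the loop would more naturally become `@pytest.mark.parametrize("attr", _DAG_CALLBACK_ATTRS)`.

```python
from types import SimpleNamespace

# Illustrative stand-in, NOT the real tuple from serialization/definitions/dag.py.
CALLBACK_ATTRS = (
    "on_failure_callback",
    "on_success_callback",
    "sla_miss_callback",
    "on_retry_callback",
)


def copy_callbacks(live_dag, resolved_dag, attrs=CALLBACK_ATTRS):
    """Copy each callback attribute from the live dag onto the resolved dag."""
    for attr in attrs:
        setattr(resolved_dag, attr, getattr(live_dag, attr, None))


def check_all_callbacks_copied(attrs=CALLBACK_ATTRS):
    """Assert on every attribute in the tuple, so a new entry is covered automatically."""
    # A distinct sentinel per attribute catches copies wired to the wrong name.
    live = SimpleNamespace(**{attr: object() for attr in attrs})
    resolved = SimpleNamespace()
    copy_callbacks(live, resolved, attrs)
    for attr in attrs:
        assert getattr(resolved, attr) is getattr(live, attr), attr
```

Driving the assertion from the tuple means a ninth callback added later cannot be silently left untested.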



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
