ephraimbuddy commented on code in PR #61550:
URL: https://github.com/apache/airflow/pull/61550#discussion_r3085346677


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -487,6 +488,13 @@ def trigger_dag_run(
         dag = get_latest_version_of_dag(dag_bag, dag_id, session)
         params = body.validate_context(dag)
 
+        if body.bundle_version is not None:
+            if dag.disable_bundle_versioning:
+                raise HTTPException(
+                    status.HTTP_400_BAD_REQUEST,
+                    f"DAG with dag_id: '{dag_id}' does not support bundle versioning",
+                )
+

Review Comment:
   Line 488 above and 
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:118 still 
build the trigger context from get_latest_version_of_dag(...) before 
create_dagrun() switches to the requested bundle_version. As a result, 
validate_context() derives data_interval and the generated run_id from the 
latest DAG's timetable, not from the requested older version. If a DAG's 
schedule/timetable changed between bundle versions, a run triggered with 
bundle_version="v1" can be created with v2 semantics.
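
   A minimal, self-contained sketch of the ordering problem, using no Airflow 
imports. `FakeDag`, `VERSIONS`, `resolve_dag` and `build_trigger_context` are 
hypothetical stand-ins, not the real API; the point is only that the version 
is resolved before the data interval is derived.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


@dataclass(frozen=True)
class FakeDag:
    """Hypothetical stand-in for one bundle version of a DAG (not an Airflow class)."""

    bundle_version: str
    interval: timedelta  # stands in for the DAG's timetable

    def infer_data_interval(self, logical_date: datetime) -> Tuple[datetime, datetime]:
        return logical_date, logical_date + self.interval


# Assumption for illustration: the schedule changed from daily (v1) to hourly (v2).
VERSIONS = {
    "v1": FakeDag("v1", timedelta(days=1)),
    "v2": FakeDag("v2", timedelta(hours=1)),
}
LATEST = "v2"


def resolve_dag(bundle_version: Optional[str]) -> FakeDag:
    """Pick the requested version first; fall back to the latest only when none is given."""
    return VERSIONS[bundle_version or LATEST]


def build_trigger_context(bundle_version: Optional[str], logical_date: datetime) -> dict:
    """Derive the data interval from the same DAG version the run will execute."""
    dag = resolve_dag(bundle_version)
    start, end = dag.infer_data_interval(logical_date)
    return {"bundle_version": dag.bundle_version, "data_interval": (start, end)}
```

   With this ordering, a `bundle_version="v1"` trigger gets the daily interval 
even though the latest version is hourly.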



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -106,6 +106,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
     conf: dict | None = Field(default_factory=dict)
     note: str | None = None
     partition_key: str | None = None
+    bundle_version: str | None = None

Review Comment:
   The change above adds bundle_version to TriggerDAGRunPostBody, and 
airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py:197 makes 
MaterializeAssetBody inherit it automatically. However, 
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:430 still 
ignores that field and calls create_dagrun() without bundle_version. The 
generated OpenAPI spec and SDKs therefore advertise bundle-version selection on 
/assets/{asset_id}/materialize, but the server silently materializes the latest 
DAG version instead.
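
   One way to close the gap is to forward the inherited field from the 
materialize handler. The sketch below uses plain dataclasses rather than the 
real Pydantic models; `TriggerBody`, `MaterializeBody`, `fake_create_dagrun` 
and `materialize` are hypothetical names chosen for illustration. The 
alternative, excluding the field from the materialize body, would work equally 
well as long as the schema and the handler agree.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TriggerBody:
    """Hypothetical stand-in for the trigger request body."""

    conf: dict = field(default_factory=dict)
    bundle_version: Optional[str] = None


@dataclass
class MaterializeBody(TriggerBody):
    """Inherits bundle_version automatically, as the subclass in assets.py does."""


def fake_create_dagrun(*, conf: dict, bundle_version: Optional[str] = None) -> dict:
    """Hypothetical stand-in for create_dagrun(); it just records what it was given."""
    return {"conf": conf, "bundle_version": bundle_version}


def materialize(body: MaterializeBody) -> dict:
    # Forward every field the schema advertises, so the request is not silently ignored.
    return fake_create_dagrun(conf=body.conf, bundle_version=body.bundle_version)
```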



##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py:
##########
@@ -178,6 +178,20 @@ def remove_note_field(response: ResponseInfo) -> None:  # type: ignore[misc]
             response.body["dag_run"].pop("note", None)
 
 
+class AddBundleVersionField(VersionChange):
+    """Add the `bundle_version` field to DagRun model."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = (schema(DagRun).field("bundle_version").didnt_exist,)
+
+    @convert_response_to_previous_version_for(TIRunContext)  # type: ignore[arg-type]
+    def remove_bundle_version_from_dag_run(response: ResponseInfo) -> None:  # type: ignore[misc]
+        """Remove the `bundle_version` field from the dag_run object when converting to the previous version."""
+        if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
+            response.body["dag_run"].pop("bundle_version", None)
+

Review Comment:
   This adds AddBundleVersionField, but the shim strips the field only from 
TIRunContext responses. The execution API also returns DagRun directly from 
airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py:68 and 
:242, so clients pinned to an older API version can still receive the new 
bundle_version field on those routes. That breaks the version boundary this 
Cadwyn change is supposed to preserve.
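
   To illustrate the two payload shapes the downgrade has to cover, here is a 
plain-Python sketch with no Cadwyn imports. `strip_bundle_version` is a 
hypothetical helper, and the payloads are made-up examples; in the real change 
the same logic would presumably live in additional response converters 
registered for the DagRun-returning routes.

```python
from typing import Any


def strip_bundle_version(body: Any) -> Any:
    """Drop bundle_version from a top-level DagRun payload and from a nested dag_run."""
    if not isinstance(body, dict):
        return body
    # Shape 1: the route returns a DagRun directly.
    body.pop("bundle_version", None)
    # Shape 2: the DagRun is nested under "dag_run", as in TIRunContext.
    nested = body.get("dag_run")
    if isinstance(nested, dict):
        nested.pop("bundle_version", None)
    return body
```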


