ephraimbuddy commented on code in PR #61550:
URL: https://github.com/apache/airflow/pull/61550#discussion_r3375709173
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1325,14 +1367,34 @@ def _create_orm_dagrun(
partition_key: str | None = None,
partition_date: datetime.datetime | None = None,
note: str | None = None,
+ bundle_version: str | None = None,
+ dag_version: DagVersion | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
- bundle_version = None
- if not dag.disable_bundle_versioning:
- bundle_version = session.scalar(
- select(DagModel.bundle_version).where(DagModel.dag_id ==
dag.dag_id),
+ resolved_bundle_version: str | None = None
+ use_resolved_dag = False
+ if dag_version is not None:
+ resolved_bundle_version = bundle_version
+ use_resolved_dag = True
+ elif bundle_version is not None:
+ if dag.disable_bundle_versioning:
+ raise AirflowBadRequest(f"DAG with dag_id: '{dag.dag_id}' does not
support bundle versioning")
Review Comment:
We cannot raise this here. This is HTTP semantics, and it doesn't belong here,
as this method is called outside the API.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -451,8 +452,27 @@ def materialize_asset(
f"Dag with dag_id: '{dag_id}' does not allow asset materialization
runs",
)
+ resolved_body = body or MaterializeAssetBody()
+
+ resolved_dag_version = None
+ if resolved_body.bundle_version is not None:
+ if dag.disable_bundle_versioning:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ f"DAG with dag_id: '{dag_id}' does not support bundle
versioning",
Review Comment:
This resolution and validation logic is duplicated across three layers: in
`materialize_asset`, in `create_dagrun`, and in `_create_orm_dagrun`. We should
have one source of truth for this resolution, as all three are meant to give
the same error but currently differ.
##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py:
##########
@@ -197,6 +198,7 @@ def validate_context(self, dag: SerializedDAG) -> dict:
"conf": self.conf,
"note": self.note,
"partition_key": self.partition_key,
+ "bundle_version": self.bundle_version,
Review Comment:
`bundle_version` here is dead — both routes pass `body.bundle_version`
straight to `create_dagrun`, so `params['bundle_version']` is never read.
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -609,7 +633,23 @@ def create_dagrun(
self.validate_partition_key(partition_key)
# todo: AIP-78 add verification that if run type is backfill then we
have a backfill id
- copied_params = self.params.deep_merge(conf)
+
+ # When triggering against a specific bundle version, validate conf
against that
+ # version's param schema (not the live dag's), so callers get the
right errors.
+ if bundle_version is not None and not self.disable_bundle_versioning:
+ if dag_version is None:
+ dag_version = DagVersion.get_latest_version(
+ self.dag_id, bundle_version=bundle_version,
load_serialized_dag=True, session=session
+ )
+ if not dag_version:
+ raise DagVersionNotFound(
+ f"DAG with dag_id: '{self.dag_id}' does not have a
version for bundle_version '{bundle_version}'"
+ )
+ params_dag = dag_version.serialized_dag.dag
Review Comment:
`serialized_dag.dag` rebuilds the DAG via `from_dict` on every access, and
it's accessed again as `resolved_dag` in `_create_orm_dagrun` — two full
deserializations per trigger. Resolve once and thread the object through.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py:
##########
@@ -2725,6 +2725,114 @@ def
test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, te
run = session.scalars(select(DagRun).where(DagRun.run_id ==
run_id_without_logical_date)).one()
assert run.dag_id == custom_dag_id
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_trigger_dag_run_with_bundle_version(self, test_client, session,
dag_maker):
+ """Test triggering a DAG run with a specific bundle version."""
+ from tests_common.test_utils.dag import sync_dag_to_db
+
+ dag_id = "test_bundle_version_dag"
+ bundle_name = "testing_bundle"
+
+ with dag_maker(
+ dag_id=dag_id,
+ bundle_name=bundle_name,
+ bundle_version="v1",
+ session=session,
+ ) as dag1:
+ EmptyOperator(task_id="task_1")
+ sync_dag_to_db(dag1, bundle_name=bundle_name)
+
+ with dag_maker(
+ dag_id=dag_id,
+ bundle_name=bundle_name,
+ bundle_version="v2",
+ session=session,
+ ) as dag2:
+ EmptyOperator(task_id="task_1")
+ EmptyOperator(task_id="task_2")
+ sync_dag_to_db(dag2, bundle_name=bundle_name)
+
+ response = test_client.post(
+ f"/dags/{dag_id}/dagRuns", json={"logical_date":
"2024-01-01T00:00:00Z", "bundle_version": "v1"}
+ )
+ assert response.status_code == 200
+ assert response.json()["dag_versions"][0]["bundle_version"] == "v1"
Review Comment:
Only asserts the recorded `bundle_version`, never the resolved task set —
which is the feature. `v1` has `task_1`, `v2` has `task_1 + task_2`; if
resolution fell back to the live dag this still passes. Please assert the run's
TIs are exactly `{task_1}`.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py:
##########
@@ -451,8 +452,27 @@ def materialize_asset(
f"Dag with dag_id: '{dag_id}' does not allow asset materialization
runs",
)
+ resolved_body = body or MaterializeAssetBody()
+
+ resolved_dag_version = None
+ if resolved_body.bundle_version is not None:
+ if dag.disable_bundle_versioning:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST,
+ f"DAG with dag_id: '{dag_id}' does not support bundle
versioning",
+ )
+ resolved_dag_version = DagVersion.get_latest_version(
+ dag.dag_id, bundle_version=resolved_body.bundle_version,
load_serialized_dag=True, session=session
+ )
+ if resolved_dag_version is None:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ f"DAG with dag_id: '{dag_id}' does not have a version for
bundle_version '{resolved_body.bundle_version}'",
+ )
+ dag = resolved_dag_version.serialized_dag.dag
+
try:
- params = (body or MaterializeAssetBody()).validate_context(dag)
+ params = (resolved_body or
MaterializeAssetBody()).validate_context(dag)
Review Comment:
`resolved_body` is already non-None (line 455), so
`resolved_body or MaterializeAssetBody()` is redundant — just use
`resolved_body.validate_context(dag)`.
##########
airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##########
@@ -338,6 +338,7 @@ class DagRun(StrictBaseModel):
consumed_asset_events: list[AssetEventDagRunReference]
partition_key: str | None
note: str | None = None
+ bundle_version: str | None = None
Review Comment:
Is the worker actually consuming `bundle_version` from its run context? The
version gating in v2026_06_30 is correct, but if nothing reads it on the worker
this is a field on a hot context for no reason.
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -71,6 +78,21 @@
log = structlog.get_logger(__name__)
+# Callbacks are not serialized, so when running an older bundle version we
copy them
+# from the live dag to the resolved dag so dag-level event hooks still fire
correctly.
+# Other DAG-level attrs (max_active_runs, catchup, params, …) intentionally
come from
+# the old serialized version, since the caller asked for that specific
historical snapshot.
+_DAG_CALLBACK_ATTRS = (
+ "sla_miss_callback",
+ "on_success_callback",
+ "on_failure_callback",
+ "on_retry_callback",
+ "on_execute_callback",
+ "on_skipped_callback",
+ "has_on_success_callback",
+ "has_on_failure_callback",
+)
Review Comment:
This looks like a no-op on the real path.
Callables aren't serialized. On the REST path both the live dag
(`get_latest_version_of_dag` → from serialization) and
`dag_version.serialized_dag.dag` come from `from_dict`, so `on_*_callback` are
None on both — copying them copies None→None. Only the `has_on_*_callback`
booleans carry anything, and runtime callbacks fire from what the Dag processor
loads keyed by `dag_version_id`, not from this transient `run.dag`. The passing
test sets real callables on a `dag_maker` DAG, which is a path the REST API
never produces — so the mechanism is verified only against a scenario that
can't occur in production. What concrete runtime behavior does this block fix?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]