jason810496 commented on code in PR #61550:
URL: https://github.com/apache/airflow/pull/61550#discussion_r2911344509
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -494,6 +503,10 @@ def trigger_dag_run(
current_user_id = user.get_id()
dag_run.note = (dag_run_note, current_user_id)
return dag_run
+ except AirflowException as e:
+ if "does not have a version for bundle_version" in str(e):
Review Comment:
Then we could distinguish the exact exception class here.
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1114,16 +1116,37 @@ def _create_orm_dagrun(
triggered_by: DagRunTriggeredByType,
triggering_user_name: str | None = None,
partition_key: str | None = None,
+ bundle_version: str | None = None,
partition_date: datetime.datetime | None = None,
note: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
- bundle_version = None
+ resolved_bundle_version: str | None = None
if not dag.disable_bundle_versioning:
- bundle_version = session.scalar(
- select(DagModel.bundle_version).where(DagModel.dag_id ==
dag.dag_id),
- )
- dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
+ if bundle_version is not None:
+ resolved_bundle_version = bundle_version
+ dag_version = DagVersion.get_latest_version(
+ dag.dag_id, bundle_version=resolved_bundle_version,
load_serialized_dag=True, session=session
+ )
+ if not dag_version:
+ raise AirflowException(
+ f"DAG with dag_id: '{dag.dag_id}' does not have a version
for bundle_version '{bundle_version}'"
Review Comment:
```suggestion
f"Dag with dag_id: '{dag.dag_id}' does not have a
version for bundle_version '{bundle_version}'"
```
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1114,16 +1116,37 @@ def _create_orm_dagrun(
triggered_by: DagRunTriggeredByType,
triggering_user_name: str | None = None,
partition_key: str | None = None,
+ bundle_version: str | None = None,
partition_date: datetime.datetime | None = None,
note: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
- bundle_version = None
+ resolved_bundle_version: str | None = None
if not dag.disable_bundle_versioning:
- bundle_version = session.scalar(
- select(DagModel.bundle_version).where(DagModel.dag_id ==
dag.dag_id),
- )
- dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
+ if bundle_version is not None:
+ resolved_bundle_version = bundle_version
+ dag_version = DagVersion.get_latest_version(
+ dag.dag_id, bundle_version=resolved_bundle_version,
load_serialized_dag=True, session=session
+ )
+ if not dag_version:
+ raise AirflowException(
Review Comment:
Please add a dedicated exception class for this case, as we're avoiding
overly broad `AirflowException` usage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]