nathadfield commented on code in PR #61448:
URL: https://github.com/apache/airflow/pull/61448#discussion_r2871591146
##########
airflow-core/src/airflow/serialization/definitions/dag.py:
##########
@@ -1113,13 +1225,27 @@ def _create_orm_dagrun(
partition_key: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
- bundle_version = None
- if not dag.disable_bundle_versioning:
- bundle_version = session.scalar(
- select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id),
- )
- dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
+ bundle_version = _resolve_bundle_version(
+ dag=dag,
+ session=session,
+ )
+ dag_version = DagVersion.get_latest_version(dag.dag_id, bundle_version=bundle_version, session=session)
+
if not dag_version:
+ if bundle_version:
+ # Bundle version exists but not yet serialized - this is a temporary race condition
+ log.warning(
+ "Bundle version %s for DAG %s is not yet available. "
Review Comment:
I haven't reproduced it in a live system, but the race condition was identified through code analysis. The window is narrow in practice, and it is more likely to surface when DAG files are large or slow to parse, or when the scheduler is under heavy load.
I validated the scenario with a unit test (`test_create_dagrun_race_condition_bundle_version_not_serialized_yet`) that sets up exactly this state (DagBundleModel at v2.0.0 but only a DagVersion for v1.0.0) and confirms that BundleVersionUnavailable is raised with a clear retry message.
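For readers who want to see the state in isolation, here is a minimal, self-contained sketch of it. It uses hypothetical in-memory stand-ins (`FakeMetadataStore`, `DagVersionRow`, `resolve_bundle_version`, `latest_dag_version`, `dag_version_for_new_run`) rather than Airflow's real models or session, and the local `BundleVersionUnavailable` class only mirrors the exception name from this PR. It illustrates how filtering the version lookup by the resolved bundle version turns the "bundle advanced, not yet serialized" window into an explicit, retryable error.

```python
from dataclasses import dataclass, field
from typing import Optional


class BundleVersionUnavailable(Exception):
    """Hypothetical stand-in for the exception named in this PR."""


@dataclass(frozen=True)
class DagVersionRow:
    # Hypothetical in-memory stand-in for a serialized DagVersion row.
    dag_id: str
    bundle_version: Optional[str]
    version_number: int


@dataclass
class FakeMetadataStore:
    # dag_id -> bundle version the bundle has advanced to (DagBundleModel's role here).
    bundle_versions: dict = field(default_factory=dict)
    # Serialized versions written by the DAG processor so far.
    dag_versions: list = field(default_factory=list)


def resolve_bundle_version(store, dag_id, disable_bundle_versioning=False):
    """Return the bundle version to pin the run to, or None when versioning is off."""
    if disable_bundle_versioning:
        return None
    return store.bundle_versions.get(dag_id)


def latest_dag_version(store, dag_id, bundle_version):
    """Newest serialized version, restricted to one bundle version when given."""
    candidates = [
        row
        for row in store.dag_versions
        if row.dag_id == dag_id
        and (bundle_version is None or row.bundle_version == bundle_version)
    ]
    return max(candidates, key=lambda row: row.version_number, default=None)


def dag_version_for_new_run(store, dag_id, disable_bundle_versioning=False):
    bundle_version = resolve_bundle_version(store, dag_id, disable_bundle_versioning)
    dag_version = latest_dag_version(store, dag_id, bundle_version)
    if dag_version is None:
        if bundle_version:
            # The bundle moved ahead but this version has not been serialized
            # yet: a transient state that the caller can retry later.
            raise BundleVersionUnavailable(
                f"Bundle version {bundle_version} for DAG {dag_id} is not yet "
                "available; retry once the DAG has been re-parsed."
            )
        raise LookupError(f"No serialized version found for DAG {dag_id}")
    return dag_version


# The state from the test: bundle at v2.0.0, only v1.0.0 serialized.
store = FakeMetadataStore(
    bundle_versions={"example_dag": "v2.0.0"},
    dag_versions=[DagVersionRow("example_dag", "v1.0.0", 1)],
)
try:
    dag_version_for_new_run(store, "example_dag")
except BundleVersionUnavailable as exc:
    print(exc)
```

Once the processor serializes the v2.0.0 version, the same call succeeds without any other change, which is what makes the error safe to treat as "retry later" rather than as a hard failure.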