Lee-W commented on code in PR #58655:
URL: https://github.com/apache/airflow/pull/58655#discussion_r2567220926
##########
airflow-core/src/airflow/models/dag.py:
##########
@@ -598,20 +598,29 @@ def dags_needing_dagruns(cls, session: Session) ->
tuple[Any, dict[str, datetime
evaluator = AssetEvaluator(session)
- def dag_ready(dag_id: str, cond: BaseAsset, statuses:
dict[AssetUniqueKey, bool]) -> bool | None:
+ def dag_ready(dag_id: str, cond: BaseAsset, statuses:
dict[AssetUniqueKey, bool]) -> bool:
try:
return evaluator.run(cond, statuses)
except AttributeError:
# if dag was serialized before 2.9 and we *just* upgraded,
# we may be dealing with old version. In that case,
# just wait for the dag to be reserialized.
- log.warning("dag '%s' has old serialization; skipping DAG run
creation.", dag_id)
- return None
+ log.warning("Dag '%s' has old serialization; skipping run
creation.", dag_id)
+ return False
+ except Exception:
+ log.exception("Failed to evaluate dag '%s'; assuming not
read", dag_id)
+ return False
# this loads all the ADRQ records.... may need to limit num dags
adrq_by_dag: dict[str, list[AssetDagRunQueue]] = defaultdict(list)
- for r in session.scalars(select(AssetDagRunQueue)):
- adrq_by_dag[r.target_dag_id].append(r)
+ for r in
session.scalars(select(AssetDagRunQueue).options(joinedload(AssetDagRunQueue.dag_model))):
+ if r.dag_model.asset_expression is None:
+ # The dag referenced does not actually depend on an asset! This
+ # could happen if the dag DID depend on an asset at some point,
Review Comment:
sounds good 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]