SameerMesiah97 commented on code in PR #59857:
URL: https://github.com/apache/airflow/pull/59857#discussion_r2668395546
##########
airflow-core/src/airflow/callbacks/callback_requests.py:
##########
@@ -95,6 +103,39 @@ class DagRunContext(BaseModel):
dag_run: ti_datamodel.DagRun | None = None
last_ti: ti_datamodel.TaskInstance | None = None
+ @model_validator(mode="before")
+ @classmethod
+ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) ->
Mapping[str, Any]:
+ dag_run = values.get("dag_run")
+
+ if dag_run is None:
+ return values
+
+ # DagRunContext may receive non-ORM dag_run objects (e.g. datamodels).
+ # Only apply this validator to ORM-mapped instances.
+ try:
+ _ = sa_inspect(dag_run)
+ except NoInspectionAvailable:
+ return values
+
+ # Relationship access may raise DetachedInstanceError; fall back to
empty list to
+ # avoid crashing the scheduler.
+ try:
+ events = dag_run.consumed_asset_events
+ set_committed_value(
+ dag_run,
+ "consumed_asset_events",
+ list(events) if events is not None else [],
+ )
+ except DetachedInstanceError:
Review Comment:
@Lee-W that addresses my concern. I will reload the DagRun with a session
like you requested. Thank you for clarifying that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]