SameerMesiah97 commented on code in PR #59857:
URL: https://github.com/apache/airflow/pull/59857#discussion_r2664613088
##########
airflow-core/src/airflow/callbacks/callback_requests.py:
##########
@@ -95,6 +103,39 @@ class DagRunContext(BaseModel):
dag_run: ti_datamodel.DagRun | None = None
last_ti: ti_datamodel.TaskInstance | None = None
+ @model_validator(mode="before")
+ @classmethod
+ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[str, Any]:
+ dag_run = values.get("dag_run")
+
+ if dag_run is None:
+ return values
+
+ # DagRunContext may receive non-ORM dag_run objects (e.g. datamodels).
+ # Only apply this validator to ORM-mapped instances.
+ try:
+ _ = sa_inspect(dag_run)
+ except NoInspectionAvailable:
+ return values
+
+ # Relationship access may raise DetachedInstanceError; fall back to empty
+ # list to avoid crashing the scheduler.
+ try:
+ events = dag_run.consumed_asset_events
+ set_committed_value(
+ dag_run,
+ "consumed_asset_events",
+ list(events) if events is not None else [],
+ )
+ except DetachedInstanceError:
Review Comment:
@Lee-W @uranusjr
I agree in principle that reloading would be more correct in cases where
consumed_asset_events isn’t loaded. My hesitation is about doing that in the
validator. consumed_asset_events isn’t eagerly loaded in all contexts, but in
the scheduler it’s explicitly populated at DagRun creation time. Hitting a
DetachedInstanceError here usually reflects an ORM lifecycle issue
(detachment), not missing data.
Since this validator doesn’t know which context it’s running in and has no
session guarantees, reloading here would require implicit DB I/O, which feels
risky given validators may run frequently. I see this validator as a
last-resort guardrail: log loudly and fail safe rather than reattaching or
reloading state at this boundary. If we want to tighten correctness, I think
the right place to do it is a session-aware layer that ensures reattachment or
eager loading before DagRunContext is constructed. I believe PRs #56916 and
#59714 already handle this to a large degree, but we could do the same in the
other places that construct DagRunContext.