Lee-W commented on code in PR #59857:
URL: https://github.com/apache/airflow/pull/59857#discussion_r2667359353


##########
airflow-core/src/airflow/callbacks/callback_requests.py:
##########
@@ -95,6 +103,39 @@ class DagRunContext(BaseModel):
     dag_run: ti_datamodel.DagRun | None = None
     last_ti: ti_datamodel.TaskInstance | None = None
 
+    @model_validator(mode="before")
+    @classmethod
+    def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> 
Mapping[str, Any]:
+        dag_run = values.get("dag_run")
+
+        if dag_run is None:
+            return values
+
+        # DagRunContext may receive non-ORM dag_run objects (e.g. datamodels).
+        # Only apply this validator to ORM-mapped instances.
+        try:
+            _ = sa_inspect(dag_run)
+        except NoInspectionAvailable:
+            return values
+
+        # Relationship access may raise DetachedInstanceError; fall back to 
empty list to
+        # avoid crashing the scheduler.
+        try:
+            events = dag_run.consumed_asset_events
+            set_committed_value(
+                dag_run,
+                "consumed_asset_events",
+                list(events) if events is not None else [],
+            )
+        except DetachedInstanceError:

Review Comment:
   Even though the validator itself is frequently called, the 
DetachedInstanceError is not often encountered. Thus, doing some DB call here 
should be acceptable. Making it an empty list could be misleading.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to