xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1888781949
##########
airflow/models/taskinstance.py:
##########
@@ -992,7 +1004,11 @@ def get_triggering_events() -> dict[str, list[AssetEvent | AssetEventPydantic]]:
# Re-attach it if we get called.
nonlocal dag_run
if dag_run not in session:
- dag_run = session.merge(dag_run, load=False)
+ # In case, refresh_from_db has also included the dag_run,
+ # the object will be considered dirty by the session.
+ # Trying to merge the dirty dag_run with load=False, will result to an SQLAlchemy error.
Review Comment:
@ashb I haven't been able to reproduce the error, but it's related to this call:
```python
_set_ti_attrs(task_instance, ti, include_dag_run=include_dag_run)
```
The issue is that we need the ti to pick up any changes to the dag_run's
context_carrier.
For example, suppose the scheduler that has been running a dag is no longer
available and another one picks it up. In that case, the new scheduler has to
create a new span for the dag_run and update the context_carrier to point to the
new span. Any tasks that run from that point until the dag_run finishes will be
children of the new span. For that to work, the ti must reference the new span's
carrier, not the old one.
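A minimal, stdlib-only sketch of the failover case may help. Everything in it is hypothetical: `Span`, `inject`, `start_child_span` and the two-field `DagRun` are illustrative stand-ins, not Airflow or OpenTelemetry APIs. The carrier uses the W3C `traceparent` layout (`version-traceid-spanid-flags`), and having scheduler B reuse the original trace id is an assumption made only to keep the example short.

```python
from __future__ import annotations

import secrets
from dataclasses import dataclass, field


@dataclass
class Span:
    """Hypothetical stand-in for a tracing span (not Airflow's or OTel's class)."""

    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: secrets.token_hex(8))
    parent_span_id: str | None = None


def inject(span: Span) -> dict[str, str]:
    """Serialize a span into a W3C traceparent-style carrier."""
    return {"traceparent": f"00-{span.trace_id}-{span.span_id}-01"}


def start_child_span(name: str, carrier: dict[str, str]) -> Span:
    """Start a span parented to whatever span the carrier points at."""
    _version, trace_id, parent_span_id, _flags = carrier["traceparent"].split("-")
    return Span(name=name, trace_id=trace_id, parent_span_id=parent_span_id)


@dataclass
class DagRun:
    """Hypothetical dag_run reduced to the field that matters here."""

    context_carrier: dict[str, str]


trace_id = secrets.token_hex(16)

# Scheduler A starts the dag_run span and stores its carrier on the dag_run.
span_a = Span("dag_run (scheduler A)", trace_id)
dag_run = DagRun(context_carrier=inject(span_a))

# A ti that copied the carrier before the failover and never refreshes it.
stale_carrier = dict(dag_run.context_carrier)

# Scheduler A disappears; scheduler B creates a new dag_run span and
# overwrites the carrier stored on the dag_run (assumed: same trace id).
span_b = Span("dag_run (scheduler B)", trace_id)
dag_run.context_carrier = inject(span_b)

# Re-reading the dag_run's carrier parents the task to the new span;
# the stale copy still parents it to the old one.
fresh_task = start_child_span("task", dag_run.context_carrier)
stale_task = start_child_span("task", stale_carrier)

print(fresh_task.parent_span_id == span_b.span_id)  # True
print(stale_task.parent_span_id == span_a.span_id)  # True
```

Parenting is decided by whichever carrier the task reads when it starts, so a copy taken before the failover keeps pointing at the old span.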
It's not as simple as always including the dag_run, because there are cases
where it's detached or unavailable and we can't retrieve it from the db.
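One way to express "include the dag_run only when we actually can" is sketched below, under stated assumptions: `DagRunRecord`, `TaskInstanceView` and `refresh_carrier` are hypothetical names, a `load_dag_run` callback returning `None` stands in for the detached or not-retrievable cases, and the carrier values are placeholders. This is not how `_set_ti_attrs` is implemented.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DagRunRecord:
    """Hypothetical, simplified view of a dag_run row."""

    run_id: str
    context_carrier: dict


@dataclass
class TaskInstanceView:
    """Hypothetical, simplified task instance holding its own carrier copy."""

    run_id: str
    context_carrier: dict


def refresh_carrier(
    ti: TaskInstanceView,
    load_dag_run: Callable[[str], Optional[DagRunRecord]],
) -> bool:
    """Copy the dag_run's carrier onto the ti only if the dag_run can be loaded.

    Returns True when the carrier was refreshed, False when the ti keeps
    the carrier it already had.
    """
    dag_run = load_dag_run(ti.run_id)
    if dag_run is None:
        # Detached or not retrievable: keep the existing carrier rather
        # than failing the whole refresh.
        return False
    ti.context_carrier = dict(dag_run.context_carrier)
    return True


# Usage: a dict lookup plays the role of the database (placeholder carriers).
db = {"run_1": DagRunRecord("run_1", {"traceparent": "new"})}

ti = TaskInstanceView("run_1", {"traceparent": "old"})
print(refresh_carrier(ti, db.get), ti.context_carrier)
# True {'traceparent': 'new'}

orphan = TaskInstanceView("run_2", {"traceparent": "old"})
print(refresh_carrier(orphan, db.get), orphan.context_carrier)
# False {'traceparent': 'old'}
```

Falling back to the existing carrier trades freshness for robustness: the ti may briefly keep pointing at an old span, but the refresh never fails just because the dag_run could not be loaded.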
--
This is an automated message from the Apache Git Service.