xBis7 commented on code in PR #43941:
URL: https://github.com/apache/airflow/pull/43941#discussion_r1888781949


##########
airflow/models/taskinstance.py:
##########
@@ -992,7 +1004,11 @@ def get_triggering_events() -> dict[str, list[AssetEvent | AssetEventPydantic]]:
         # Re-attach it if we get called.
         nonlocal dag_run
         if dag_run not in session:
-            dag_run = session.merge(dag_run, load=False)
+            # In case, refresh_from_db has also included the dag_run,
+            # the object will be considered dirty by the session.
+            # Trying to merge the dirty dag_run with load=False, will result to an SQLAlchemy error.

Review Comment:
   @ashb I haven't been able to reproduce the error, but it's related to this call:
   ```
   _set_ti_attrs(task_instance, ti, include_dag_run=include_dag_run)
   ```
   
   The issue is that we need the ti to pick up any changes to the dag_run context_carrier.
   
   For example, suppose the scheduler that has been running a dag is no longer available and another one picks it up. In that case, the new scheduler has to create a new span for the dag_run and update the context_carrier to point to the new span. Any tasks that run from that point until the end will be children of the new span. For that to work, the ti must reference the new span's carrier and not the old one.
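
To illustrate the takeover case, here is a minimal sketch in plain Python. It is not the actual Airflow code: `DagRunStub`, `TaskInstanceStub`, `refresh_from` and `take_over` are hypothetical names, and the carrier is assumed to be a plain dict with a `span_id` key.

```python
from dataclasses import dataclass, field


@dataclass
class DagRunStub:
    """Hypothetical stand-in for a dag_run; only the carrier matters here."""

    run_id: str
    context_carrier: dict = field(default_factory=dict)


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for a ti that keeps a copy of the dag_run carrier."""

    task_id: str
    parent_carrier: dict = field(default_factory=dict)

    def refresh_from(self, dag_run: DagRunStub) -> None:
        # Copy the carrier, so later changes on the dag_run are only
        # visible to the ti after another refresh.
        self.parent_carrier = dict(dag_run.context_carrier)


def take_over(dag_run: DagRunStub, new_span_id: str) -> None:
    """Simulate a new scheduler creating a new span for the dag_run."""
    dag_run.context_carrier = {"span_id": new_span_id}


dag_run = DagRunStub("run_1", {"span_id": "span-from-scheduler-a"})
ti = TaskInstanceStub("task_1")
ti.refresh_from(dag_run)

# Another scheduler picks up the dag_run and replaces the carrier.
take_over(dag_run, "span-from-scheduler-b")
stale_parent = ti.parent_carrier["span_id"]  # still the old span

# Only after refreshing with the dag_run included does the ti see the new span.
ti.refresh_from(dag_run)
fresh_parent = ti.parent_carrier["span_id"]
```

Without the second refresh, the task span would still be parented to the span of the scheduler that is gone.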
   
   It's not as simple as always including the dag_run, because there are cases where it's detached or not available and we can't retrieve it from the db.
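
As a rough sketch of that constraint: `resolve_carrier` is a hypothetical helper, and falling back to the carrier the ti already holds is an assumption made for illustration, not something the PR specifies.

```python
from typing import Optional


def resolve_carrier(current: dict, dag_run_carrier: Optional[dict]) -> dict:
    """Hypothetical helper: pick the carrier a ti should hold after a refresh."""
    if dag_run_carrier is None:
        # dag_run is detached or could not be retrieved from the db:
        # keep whatever the ti already has (assumed fallback).
        return current
    # dag_run is available: take its (possibly updated) carrier.
    return dict(dag_run_carrier)


kept = resolve_carrier({"span_id": "old"}, None)
updated = resolve_carrier({"span_id": "old"}, {"span_id": "new"})
```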



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
