This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1eb047dbfb62086da73a0225fd25e949b6c3fd8f
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Oct 20 21:19:50 2022 +0100

    Listener: Set task on sqlalchemy taskinstance object (#27167)
    
    same as https://github.com/apache/airflow/pull/21157
    
    TaskListener API's contract promises to pass TaskInstance object to 
listener plugin. However, what happens is not 100% true - the object being 
passed is one that maps to current SQLAlchemy session.
    
    `_run_raw_task` before merging the TI operates on detached TaskInstance 
object, then merges it to current session. Since there is no attached object in 
the SQLAlchemy identity map, SQLAlchemy creates it, and it's this object that's 
being passed to the SQLAlchemy event listeners.
    
    The problem with that is that when creating new SQLAlchemy object, 
SQLAlchemy takes care about setting only database-mapped fields. The ones that 
are purely on the python side, like task aren't being set on the new object.
    
    This PR manually sets `task` on the new SQLAlchemy object, so that 
`on_task_instance_success` receives proper TaskInstance with task field set.
    
    (cherry picked from commit 395ad7110e53a30a5d33f648d1dd797482eb268c)
---
 airflow/models/taskinstance.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 48e12e7a62..d4453ca842 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1527,7 +1527,7 @@ class TaskInstance(Base, LoggingMixin):
 
         if not test_mode:
             session.add(Log(self.state, self))
-            session.merge(self)
+            session.merge(self).task = self.task
             if self.state == TaskInstanceState.SUCCESS:
                 self._register_dataset_changes(session=session)
             session.commit()

Reply via email to