sjyangkevin commented on issue #52574:
URL: https://github.com/apache/airflow/issues/52574#issuecomment-3149060840

   > I took a quick look at the code. In `task_runner.py`, the task runner does 
not commit the asset changes to the database (by sending a request through API) 
before calling the callbacks and listeners (not a bad implementation; those 
functions may further modify the task instance so we want to wait until they 
are finished), but when you call `get_template_context` in the listener you get 
a brand new context object that does not contain changes made during task 
execution.
   > 
   > If my understanding is correct, the easiest fix is probably to add a cache 
to `get_template_context` so all calls inside a task worker always return the 
same context object instead of building new ones.
   
   
   I had another look into it. I think `get_template_context` here refers to the one in `task_runner.py`, which is defined under `sdk/execution_time`; I was previously looking at `taskinstance.py`, which is under `airflow-core/src/airflow/models`.
   
   I added logging to print the context and found that the two invocations refer to different `OutletEventAccessors` objects. Below is the invocation when the "producer" DAG is triggered; the `OutletEventAccessors` object is at **0x7fe889c50fb0**.
   
   ```
   [2025-08-03, 23:48:37] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-08-03, 23:48:37] INFO - Filling up the DagBag from 
/files/dags/outlet_test_dag.py: source="airflow.models.dagbag.DagBag"
   [2025-08-03, 23:48:38] INFO - 
execution_time.task_runner.RuntimeTaskInstance.get_template_context:: 
chan="stdout": source="task"
   [2025-08-03, 23:48:38] INFO -  {'dag': <DAG: asset-producer>, 'inlets': [], 
'map_index_template': None, 'outlets': [Asset(name='test-asset', 
uri='test-asset', group='asset', extra={}, watchers=[])], ...., 
'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors 
object at 0x7fe889c50fb0>
   ```
   
   I think this is the point where the listener is triggered. Here the `OutletEventAccessors` object is at **0x7fe888329820**:
   ```
   [2025-08-03, 23:48:38] INFO - on_task_instance_success: produce: 
source="test_listener"
   [2025-08-03, 23:48:38] INFO - on_task_instance_success outlets: 
[Asset(name='test-asset', uri='test-asset', group='asset', extra={}, 
watchers=[])]: source="test_listener"
   [2025-08-03, 23:48:38] INFO - 
execution_time.task_runner.RuntimeTaskInstance.get_template_context:: 
chan="stdout": source="task"
   [2025-08-03, 23:48:38] INFO -  {'dag': <DAG: asset-producer>, 'inlets': [], 
'map_index_template': None, 'outlets': [Asset(name='test-asset', 
uri='test-asset', group='asset', extra={}, watchers=[])], ... 'outlet_events': 
<airflow.sdk.execution_time.context.OutletEventAccessors object at 
0x7fe888329820>
   ```
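A minimal, Airflow-free illustration of what the two log lines show (all names below are made up, not the real SDK API): each call builds a brand-new context, so the accessor object the listener sees is not the one the task mutated.

```python
class OutletEventAccessors:  # stand-in for the SDK class
    pass


def get_template_context() -> dict:
    # Builds a fresh context on every call, mirroring the current behavior.
    return {"outlet_events": OutletEventAccessors()}


task_ctx = get_template_context()      # context seen during task execution
listener_ctx = get_template_context()  # context rebuilt inside the listener

# Different objects, so changes recorded on task_ctx are invisible here.
assert task_ctx["outlet_events"] is not listener_ctx["outlet_events"]
```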
   
   I attempted to add a simple cache as below, but got the following error.
   
   ```python
   @functools.cache
   def get_template_context(self) -> Context:
      ...
   ```
   
   <img width="902" height="252" alt="Image" src="https://github.com/user-attachments/assets/303ed000-28ee-41d5-affc-6392fe91df0f" />
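My guess is that the error comes from `functools.cache` hashing all of its arguments, including `self`, while the task instance object is not hashable. A minimal reproducer of that failure mode, using a deliberately unhashable stand-in class (not the real `RuntimeTaskInstance`):

```python
import functools


class FakeTaskInstance:
    # Simulate an unhashable object; mutable models often set __hash__ = None.
    __hash__ = None

    @functools.cache
    def get_template_context(self) -> dict:
        return {}


try:
    FakeTaskInstance().get_template_context()
except TypeError as exc:
    print(exc)  # unhashable type: 'FakeTaskInstance'
```

Note also that even on a hashable class, `functools.cache` on a method keeps every instance alive in the module-level cache for the life of the process, which is a known caveat of caching `self`.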
   
   If I understand correctly, the cache key here is `self`, which is of type `RuntimeTaskInstance`, so repeated calls to `get_template_context` on the same task instance would return the same context without re-running the construction logic. If that is the expected behavior, I am wondering if I could get some hints on the recommended way to implement the cache.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
