sjyangkevin commented on issue #52574:
URL: https://github.com/apache/airflow/issues/52574#issuecomment-3149060840
> I took a quick look at the code. In `task_runner.py`, the task runner does not commit the asset changes to the database (by sending a request through the API) before calling the callbacks and listeners (not a bad implementation; those functions may further modify the task instance, so we want to wait until they are finished). But when you call `get_template_context` in the listener, you get a brand-new context object that does not contain the changes made during task execution.
>
> If my understanding is correct, the easiest fix is probably to add a cache
to `get_template_context` so all calls inside a task worker always return the
same context object instead of building new ones.
I had another look into it. I think `get_template_context` here refers to the one in `task_runner.py`, defined under `sdk/execution_time`; I was previously looking at `taskinstance.py` under `airflow-core/src/airflow/models`. I added a log statement to print the context and found that the two invocations refer to different `OutletEventAccessors` objects. Below is the output when the "producer" DAG is triggered; the `OutletEventAccessors` object is at **0x7fe889c50fb0**:
```
[2025-08-03, 23:48:37] INFO - DAG bundles loaded: dags-folder:
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-08-03, 23:48:37] INFO - Filling up the DagBag from
/files/dags/outlet_test_dag.py: source="airflow.models.dagbag.DagBag"
[2025-08-03, 23:48:38] INFO -
execution_time.task_runner.RuntimeTaskInstance.get_template_context::
chan="stdout": source="task"
[2025-08-03, 23:48:38] INFO - {'dag': <DAG: asset-producer>, 'inlets': [],
'map_index_template': None, 'outlets': [Asset(name='test-asset',
uri='test-asset', group='asset', extra={}, watchers=[])], ....,
'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors
object at 0x7fe889c50fb0>
```
I think this is when the listener is triggered. Here the `OutletEventAccessors` object is at **0x7fe888329820**:
```
[2025-08-03, 23:48:38] INFO - on_task_instance_success: produce:
source="test_listener"
[2025-08-03, 23:48:38] INFO - on_task_instance_success outlets:
[Asset(name='test-asset', uri='test-asset', group='asset', extra={},
watchers=[])]: source="test_listener"
[2025-08-03, 23:48:38] INFO -
execution_time.task_runner.RuntimeTaskInstance.get_template_context::
chan="stdout": source="task"
[2025-08-03, 23:48:38] INFO - {'dag': <DAG: asset-producer>, 'inlets': [],
'map_index_template': None, 'outlets': [Asset(name='test-asset',
uri='test-asset', group='asset', extra={}, watchers=[])], ... 'outlet_events':
<airflow.sdk.execution_time.context.OutletEventAccessors object at
0x7fe888329820>
```
I attempted to add a simple cache as below, but got the following error.
```python
@functools.cache
def get_template_context(self) -> Context:
...
```
<img width="902" height="252" alt="Image"
src="https://github.com/user-attachments/assets/303ed000-28ee-41d5-affc-6392fe91df0f"
/>
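For what it's worth, I suspect the `TypeError` comes from `self` not being hashable: `functools.cache` keys the call on all arguments, including `self`. A minimal sketch reproducing that, with an unhashable class standing in for `RuntimeTaskInstance` (the names are illustrative, not from the Airflow code base):

```python
import functools


class Unhashable:
    # Defining __eq__ without __hash__ sets __hash__ to None,
    # which makes instances unhashable.
    def __eq__(self, other):
        return self is other

    @functools.cache
    def get_context(self):
        return {}


try:
    Unhashable().get_context()
except TypeError as exc:
    print(exc)  # prints: unhashable type: 'Unhashable'
```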
If I understand correctly, the hash key for the cache here is `self`, which is of type `RuntimeTaskInstance`, so calling `get_template_context` on the same task instance would return the same context object without going through the construction logic again. If that is the expected behavior, I'm wondering if I could get some hints on the recommended way to implement the cache.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]