sjyangkevin commented on code in PR #54568:
URL: https://github.com/apache/airflow/pull/54568#discussion_r2299675405
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2538,6 +2564,118 @@ def execute(self, context):
         assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.FAILED]
         assert listener.error == error
+    def test_listener_access_outlet_event_on_running_and_success(self, mocked_parse, mock_supervisor_comms):
+        """Test listener can access outlet events through invoking get_template_context() while task running and success"""
+        listener = self.CustomOutletEventsListener()
+        get_listener_manager().add_listener(listener)
+
+        test_asset = Asset("test-asset")
+        test_key = AssetUniqueKey(name="test-asset", uri="test-asset")
+        test_extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
+
+        class Producer(BaseOperator):
+            def execute(self, context):
+                outlet_events = context["outlet_events"]
+                outlet_events[test_asset].extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
+
+        task = Producer(
+            task_id="test_listener_access_outlet_event_on_running_and_success", outlets=[test_asset]
+        )
+        dag = get_inline_dag(dag_id="test_dag", task=task)
+        ti = TaskInstance(
+            id=uuid7(),
+            task_id=task.task_id,
+            dag_id=dag.dag_id,
+            run_id="test_run",
+            try_number=1,
+            dag_version_id=uuid7(),
+        )
+
+        runtime_ti = RuntimeTaskInstance.model_construct(
+            **ti.model_dump(exclude_unset=True), task=task, start_date=timezone.utcnow()
+        )
+
+        log = mock.MagicMock()
+        context = runtime_ti.get_template_context()
+
+        with mock.patch(
+            "airflow.sdk.execution_time.task_runner._validate_task_inlets_and_outlets"
Review Comment:
Thanks for the insightful feedback!

> As we already know, it's an activated asset (we create it and activate it), this check doesn't really needed here

Yeah, from my understanding that looks to be the case. We put the asset into `outlets` when instantiating the task, which should create the asset and mark it as active.

However, without the patch the listener is never triggered. As shown below, `listener.outlet_events` should hold the outlet events fetched while the task is running and on task success, but it comes back as an empty list. I also added some `print` statements to the listener code, and they do not show up either when the patch is removed.
<img width="1385" height="714" alt="Screenshot from 2025-08-25 23-45-11"
src="https://github.com/user-attachments/assets/9dd21bae-59b4-4336-9992-affc7ffb3083"
/>
The issue is probably not caused by the patching itself. Could it be that I am not running the task properly?
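
One possible explanation for a listener that stays silent, sketched here with stand-in names rather than the real task runner: if a validation step raises before the listener hook is reached and the runner swallows that error, the listener simply records nothing, and patching the validation step makes it fire again. `ToyRunner`, `RecordingListener`, and `validate_outlets` are all hypothetical; whether the real `_validate_task_inlets_and_outlets` behaves this way under the mocked supervisor comms is an assumption that would need checking.

```python
from unittest import mock


class RecordingListener:
    """Hypothetical listener that records every event it is handed."""

    def __init__(self):
        self.outlet_events = []

    def on_running(self, events):
        self.outlet_events.append(events)


class ToyRunner:
    """Toy runner (not Airflow code): validate first, notify the listener second."""

    @staticmethod
    def validate_outlets():
        # Stand-in for a pre-run check; in this sketch it always fails.
        raise RuntimeError("asset reported as inactive")

    @classmethod
    def run(cls, listener):
        try:
            cls.validate_outlets()
            listener.on_running({"test-asset": {"name1": "value1"}})
        except RuntimeError:
            # The error is swallowed, so the only visible symptom is the silent listener.
            return "failed"
        return "success"


# Without the patch: validation raises, so the hook is never reached.
unpatched = RecordingListener()
state_unpatched = ToyRunner.run(unpatched)

# With the patch: validation becomes a no-op mock, so the hook fires once.
patched = RecordingListener()
with mock.patch.object(ToyRunner, "validate_outlets"):
    state_patched = ToyRunner.run(patched)
```

If something similar is happening in the test, running without the patch and inspecting the resulting task state, or the calls recorded on the `log` mock, might surface the error that keeps the listener from being reached.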