sjyangkevin commented on code in PR #54568:
URL: https://github.com/apache/airflow/pull/54568#discussion_r2312125177
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2538,6 +2564,118 @@ def execute(self, context):
assert listener.state == [TaskInstanceState.RUNNING,
TaskInstanceState.FAILED]
assert listener.error == error
+ def test_listener_access_outlet_event_on_running_and_success(self,
mocked_parse, mock_supervisor_comms):
+ """Test listener can access outlet events through invoking
get_template_context() while task running and success"""
+ listener = self.CustomOutletEventsListener()
+ get_listener_manager().add_listener(listener)
+
+ test_asset = Asset("test-asset")
+ test_key = AssetUniqueKey(name="test-asset", uri="test-asset")
+ test_extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
+
+ class Producer(BaseOperator):
+ def execute(self, context):
+ outlet_events = context["outlet_events"]
+ outlet_events[test_asset].extra = {"name1": "value1",
"nested_obj": {"name2": "value2"}}
+
+ task = Producer(
+
task_id="test_listener_access_outlet_event_on_running_and_success",
outlets=[test_asset]
+ )
+ dag = get_inline_dag(dag_id="test_dag", task=task)
+ ti = TaskInstance(
+ id=uuid7(),
+ task_id=task.task_id,
+ dag_id=dag.dag_id,
+ run_id="test_run",
+ try_number=1,
+ dag_version_id=uuid7(),
+ )
+
+ runtime_ti = RuntimeTaskInstance.model_construct(
+ **ti.model_dump(exclude_unset=True), task=task,
start_date=timezone.utcnow()
+ )
+
+ log = mock.MagicMock()
+ context = runtime_ti.get_template_context()
+
+ with mock.patch(
+
"airflow.sdk.execution_time.task_runner._validate_task_inlets_and_outlets"
Review Comment:
Hi @Lee-W , I think I kind of figuring out what happens here. Let me restate
the original issue in the test. When the following patch is not used, the
listener is not triggered. So, the test will fail at the upcoming assertion
when looking for an outlet event in the listener's attribute
`listener.outlet_events`.
https://github.com/apache/airflow/blob/40c2c910ad02be1c6000d65585477f18bb655fa6/task-sdk/tests/task_sdk/execution_time/test_task_runner.py#L2685-L2688
### Why the listener is not triggered?
In the task runner, the following exception is always raised during the
tests because `inactive_assets_resp.inactive_assets` is a mock object, and it
is always evaluated to `True`. It means that if
`_validate_task_inlets_and_outlets` is not mock through the patch, the
exception will always be raised.
https://github.com/apache/airflow/blob/40c2c910ad02be1c6000d65585477f18bb655fa6/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L836-L841
I believe the listener invocation
`get_listener_manager().hook.on_task_instance_running` is not reached when the
exception is raised.
https://github.com/apache/airflow/blob/40c2c910ad02be1c6000d65585477f18bb655fa6/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L815-L823
### Can we make the test run without using the patch for the validation?
Yes. In this case, what we need to do is to make `inactive_assets` an empty
list for the supervisor mock fixture. In this case, the exception will not be
raised, but here we assume the asset in the outlets is an valid asset.
```
mock_supervisor_comms.send.return_value.inactive_assets = []
```
Let me know if it is more suitable to use the patch for
`_validate_task_inlets_and_outlets` or we should update the mock supervisor
comms. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]