sjyangkevin commented on code in PR #54568:
URL: https://github.com/apache/airflow/pull/54568#discussion_r2312125177


##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -2538,6 +2564,118 @@ def execute(self, context):
         assert listener.state == [TaskInstanceState.RUNNING, 
TaskInstanceState.FAILED]
         assert listener.error == error
 
+    def test_listener_access_outlet_event_on_running_and_success(self, 
mocked_parse, mock_supervisor_comms):
+        """Test listener can access outlet events through invoking 
get_template_context() while task running and success"""
+        listener = self.CustomOutletEventsListener()
+        get_listener_manager().add_listener(listener)
+
+        test_asset = Asset("test-asset")
+        test_key = AssetUniqueKey(name="test-asset", uri="test-asset")
+        test_extra = {"name1": "value1", "nested_obj": {"name2": "value2"}}
+
+        class Producer(BaseOperator):
+            def execute(self, context):
+                outlet_events = context["outlet_events"]
+                outlet_events[test_asset].extra = {"name1": "value1", 
"nested_obj": {"name2": "value2"}}
+
+        task = Producer(
+            
task_id="test_listener_access_outlet_event_on_running_and_success", 
outlets=[test_asset]
+        )
+        dag = get_inline_dag(dag_id="test_dag", task=task)
+        ti = TaskInstance(
+            id=uuid7(),
+            task_id=task.task_id,
+            dag_id=dag.dag_id,
+            run_id="test_run",
+            try_number=1,
+            dag_version_id=uuid7(),
+        )
+
+        runtime_ti = RuntimeTaskInstance.model_construct(
+            **ti.model_dump(exclude_unset=True), task=task, 
start_date=timezone.utcnow()
+        )
+
+        log = mock.MagicMock()
+        context = runtime_ti.get_template_context()
+
+        with mock.patch(
+            
"airflow.sdk.execution_time.task_runner._validate_task_inlets_and_outlets"

Review Comment:
   Hi @Lee-W , I think I kind of figuring out what happens here. Let me restate 
the original issue in the test. When the following patch is not used, the 
listener is not triggered. So, the test will fail at the upcoming assertion 
when looking for an outlet event in the listener's attribute 
`listener.outlet_events`. 
   
https://github.com/apache/airflow/blob/40c2c910ad02be1c6000d65585477f18bb655fa6/task-sdk/tests/task_sdk/execution_time/test_task_runner.py#L2685-L2688
   
   ### Why the listener is not triggered?
   In the task runner, the following exception is always raised during the 
tests because `inactive_assets_resp.inactive_assets` is a mock object, and it 
is always evaluated to `True`. It means that if 
`_validate_task_inlets_and_outlets` is not mock through the patch, the 
exception will always be raised.
   
https://github.com/apache/airflow/blob/40c2c910ad02be1c6000d65585477f18bb655fa6/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L836-L841
   
   I believe the listener invocation 
`get_listener_manager().hook.on_task_instance_running` is not reached when the 
exception is raised.
   
https://github.com/apache/airflow/blob/40c2c910ad02be1c6000d65585477f18bb655fa6/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L815-L823
   
   ### Can we make the test run without using the patch for the validation?
   
   Yes. In this case, what we need to do is to make `inactive_assets` an empty 
list for the supervisor mock fixture. In this case, the exception will not be 
raised, but here we assume the asset in the outlets is an valid asset.
   ```
   mock_supervisor_comms.send.return_value.inactive_assets = []
   ```
   
   Let me know if it is more suitable to use the patch for 
`_validate_task_inlets_and_outlets` or we should update the mock supervisor 
comms. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to