o-nikolas commented on PR #54692:
URL: https://github.com/apache/airflow/pull/54692#issuecomment-3251207277

   I've done a lot of deep diving and I think I've identified what's happening 
here. So the above exception is actually coming from the reentry point when 
we're returning from the triggerer. Just before that exception in the logs we 
see:
   ```
   [2025-09-03 19:16:03,685] {pod.py:909} ERROR - Trigger emitted an error 
event, failing the task: cannot import name 'SUPERVISOR_COMMS' from 
'airflow.sdk.execution_time.task_runner' 
(/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
   [2025-09-03T19:16:03.685+0000] {pod.py:909} ERROR - Trigger emitted an error 
event, failing the task: cannot import name 'SUPERVISOR_COMMS' from 
'airflow.sdk.execution_time.task_runner' 
(/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
   [2025-09-03T19:16:20.896+0000] {pod_manager.py:765} INFO - Pod 
run-pod-xxeffbtb has phase Pending
   [2025-09-03T19:16:28.631+0000] {pod_manager.py:765} INFO - Pod 
run-pod-xxeffbtb has phase Pending
   [2025-09-03T19:16:36.345+0000] {pod_manager.py:765} INFO - Pod 
run-pod-xxeffbtb has phase Pending
   [2025-09-03T19:16:44.060+0000] {pod_manager.py:765} INFO - Pod 
run-pod-xxeffbtb has phase Pending
   [2025-09-03 19:17:03,348] {pod.py:1181} INFO - Skipping deleting pod: 
run-pod-xxeffbtb
   [2025-09-03T19:17:03.348+0000] {pod.py:1181} INFO - Skipping deleting pod: 
run-pod-xxeffbtb
   2025-09-03 19:17:03 [error    ] Task failed with exception     [task]
   ```
   
   And I see successful communication with task API comms above that. So it's 
actually within the Triggerer scope that we're failing to communicate with task 
api to get connection information.
   
   This is because in the dag.test() world, we run triggerers inline 
[here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/dag.py#L1376)
 and we do not setup the inprocess supervisor comms like we do for regular task 
execution in dag.test() 
[here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L1574-L1577).
   Triggers definitely need a supervisor comms setup, you can see we do that in 
the normal case (i.e. not dag.test()) 
[here](https://github.com/apache/airflow/blob/f1e9e522fb81894c1e5ab8bf2eba1269011fd757/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L905-L910).
   
   So I believe we need to update the inline execution of triggers in 
dag.test() to setup a supervisor comms.
   
   I have a draft PR here with what I think that might look like. But likely 
needs refinement: #55236


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to