o-nikolas commented on PR #54692: URL: https://github.com/apache/airflow/pull/54692#issuecomment-3251207277
I've done a lot of deep diving and I think I've identified what's happening here. The above exception is actually coming from the reentry point, when we return from the triggerer. Just before that exception in the logs we see:

```
[2025-09-03 19:16:03,685] {pod.py:909} ERROR - Trigger emitted an error event, failing the task: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
[2025-09-03T19:16:03.685+0000] {pod.py:909} ERROR - Trigger emitted an error event, failing the task: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py)
[2025-09-03T19:16:20.896+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:28.631+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:36.345+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03T19:16:44.060+0000] {pod_manager.py:765} INFO - Pod run-pod-xxeffbtb has phase Pending
[2025-09-03 19:17:03,348] {pod.py:1181} INFO - Skipping deleting pod: run-pod-xxeffbtb
[2025-09-03T19:17:03.348+0000] {pod.py:1181} INFO - Skipping deleting pod: run-pod-xxeffbtb
2025-09-03 19:17:03 [error ] Task failed with exception [task]
```

I also see successful communication with the task API comms above that. So the failure is actually within the triggerer scope: the trigger cannot communicate with the task API to get connection information. This is because in the `dag.test()` world we run triggers inline [here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/dag.py#L1376), and we do not set up the in-process supervisor comms like we do for regular task execution in `dag.test()` [here](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L1574-L1577).
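To make the failure mode concrete, here is a minimal sketch of the general Python behaviour that the error message is consistent with. It is not Airflow's actual code: `fake_task_runner`, `InProcessComms`, `read_connection` and `run_inline` are hypothetical stand-ins, and I am assuming the comms object is a module-level name that only exists once a supervisor has assigned it. Under that assumption, a lazy `from module import NAME` raises `ImportError` whenever nothing has set the name first.

```python
import sys
import types

# Hypothetical stand-in for a module whose comms global is only
# assigned once a supervisor has been set up. Not Airflow code.
fake_runner = types.ModuleType("fake_task_runner")
sys.modules["fake_task_runner"] = fake_runner


class InProcessComms:
    """Hypothetical in-process comms object, for illustration only."""

    def get(self, conn_id: str) -> str:
        return f"connection:{conn_id}"


def read_connection(conn_id: str) -> str:
    """Mimic trigger code that imports the comms object lazily."""
    # Raises ImportError if the module attribute was never assigned.
    from fake_task_runner import SUPERVISOR_COMMS

    return SUPERVISOR_COMMS.get(conn_id)


def run_inline(with_comms: bool) -> str:
    """Run the 'trigger' inline, optionally setting up comms first."""
    if with_comms:
        fake_runner.SUPERVISOR_COMMS = InProcessComms()
    try:
        return read_connection("kubernetes_default")
    except ImportError as exc:
        return f"error: {exc}"
    finally:
        # Remove the global again so repeated calls start clean.
        if with_comms:
            del fake_runner.SUPERVISOR_COMMS


without_comms = run_inline(with_comms=False)
with_comms = run_inline(with_comms=True)
print(without_comms)
print(with_comms)
```

The first call fails with a "cannot import name" error because nothing assigned the global before the inline run, and the second succeeds because the comms object was installed first. That is the shape of the change I think the inline trigger path needs.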
Triggers definitely need supervisor comms set up; you can see we do that in the normal case (i.e. not `dag.test()`) [here](https://github.com/apache/airflow/blob/f1e9e522fb81894c1e5ab8bf2eba1269011fd757/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L905-L910). So I believe we need to update the inline execution of triggers in `dag.test()` to set up supervisor comms as well. I have a draft PR with what I think that might look like, though it likely needs refinement: #55236

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org