Ferdinanddb commented on issue #55747: URL: https://github.com/apache/airflow/issues/55747#issuecomment-3335613676
For info I have the same problem with the [`SparkKubernetesOperator` from `cncf.kubernetes` provider.](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#sparkkubernetesoperator). Since I migrated to Airflow 3.x.x, I was not able to run any task in the deferrable mode, and everything was working fine on Airflow 2.11 / 2.10. I suspect that something is off in the way the triggerer is behaving, because my task (Spark job in my case) is succeeding, but the Airflow task fails with the following log (similar to the one mentioned in this issue, hence my comment): ``` AttributeError: 'SparkKubernetesOperator' object has no attribute 'launcher' File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1215 in _execute_task File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1606 in resume_execution File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 956 in trigger_reentry File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 978 in _clean File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1013 in post_complete_action File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 1056 in cleanup File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 265 in process_pod_deletion AirflowException: Traceback (most recent call last): File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 148, in run state = await self._wait_for_pod_start() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 213, in _wait_for_pod_start pod = await self._get_pod() ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped return await copy(fn, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__ do = await self.iter(retry_state=retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 153, in iter result = await action(retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/_utils.py", line 99, in inner return call(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 420, in exc_check raise retry_exc.reraise() ^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 187, in reraise raise self.last_attempt.result() ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__ result = await fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 276, in _get_pod pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 851, in get_pod async with self.get_conn() as connection: ^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__ return await anext(self.gen) ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 838, in get_conn kube_client = await self._load_config() or async_client.ApiClient() ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 757, in _load_config in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster")) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 828, in _get_field extras = await self.get_conn_extras() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 817, in get_conn_extras self._extras = connection.extra_dejson ^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py", line 449, in extra_dejson return self.get_extra_dejson() ^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py", line 442, in get_extra_dejson mask_secret(extra) File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/secrets_masker.py", line 134, in mask_secret comms.send(MaskSecret(value=secret, name=name)) File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send return async_to_sync(self.asend)(msg) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 186, in __call__ raise RuntimeError( RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. ``` Hope that helps! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
