CincyBC opened a new issue, #55198: URL: https://github.com/apache/airflow/issues/55198
### Apache Airflow version

3.0.6

### If "Other Airflow 2 version" selected, which one?

N/A

### What happened?

A deferred KubernetesPodOperator fails at this [line](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L774) in the triggerer. While the trigger is running, the hook reads the connection's `extra_dejson`, which calls `mask_secret`, which in turn goes through the synchronous `send` wrapper around `async_to_sync`. That call happens in the same thread as the trigger's event loop and raises: `You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.`

It is most likely related to the code @ashb pushed with this [PR](https://github.com/apache/airflow/pull/54574/files#diff-8242f681730abed9fe83b6fcf221a3dfcb8c29bc1c9b3334f853edd4763c222c). See the full error message in the log below.

My example dag below has two KubernetesPodOperator tasks with exactly the same configuration, except that one is deferrable and one is not. Only the deferrable one fails (as expected from the error). I also tried multiple versions of the CNCF-Kubernetes provider, from 10.4.3 to 10.7.0, to make sure it wasn't caused by a recent change to the provider. My dags did not have this issue on Airflow 3.0.4 with the CNCF-Kubernetes provider pinned at 10.4.3.
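For readers unfamiliar with this error, here is a minimal stdlib-only sketch of the failure mode. It is an illustration, not Airflow or asgiref code: `to_sync` is a simplified stand-in for the guard in `async_to_sync`, and `asend`, `send`, and both `trigger_*` coroutines are hypothetical names chosen to mirror the frames in the traceback.

```python
import asyncio

ERROR = (
    "You cannot use AsyncToSync in the same thread as an async event loop"
    " - just await the async function directly."
)


def to_sync(async_fn):
    """Simplified stand-in for the guard in asgiref's async_to_sync (not the real code)."""

    def wrapper(*args, **kwargs):
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False  # no event loop is running in this thread
        else:
            loop_running = True
        if loop_running:
            # Blocking here would stall the very loop that has to run async_fn.
            raise RuntimeError(ERROR)
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


async def asend(msg):
    """Hypothetical async transport, standing in for the comms `asend`."""
    await asyncio.sleep(0)
    return f"sent:{msg}"


def send(msg):
    """Sync wrapper shaped like the `send` frame in the traceback."""
    return to_sync(asend)(msg)


async def trigger_calls_sync_send():
    # A sync helper called from a coroutine runs in the loop's own thread.
    try:
        return send("extra")
    except RuntimeError as exc:
        return f"error: {exc}"


async def trigger_awaits_directly():
    # What the error message recommends when already inside async code.
    return await asend("extra")


outside_loop = send("extra")  # no loop running: the sync wrapper works
inside_loop = asyncio.run(trigger_calls_sync_send())  # loop running: guard fires
awaited = asyncio.run(trigger_awaits_directly())  # awaiting directly works
print(outside_loop)
print(inside_loop)
print(awaited)
```

In this sketch the sync wrapper works when no loop is running, which would be consistent with the non-deferrable task succeeding, and fails only when called from inside a coroutine, which is the situation the trigger is in.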
Full Error Message ``` [2025-09-02, 21:40:38] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager" [2025-09-02, 21:40:38] INFO - Filling up the DagBag from /opt/airflow/dags/k8sexample.py: source="airflow.models.dagbag.DagBag" [2025-09-02, 21:40:45] WARNING - /home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py:471: DeprecationWarning: Using Connection.get_connection_from_secrets from `airflow.models` is deprecated.Please use `get` on Connection from sdk(`airflow.sdk.Connection`) instead warnings.warn( : source="py.warnings" [2025-09-02, 21:40:45] INFO - Connection Retrieved 'kubernetes_default': source="airflow.hooks.base" [2025-09-02, 21:40:45] INFO - Building pod hello-dry-run-vjinmuwx with labels: {'dag_id': 'k8s_example', 'task_id': 'dry_run_demo', 'run_id': 'manual__2025-09-03T014028.9013550000-082a6acc9', 'kubernetes_pod_operator': 'True', 'try_number': '1'}: source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator" [2025-09-02, 21:40:45] INFO - Pausing task as DEFERRED. 
: dag_id="k8s_example": task_id="dry_run_demo": run_id="manual__2025-09-03T01:40:28.901355+00:00": source="task" [2025-09-02, 21:40:49] INFO - trigger k8s_example/manual__2025-09-03T01:40:28.901355+00:00/dry_run_demo/-1/1 (ID 515) starting [2025-09-02, 21:40:49] INFO - Checking pod 'hello-dry-run-vjinmuwx' in namespace 'airflow'.: source="airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger" [2025-09-02, 21:40:49] WARNING - Using Connection.get_connection_from_secrets from `airflow.models` is deprecated.Please use `get` on Connection from sdk(`airflow.sdk.Connection`) instead: category="DeprecationWarning": filename="/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py": lineno=471: source="py.warnings" [2025-09-02, 21:40:52] INFO - Connection Retrieved 'kubernetes_default': source="airflow.hooks.base" [2025-09-02, 21:40:53] INFO - Connection Retrieved 'kubernetes_default': source="airflow.hooks.base" [2025-09-02, 21:40:55] INFO - Connection Retrieved 'kubernetes_default': source="airflow.hooks.base" [2025-09-02, 21:40:55] INFO - Trigger fired event: name="k8s_example/manual__2025-09-03T01:40:28.901355+00:00/dry_run_demo/-1/1 (ID 515)": result="TriggerEvent<{'name': 'hello-dry-run-vjinmuwx', 'namespace': 'airflow', 'status': 'error', 'message': 'You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.', 'stack_trace': 'Traceback (most recent call last):\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py\", line 148, in run\\n state = await self._wait_for_pod_start()\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py\", line 213, in _wait_for_pod_start\\n pod = await self._get_pod()\\n ^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\", line 189, in 
async_wrapped\\n return await copy(fn, *args, **kwargs)\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\", line 111, in __call__\\n do = await self.iter(retry_state=retry_state)\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\", line 153, in iter\\n result = await action(retry_state)\\n ^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/tenacity/_utils.py\", line 99, in inner\\n return call(*args, **kwargs)\\n ^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py\", line 420, in exc_check\\n raise retry_exc.reraise()\\n ^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py\", line 187, in reraise\\n raise self.last_attempt.result()\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/usr/local/lib/python3.12/concurrent/futures/_base.py\", line 449, in result\\n return self.__get_result()\\n ^^^^^^^^^^^^^^^^^^^\\n File \"/usr/local/lib/python3.12/concurrent/futures/_base.py\", line 401, in __get_result\\n raise self._exception\\n File \"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\", line 114, in __call__\\n result = await fn(*args, **kwargs)\\n ^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py\", line 276, in _get_pod\\n pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\", line 851, in get_pod\\n async with self.get_conn() as connection:\\n ^^^^^^^^^^^^^^^\\n File \"/usr/local/lib/python3.12/contextlib.py\", line 210, in __aenter__\\n return await 
anext(self.gen)\\n ^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\", line 838, in get_conn\\n kube_client = await self._load_config() or async_client.ApiClient()\\n ^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\", line 757, in _load_config\\n in_cluster = self._coalesce_param(self.in_cluster, await self._get_field(\"in_cluster\"))\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\", line 828, in _get_field\\n extras = await self.get_conn_extras()\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\", line 817, in get_conn_extras\\n self._extras = connection.extra_dejson\\n ^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py\", line 162, in extra_dejson\\n mask_secret(extra)\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/secrets_masker.py\", line 134, in mask_secret\\n comms.send(MaskSecret(value=secret, name=name))\\n File \"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py\", line 740, in send\\n return async_to_sync(self.asend)(msg)\\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n File \"/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py\", line 186, in __call__\\n raise RuntimeError(\\nRuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.\\n'}>" [2025-09-02, 21:40:55] INFO - trigger completed: name="k8s_example/manual__2025-09-03T01:40:28.901355+00:00/dry_run_demo/-1/1 (ID 515)" [2025-09-02, 21:40:58] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager" [2025-09-02, 21:40:58] INFO - Filling up the DagBag from /opt/airflow/dags/k8sexample.py: source="airflow.models.dagbag.DagBag" [2025-09-02, 21:40:59] WARNING - /home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py:471: DeprecationWarning: Using Connection.get_connection_from_secrets from `airflow.models` is deprecated.Please use `get` on Connection from sdk(`airflow.sdk.Connection`) instead warnings.warn( : source="py.warnings" [2025-09-02, 21:40:59] INFO - Connection Retrieved 'kubernetes_default': source="airflow.hooks.base" [2025-09-02, 21:40:59] ERROR - Trigger emitted an error event, failing the task: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.: source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator" [2025-09-02, 21:40:59] INFO - [base] logs: + echo: source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator" [2025-09-02, 21:40:59] INFO - Running command... if [ -s /airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo __airflow_xcom_result_empty__; fi: source="airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" [2025-09-02, 21:40:59] INFO - Running command... 
kill -2 $(pgrep -u $(id -u) -f 'sh'): source="airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" [2025-09-02, 21:40:59] INFO - xcom result file is empty.: source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator" [2025-09-02, 21:41:00] INFO - Pod hello-dry-run-vjinmuwx has phase Running: source="airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" [2025-09-02, 21:41:02] INFO - Deleting pod: hello-dry-run-vjinmuwx: source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator" [2025-09-02, 21:41:02] ERROR - Task failed with exception: source="task" AirflowException: Traceback (most recent call last): File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 148, in run state = await self._wait_for_pod_start() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 213, in _wait_for_pod_start pod = await self._get_pod() ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped return await copy(fn, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__ do = await self.iter(retry_state=retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 153, in iter result = await action(retry_state) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/_utils.py", line 99, in inner return call(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 420, in exc_check raise retry_exc.reraise() ^^^^^^^^^^^^^^^^^^^ File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 187, in reraise raise self.last_attempt.result() ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__ result = await fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 276, in _get_pod pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 851, in get_pod async with self.get_conn() as connection: ^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__ return await anext(self.gen) ^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 838, in get_conn kube_client = await self._load_config() or async_client.ApiClient() ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 757, in _load_config in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster")) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 828, in _get_field extras = await self.get_conn_extras() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 817, in get_conn_extras 
self._extras = connection.extra_dejson ^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py", line 162, in extra_dejson mask_secret(extra) File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/secrets_masker.py", line 134, in mask_secret comms.send(MaskSecret(value=secret, name=name)) File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 740, in send return async_to_sync(self.asend)(msg) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", line 186, in __call__ raise RuntimeError( RuntimeError: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly. File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1215 in _execute_task File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1606 in resume_execution File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 906 in trigger_reentry ``` ### What you think should happen instead? After the task completes and the pod is deleted, it should exit gracefully without error. ### How to reproduce I created this dag/Dag/DAG to test if the `deferrable=False` works properly and it did. Only `deferrable=True` fails. 
```
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.sdk import dag
from airflow.providers.standard.operators.empty import EmptyOperator
from datetime import datetime


@dag(dag_id="k8s_example", schedule=None, start_date=datetime(2025, 9, 2))
def k8s_example_dag():
    start_task = EmptyOperator(task_id="start")
    end_task = EmptyOperator(task_id="end")

    k = KubernetesPodOperator(
        name="hello-dry-run",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        namespace="airflow",
        get_logs=True,
        deferrable=True,
        random_name_suffix=True,
    )

    k2 = KubernetesPodOperator(
        name="hello-dry-run-2",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        task_id="dry_run_demo_2",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        namespace="airflow",
        get_logs=True,
        deferrable=False,
        random_name_suffix=True,
    )

    start_task >> [k, k2] >> end_task


k8s_example_dag()
```

### Operating System

Ubuntu

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon==9.12.0
apache-airflow-providers-celery==3.12.2
apache-airflow-providers-cncf-kubernetes==10.7.0 [Also Failed in lower versions down to 10.4.3]
apache-airflow-providers-common-compat==1.7.3
apache-airflow-providers-common-io==1.6.2
apache-airflow-providers-common-sql==1.27.5
apache-airflow-providers-fab==2.4.1
apache-airflow-providers-http==5.3.3
apache-airflow-providers-openlineage==2.6.1
apache-airflow-providers-postgres==6.2.3
apache-airflow-providers-redis==4.2.0
apache-airflow-providers-slack==9.1.4
apache-airflow-providers-smtp==2.2.0
apache-airflow-providers-standard==1.6.0

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

Dags are baked into the image and deployed with the official Airflow Helm Chart with ArgoCD

### Anything else?

Every KubernetesPodOperator task with deferrable=True in my dags fails.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org