CincyBC opened a new issue, #55198:
URL: https://github.com/apache/airflow/issues/55198

   ### Apache Airflow version
   
   3.0.6
   
   ### If "Other Airflow 2 version" selected, which one?
   
   N/A
   
   ### What happened?
   
   This 
[line](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/triggerer_job_runner.py#L774)
 is failing with the KubernetesPodOperator when it's deferred when it tries to 
mask the extra_dejson on the async_to_sync function. Specifically, `You cannot 
use AsyncToSync in the same thread as an async event loop - just await the 
async function directly.` It is most likely related to the code @ashb pushed 
with this 
[PR](https://github.com/apache/airflow/pull/54574/files#diff-8242f681730abed9fe83b6fcf221a3dfcb8c29bc1c9b3334f853edd4763c222c).
 See the full error message in the log below. 
   
   You can see my example dag below where I have the same exact 
KubernetesPodOperator configuration except making one deferrable and one not 
and only the deferrable one fails (as expected from the error). I also tried 
this on multiple versions of the CNCF-Kubernetes provider from versions 10.4.3 
to 10.7.0 to ensure it wasn't anything in particular with a recent change to 
the provider. My dags did not have this issue on Airflow 3.0.4 with the 
CNCF-Kubernetes provider pinned at 10.4.3.
   
   Full Error Message
   ```
   [2025-09-02, 21:40:38] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-09-02, 21:40:38] INFO - Filling up the DagBag from 
/opt/airflow/dags/k8sexample.py: source="airflow.models.dagbag.DagBag"
   [2025-09-02, 21:40:45] WARNING - 
/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py:471:
 DeprecationWarning: Using Connection.get_connection_from_secrets from 
`airflow.models` is deprecated.Please use `get` on Connection from 
sdk(`airflow.sdk.Connection`) instead
     warnings.warn(
   : source="py.warnings"
   [2025-09-02, 21:40:45] INFO - Connection Retrieved 'kubernetes_default': 
source="airflow.hooks.base"
   [2025-09-02, 21:40:45] INFO - Building pod hello-dry-run-vjinmuwx with 
labels: {'dag_id': 'k8s_example', 'task_id': 'dry_run_demo', 'run_id': 
'manual__2025-09-03T014028.9013550000-082a6acc9', 'kubernetes_pod_operator': 
'True', 'try_number': '1'}: 
source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator"
   [2025-09-02, 21:40:45] INFO - Pausing task as DEFERRED. : 
dag_id="k8s_example": task_id="dry_run_demo": 
run_id="manual__2025-09-03T01:40:28.901355+00:00": source="task"
   [2025-09-02, 21:40:49] INFO - trigger 
k8s_example/manual__2025-09-03T01:40:28.901355+00:00/dry_run_demo/-1/1 (ID 515) 
starting
   [2025-09-02, 21:40:49] INFO - Checking pod 'hello-dry-run-vjinmuwx' in 
namespace 'airflow'.: 
source="airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger"
   [2025-09-02, 21:40:49] WARNING - Using 
Connection.get_connection_from_secrets from `airflow.models` is 
deprecated.Please use `get` on Connection from sdk(`airflow.sdk.Connection`) 
instead: category="DeprecationWarning": 
filename="/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py":
 lineno=471: source="py.warnings"
   [2025-09-02, 21:40:52] INFO - Connection Retrieved 'kubernetes_default': 
source="airflow.hooks.base"
   [2025-09-02, 21:40:53] INFO - Connection Retrieved 'kubernetes_default': 
source="airflow.hooks.base"
   [2025-09-02, 21:40:55] INFO - Connection Retrieved 'kubernetes_default': 
source="airflow.hooks.base"
   [2025-09-02, 21:40:55] INFO - Trigger fired event: 
name="k8s_example/manual__2025-09-03T01:40:28.901355+00:00/dry_run_demo/-1/1 
(ID 515)": result="TriggerEvent<{'name': 'hello-dry-run-vjinmuwx', 'namespace': 
'airflow', 'status': 'error', 'message': 'You cannot use AsyncToSync in the 
same thread as an async event loop - just await the async function directly.', 
'stack_trace': 'Traceback (most recent call last):\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py\",
 line 148, in run\\n    state = await self._wait_for_pod_start()\\n            
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py\",
 line 213, in _wait_for_pod_start\\n    pod = await self._get_pod()\\n          
^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\",
 line 189, in async_wrapped\\n    return await co
 py(fn, *args, **kwargs)\\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\",
 line 111, in __call__\\n    do = await self.iter(retry_state=retry_state)\\n   
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\",
 line 153, in iter\\n    result = await action(retry_state)\\n             
^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/tenacity/_utils.py\", line 
99, in inner\\n    return call(*args, **kwargs)\\n           
^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py\", 
line 420, in exc_check\\n    raise retry_exc.reraise()\\n          
^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py\", 
line 187, in reraise\\n    raise self.last_attempt.result()\\n          
^^^^^^^^^^^^^^^^
 ^^^^^^^^^^\\n  File \"/usr/local/lib/python3.12/concurrent/futures/_base.py\", 
line 449, in result\\n    return self.__get_result()\\n           
^^^^^^^^^^^^^^^^^^^\\n  File 
\"/usr/local/lib/python3.12/concurrent/futures/_base.py\", line 401, in 
__get_result\\n    raise self._exception\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py\",
 line 114, in __call__\\n    result = await fn(*args, **kwargs)\\n             
^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py\",
 line 276, in _get_pod\\n    pod = await self.hook.get_pod(name=self.pod_name, 
namespace=self.pod_namespace)\\n          
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  
File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\",
 line 851, in get_pod\\n    async with self.get_conn() as connection:\\n        
       ^
 ^^^^^^^^^^^^^^\\n  File \"/usr/local/lib/python3.12/contextlib.py\", line 210, 
in __aenter__\\n    return await anext(self.gen)\\n           
^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\",
 line 838, in get_conn\\n    kube_client = await self._load_config() or 
async_client.ApiClient()\\n                  ^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\",
 line 757, in _load_config\\n    in_cluster = 
self._coalesce_param(self.in_cluster, await self._get_field(\"in_cluster\"))\\n 
                                                      
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\",
 line 828, in _get_field\\n    extras = await self.get_conn_extras()\\n         
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \"
 
/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py\",
 line 817, in get_conn_extras\\n    self._extras = connection.extra_dejson\\n   
                ^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py\",
 line 162, in extra_dejson\\n    mask_secret(extra)\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/secrets_masker.py\",
 line 134, in mask_secret\\n    comms.send(MaskSecret(value=secret, 
name=name))\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py\",
 line 740, in send\\n    return async_to_sync(self.asend)(msg)\\n           
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File 
\"/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py\", line 
186, in __call__\\n    raise RuntimeError(\\nRuntimeError: You cannot use 
AsyncToSync in the same thread as an async event loop - just awa
 it the async function directly.\\n'}>"
   [2025-09-02, 21:40:55] INFO - trigger completed: 
name="k8s_example/manual__2025-09-03T01:40:28.901355+00:00/dry_run_demo/-1/1 
(ID 515)"
   [2025-09-02, 21:40:58] INFO - DAG bundles loaded: dags-folder: 
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-09-02, 21:40:58] INFO - Filling up the DagBag from 
/opt/airflow/dags/k8sexample.py: source="airflow.models.dagbag.DagBag"
   [2025-09-02, 21:40:59] WARNING - 
/home/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py:471:
 DeprecationWarning: Using Connection.get_connection_from_secrets from 
`airflow.models` is deprecated.Please use `get` on Connection from 
sdk(`airflow.sdk.Connection`) instead
     warnings.warn(
   : source="py.warnings"
   [2025-09-02, 21:40:59] INFO - Connection Retrieved 'kubernetes_default': 
source="airflow.hooks.base"
   [2025-09-02, 21:40:59] ERROR - Trigger emitted an error event, failing the 
task: You cannot use AsyncToSync in the same thread as an async event loop - 
just await the async function directly.: 
source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator"
   [2025-09-02, 21:40:59] INFO - [base] logs: + echo: 
source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator"
   [2025-09-02, 21:40:59] INFO - Running command... if [ -s 
/airflow/xcom/return.json ]; then cat /airflow/xcom/return.json; else echo 
__airflow_xcom_result_empty__; fi: 
source="airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
   [2025-09-02, 21:40:59] INFO - Running command... kill -2 $(pgrep -u $(id -u) 
-f 'sh'): 
source="airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
   [2025-09-02, 21:40:59] INFO - xcom result file is empty.: 
source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator"
   [2025-09-02, 21:41:00] INFO - Pod hello-dry-run-vjinmuwx has phase Running: 
source="airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
   [2025-09-02, 21:41:02] INFO - Deleting pod: hello-dry-run-vjinmuwx: 
source="airflow.task.operators.airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator"
   [2025-09-02, 21:41:02] ERROR - Task failed with exception: source="task"
   AirflowException: Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py",
 line 148, in run
       state = await self._wait_for_pod_start()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py",
 line 213, in _wait_for_pod_start
       pod = await self._get_pod()
             ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 189, in async_wrapped
       return await copy(fn, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 111, in __call__
       do = await self.iter(retry_state=retry_state)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 153, in iter
       result = await action(retry_state)
                ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/_utils.py", line 
99, in inner
       return call(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
420, in exc_check
       raise retry_exc.reraise()
             ^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 
187, in reraise
       raise self.last_attempt.result()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in 
result
       return self.__get_result()
              ^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in 
__get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py",
 line 114, in __call__
       result = await fn(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py",
 line 276, in _get_pod
       pod = await self.hook.get_pod(name=self.pod_name, 
namespace=self.pod_namespace)
             
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 851, in get_pod
       async with self.get_conn() as connection:
                  ^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
       return await anext(self.gen)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 838, in get_conn
       kube_client = await self._load_config() or async_client.ApiClient()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 757, in _load_config
       in_cluster = self._coalesce_param(self.in_cluster, await 
self._get_field("in_cluster"))
                                                          
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 828, in _get_field
       extras = await self.get_conn_extras()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py",
 line 817, in get_conn_extras
       self._extras = connection.extra_dejson
                      ^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/definitions/connection.py",
 line 162, in extra_dejson
       mask_secret(extra)
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/secrets_masker.py",
 line 134, in mask_secret
       comms.send(MaskSecret(value=secret, name=name))
     File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py",
 line 740, in send
       return async_to_sync(self.asend)(msg)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/asgiref/sync.py", 
line 186, in __call__
       raise RuntimeError(
   RuntimeError: You cannot use AsyncToSync in the same thread as an async 
event loop - just await the async function directly.
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 920 in run
   
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 1215 in _execute_task
   
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py",
 line 1606 in resume_execution
   
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 906 in trigger_reentry
   ```
   
   ### What you think should happen instead?
   
   After the task completes and the pod is deleted, it should exit gracefully 
without error.
   
   ### How to reproduce
   
   I created this dag/Dag/DAG to test if the `deferrable=False` works properly 
and it did. Only `deferrable=True` fails.
   
   ```
   from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
   from airflow.sdk import dag
   from airflow.providers.standard.operators.empty import EmptyOperator
   from datetime import datetime
   
   
   @dag(dag_id="k8s_example", schedule=None, start_date=datetime(2025, 9, 2))
   def k8s_example_dag():
       start_task = EmptyOperator(task_id="start")
       end_task = EmptyOperator(task_id="end")
       k = KubernetesPodOperator(
           name="hello-dry-run",
           image="debian",
           cmds=["bash", "-cx"],
           arguments=["echo", "10"],
           labels={"foo": "bar"},
           task_id="dry_run_demo",
           do_xcom_push=True,
           on_finish_action="delete_pod",
           in_cluster=True,
           namespace="airflow",
           get_logs=True,
           deferrable=True,
           random_name_suffix=True,
       )
   
       k2 = KubernetesPodOperator(
           name="hello-dry-run-2",
           image="debian",
           cmds=["bash", "-cx"],
           arguments=["echo", "10"],
           labels={"foo": "bar"},
           task_id="dry_run_demo_2",
           do_xcom_push=True,
           on_finish_action="delete_pod",
           in_cluster=True,
           namespace="airflow",
           get_logs=True,
           deferrable=False,
           random_name_suffix=True,
       )
       start_task >> [k, k2] >> end_task
   
   
   k8s_example_dag()
   ```
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==9.12.0
   apache-airflow-providers-celery==3.12.2
   apache-airflow-providers-cncf-kubernetes==10.7.0 [Also Failed in lower 
versions down to 10.4.3]
   apache-airflow-providers-common-compat==1.7.3
   apache-airflow-providers-common-io==1.6.2
   apache-airflow-providers-common-sql==1.27.5
   apache-airflow-providers-fab==2.4.1
   apache-airflow-providers-http==5.3.3
   apache-airflow-providers-openlineage==2.6.1
   apache-airflow-providers-postgres==6.2.3
   apache-airflow-providers-redis==4.2.0
   apache-airflow-providers-slack==9.1.4
   apache-airflow-providers-smtp==2.2.0
   apache-airflow-providers-standard==1.6.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Dags are baked into the image and deployed with the official Airflow Helm 
Chart with ArgoCD
   
   ### Anything else?
   
   Every KubernetesPodOperator task with deferrable=True in my dags fail
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to