mayorblock opened a new issue, #41706: URL: https://github.com/apache/airflow/issues/41706
### Apache Airflow version

2.10.0

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

I am trying to run a `KubernetesPodOperator` (cncf.kubernetes provider v8.4.0) with `deferrable=True` and `do_xcom_push=True`. The latter was also an issue in [earlier versions](https://github.com/apache/airflow/issues/32458), but no longer is. I define my Kubernetes cluster connection in the DAG file using `airflow.models.connection.Connection` and supply it via an [environment variable](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#storing-connections-in-environment-variables).

After all containers (init container, sidecar and main container) have completed successfully, the task fails with:

```python
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1792, in resume_execution
    return execute_callable(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 773, in trigger_reentry
    raise AirflowException(message)
airflow.exceptions.AirflowException: Traceback (most recent call last):
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 162, in run
    state = await self._wait_for_pod_start()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 223, in _wait_for_pod_start
    pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 747, in get_pod
    async with self.get_conn() as connection:
  File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 734, in get_conn
    kube_client = await self._load_config() or async_client.ApiClient()
                  ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 664, in _load_config
    in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
                                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 724, in _get_field
    extras = await self.get_conn_extras()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 712, in get_conn_extras
    connection = await sync_to_async(self.get_connection)(self.conn_id)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 468, in __call__
    ret = await asyncio.shield(exec_coro)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 522, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 169, in get_connection
    return super().get_connection(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/hooks/base.py", line 83, in get_connection
    conn = Connection.get_connection_from_secrets(conn_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/***/.local/lib/python3.12/site-packages/***/models/connection.py", line 537, in get_connection_from_secrets
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
***.exceptions.AirflowNotFoundException: The conn_id `k8s_conn_id` isn't defined
```

Now, if I either:

- set `deferrable=False`, it works with the environment-variable connection, or
- create the Kubernetes cluster connection explicitly in the UI, `deferrable=True` works.

<img width="703" alt="image" src="https://github.com/user-attachments/assets/7cb40a9f-d3c0-4371-9672-e92a8a060c9b">

### What you think should happen instead?

A connection supplied via an environment variable, as described in the docs, should behave exactly like a connection created in the UI, including for deferrable tasks.

### How to reproduce

Create a Kubernetes cluster connection and a minimal DAG:

```python
import os

from airflow import DAG
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# create the connection and expose it as an AIRFLOW_CONN_* environment variable
extra = {
    "kube_config": <kube_config to your cluster>,
    "namespace": <your namespace>,
    "in_cluster": False,
}
conn = Connection(
    conn_id="k8s_conn_id",
    conn_type="kubernetes",
    description="k8s connection",
    extra=extra,
)
os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()

# define a minimal DAG with deferrable=True and do_xcom_push=True
with DAG(dag_id="k8s-example") as dag:
    k = KubernetesPodOperator(
        kubernetes_conn_id="k8s_conn_id",
        name="k8s-pod",
        cmds=["bash", "-cx"],
        arguments=["mkdir -p /airflow/xcom/;cat /tmp/mnt/hello.txt > /airflow/xcom/return.json"],
        task_id="k8s-pod",
        startup_timeout_seconds=1000,
        do_xcom_push=True,
        pod_template_file="k8s_template.yaml",
        on_finish_action="keep_pod",
        deferrable=True,
    )

    b = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('k8s-pod')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    k >> b
```

and the pod template manifest:

```yaml
apiVersion: v1
kind: Pod
spec:
  initContainers:
    - name: init-container
      image: "ubuntu:latest"
      command: ["bash", "-cx"]
      args: ["echo '[1,2,3,4]' > /tmp/mnt/hello.txt"]
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  containers:
    - name: base
      image: "ubuntu:latest"
      imagePullPolicy: IfNotPresent
      ports: []
      resources:
        limits:
          cpu: 500m
          memory: 1Gi
        requests:
          cpu: 100m
          memory: 1Gi
      volumeMounts:
        - name: shared-volume
          mountPath: "/tmp/mnt"
  restartPolicy: Never
  volumes:
    - name: shared-volume
      emptyDir: {}
```

### Operating System

docker

### Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.4.0
apache-airflow-providers-microsoft-azure==10.3.0

### Deployment

Docker-Compose

### Deployment details

I am running Docker locally with the official [docker-compose](https://airflow.apache.org/docs/apache-airflow/2.10.0/docker-compose.yaml) file and deploying pods on Azure Kubernetes Service.

### Anything else?

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
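To make the reported failure concrete, here is a small, self-contained Python sketch. It rests on an assumption that this issue does not confirm: the triggerer, which runs the deferred part of the task, never imports the DAG file, so the top-level `os.environ[...]` assignment in the reproduction only runs in processes that parse the DAG (for example the worker). `ConnNotFound`, `conn_env_var_name` and `lookup_conn_uri` are hypothetical helpers, not Airflow APIs; only the `AIRFLOW_CONN_{CONN_ID}` upper-cased naming convention comes from the Airflow docs and the reproduction code.

```python
"""Simplified, hypothetical model of environment-variable connection lookup.

This is NOT Airflow's implementation. It only illustrates the
AIRFLOW_CONN_<CONN_ID> naming convention and the fact that a variable set in
one process's os.environ is not visible to an unrelated process.
"""
from collections.abc import Mapping


class ConnNotFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowNotFoundException."""


def conn_env_var_name(conn_id: str) -> str:
    # Documented convention: AIRFLOW_CONN_ followed by the upper-cased conn_id.
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def lookup_conn_uri(conn_id: str, environ: Mapping[str, str]) -> str:
    # Only the given process environment is consulted (no metadata DB here).
    name = conn_env_var_name(conn_id)
    if name not in environ:
        raise ConnNotFound(f"The conn_id `{conn_id}` isn't defined")
    return environ[name]


# Worker (assumption): it parsed the DAG file, so the os.environ[...] line ran.
# The value is a placeholder, not a real Connection.get_uri() result.
worker_env = {"AIRFLOW_CONN_K8S_CONN_ID": "<uri produced by Connection.get_uri()>"}

# Triggerer (assumption): it never executed the DAG file's top-level code.
triggerer_env: dict[str, str] = {}

print(lookup_conn_uri("k8s_conn_id", worker_env))
try:
    lookup_conn_uri("k8s_conn_id", triggerer_env)
except ConnNotFound as exc:
    print(exc)
```

If that assumption holds, exporting `AIRFLOW_CONN_K8S_CONN_ID` in the environment of every Airflow service (including the triggerer) instead of setting it from the DAG file should let the deferrable path resolve the connection the same way the non-deferrable path does.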
