mayorblock opened a new issue, #41706:
URL: https://github.com/apache/airflow/issues/41706

   ### Apache Airflow version
   
   2.10.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I try to run a Kubernetes Pod Operator (v8.4.0) with `deferrable=True` (and `do_xcom_push=True`, which was also an issue in [earlier versions](https://github.com/apache/airflow/issues/32458) but is not anymore). I define my Kubernetes cluster connection in code using `airflow.models.connection.Connection` and supply it via an [environment variable](https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#storing-connections-in-environment-variables).
   
   After all containers (init container, sidecar, and main container) have completed successfully, I get:
   ```python
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
       return ExecutionCallableRunner(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1792, in resume_execution
       return execute_callable(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 773, in trigger_reentry
       raise AirflowException(message)
   airflow.exceptions.AirflowException: Traceback (most recent call last):
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 162, in run
       state = await self._wait_for_pod_start()
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/triggers/pod.py", line 223, in _wait_for_pod_start
       pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 747, in get_pod
       async with self.get_conn() as connection:
     File "/usr/local/lib/python3.12/contextlib.py", line 210, in __aenter__
       return await anext(self.gen)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 734, in get_conn
       kube_client = await self._load_config() or async_client.ApiClient()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 664, in _load_config
       in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
                                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 724, in _get_field
       extras = await self.get_conn_extras()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 712, in get_conn_extras
       connection = await sync_to_async(self.get_connection)(self.conn_id)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 468, in __call__
       ret = await asyncio.shield(exec_coro)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 58, in run
       result = self.fn(*self.args, **self.kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/asgiref/sync.py", line 522, in thread_handler
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/providers/cncf/kubernetes/hooks/kubernetes.py", line 169, in get_connection
       return super().get_connection(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/hooks/base.py", line 83, in get_connection
       conn = Connection.get_connection_from_secrets(conn_id)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/home/***/.local/lib/python3.12/site-packages/***/models/connection.py", line 537, in get_connection_from_secrets
       raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
   ***.exceptions.AirflowNotFoundException: The conn_id `k8s_conn_id` isn't defined
   ```
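   The failing `Connection` lookup happens inside the trigger's async hook, which runs in the triggerer, a separate process from the worker that parsed the DAG. A minimal stand-in for the suspected mechanism (this is an assumption about the root cause, not something the traceback proves): an environment variable exported in one process is invisible to a process that was started without it.

   ```python
   # Illustration of the suspected mechanism (an assumption, not proven by the
   # traceback): a variable exported in the worker process at DAG-parse time is
   # not visible to a separately started triggerer process.
   import os
   import subprocess
   import sys

   os.environ["AIRFLOW_CONN_K8S_CONN_ID"] = "kubernetes://"  # set in *this* process

   # Child started without the variable -- standing in for a triggerer that was
   # launched independently of the export above.
   child_env = {k: v for k, v in os.environ.items() if k != "AIRFLOW_CONN_K8S_CONN_ID"}
   out = subprocess.run(
       [sys.executable, "-c", "import os; print('AIRFLOW_CONN_K8S_CONN_ID' in os.environ)"],
       env=child_env,
       capture_output=True,
       text=True,
   )
   print(out.stdout.strip())  # → False
   ```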
   
   Now if I:
   - set `deferrable=False`, it works with the environment-variable connection
   - create an explicit k8s cluster connection in the UI, then `deferrable=True` works
   <img width="703" alt="image" src="https://github.com/user-attachments/assets/7cb40a9f-d3c0-4371-9672-e92a8a060c9b">
   
   
   ### What you think should happen instead?
   
   Supplying a connection via environment variable, as described in the docs, should work exactly like a connection created via the UI.
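   For reference, the documented convention resolves a connection id `k8s_conn_id` from the environment variable `AIRFLOW_CONN_K8S_CONN_ID`, whose value can be a URI carrying the extras as url-encoded JSON in a `__extra__` query parameter. A hypothetical helper (`conn_env_var` is mine, not an Airflow API) sketching that serialization:

   ```python
   # Sketch of the documented AIRFLOW_CONN_* convention; conn_env_var is a
   # hypothetical helper, not part of Airflow.
   import json
   import urllib.parse

   def conn_env_var(conn_id: str, conn_type: str, extra: dict) -> tuple[str, str]:
       """Build the env-var name/value pair for a connection with only extras."""
       name = f"AIRFLOW_CONN_{conn_id.upper()}"
       value = f"{conn_type}://?__extra__=" + urllib.parse.quote(json.dumps(extra))
       return name, value

   name, value = conn_env_var("k8s_conn_id", "kubernetes", {"in_cluster": False})
   print(name)  # → AIRFLOW_CONN_K8S_CONN_ID
   ```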
   
   ### How to reproduce
   
   Make a Kubernetes cluster connection:
   ```python
   # create connection
   import os

   from airflow import DAG
   from airflow.models.connection import Connection
   from airflow.operators.bash import BashOperator
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

   extra = {
       "kube_config": <kube_config to your cluster>,
       "namespace": <your namespace>,
       "in_cluster": False,
   }
   conn = Connection(
       conn_id="k8s_conn_id",
       conn_type="kubernetes",
       description="k8s connection",
       extra=extra,
   )
   os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()

   # define a minimal DAG with deferrable=True and do_xcom_push=True
   with DAG(dag_id="k8s-example") as dag:
       k = KubernetesPodOperator(
           task_id="k8s-pod",
           kubernetes_conn_id="k8s_conn_id",
           name="k8s-pod",
           cmds=["bash", "-cx"],
           arguments=["mkdir -p /airflow/xcom/; cat /tmp/mnt/hello.txt > /airflow/xcom/return.json"],
           startup_timeout_seconds=1000,
           do_xcom_push=True,
           pod_template_file="k8s_template.yaml",
           on_finish_action="keep_pod",
           deferrable=True,
       )

       b = BashOperator(
           bash_command="echo \"{{ task_instance.xcom_pull('k8s-pod')[0] }}\"",
           task_id="pod_task_xcom_result",
       )

       k >> b
   ```
   and the pod template manifest:
   ```yaml
   apiVersion: v1
   kind: Pod
   spec:
     initContainers:
       - name: init-container
         image: "ubuntu:latest"
         command: ["bash", "-cx"]
         args: ["echo '[1,2,3,4]' > /tmp/mnt/hello.txt"]
         resources:
           limits:
             cpu: 500m
             memory: 1Gi
           requests:
             cpu: 100m
             memory: 1Gi
         volumeMounts:
           - name: shared-volume
             mountPath: "/tmp/mnt"
     containers:
       - name: base
         image: "ubuntu:latest"
         imagePullPolicy: IfNotPresent
         ports: []
         resources:
           limits:
             cpu: 500m
             memory: 1Gi
           requests:
             cpu: 100m
             memory: 1Gi
         volumeMounts:
           - name: shared-volume
             mountPath: "/tmp/mnt"
     restartPolicy: Never
     volumes:
     - name: shared-volume
       emptyDir: {}
   ```
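   For context on what the manifest exercises: the init container writes `[1,2,3,4]` to the shared volume, the main container copies it to `/airflow/xcom/return.json`, and the xcom sidecar reads it back as JSON. A minimal local stand-in for that file contract (the temp directory here is a substitute for the real pod path):

   ```python
   # Local stand-in for the pod's xcom file contract; the temp dir replaces
   # /airflow/xcom/ from the real pod.
   import json
   import os
   import tempfile

   xcom_dir = tempfile.mkdtemp()                 # stands in for /airflow/xcom/
   path = os.path.join(xcom_dir, "return.json")

   with open(path, "w") as f:
       f.write("[1,2,3,4]")                      # what the init container echoes

   with open(path) as f:
       result = json.load(f)

   print(result[0])  # → 1  (what BashOperator pulls via xcom_pull('k8s-pod')[0])
   ```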
   
   
   
   ### Operating System
   
   docker
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes==8.4.0
   apache-airflow-providers-microsoft-azure==10.3.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   I am running Docker locally with the official [docker-compose](https://airflow.apache.org/docs/apache-airflow/2.10.0/docker-compose.yaml) and deploying pods on Azure Kubernetes Service (AKS).
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

