Vasu-Madaan opened a new issue, #60943:
URL: https://github.com/apache/airflow/issues/60943

   ### Apache Airflow version
   
   2.11.X
   
   ### If "Other Airflow 3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   **Describe the bug**
   
   When using the KubernetesPodOperator (KPO) in an Airflow deployment running 
with the CeleryExecutor, certain tasks may fail with a FileExistsError 
originating from the AWS CLI cache directory (~/.aws/cli/cache).
   This happens before the Kubernetes pod is actually created, during the 
authentication phase against the Kubernetes API server.
   
   The issue stems from a race condition that occurs when multiple KPO tasks 
start simultaneously on the same Airflow worker.
   
   **Where the Error Originates**
   
   When a KPO task begins, the Airflow worker (Celery process) must 
authenticate to the Kubernetes API server using the configuration defined in 
the kubernetes_default connection.
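
   For context, in such environments the `kubernetes_default` connection typically points at a kubeconfig whose `user` entry invokes the AWS exec credential plugin. A typical entry looks roughly like the following sketch (the cluster name is a placeholder, not taken from this report):

   ```yaml
   users:
   - name: eks-user
     user:
       exec:
         apiVersion: client.authentication.k8s.io/v1beta1
         command: aws
         args: ["eks", "get-token", "--cluster-name", "my-cluster"]  # placeholder cluster name
   ```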
   
   In EKS-based environments, this authentication uses the AWS EKS exec plugin, 
which internally executes:
   `aws eks get-token`
   to obtain a temporary authentication token.
   
   During that call, the AWS CLI attempts to create a local cache directory at:
   `/home/astro/.aws/cli/cache`
   to store token metadata and credentials.
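
   The failure mode can be simulated with plain `os.makedirs` (a stdlib sketch of the race, not the AWS CLI's actual code path):

   ```python
   import os
   import tempfile

   # Simulate two `aws eks get-token` calls racing to create the same cache dir.
   cache_dir = os.path.join(tempfile.mkdtemp(), ".aws", "cli", "cache")

   os.makedirs(cache_dir)  # the first process wins and creates the directory

   try:
       os.makedirs(cache_dir)  # the second process loses the race
   except FileExistsError as exc:
       print(f"[Errno {exc.errno}] File exists: {exc.filename!r}")  # errno 17
   ```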
   
   **Why the “File Exists” Error Occurs**
   
   Airflow Celery workers persist across multiple DAG runs and may execute 
several tasks concurrently.
   
   When two or more KPO tasks start in parallel on the same worker, each attempts to run `aws eks get-token` at nearly the same time.
   
   Both processes try to create the same cache directory, `~/.aws/cli/cache`.
   
   Because the AWS CLI calls `os.makedirs()` without an existence check (`exist_ok=True`) or a lock, a race condition occurs and the second process raises:
   
   [Errno 17] File exists: '/home/astro/.aws/cli/cache'
   
   
   This is not caused by stale files from previous runs — it’s a transient 
concurrency issue caused by multiple authentication calls happening in parallel.
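
   For contrast, the concurrency-safe idiom, which a fix on the AWS CLI side would amount to, is a sketch like:

   ```python
   import os

   def ensure_cache_dir(path: str) -> None:
       """Create the CLI cache directory idempotently.

       Safe under concurrent callers: exist_ok=True suppresses the
       FileExistsError raised when another process wins the race.
       """
       os.makedirs(path, exist_ok=True)
   ```

   Calling `ensure_cache_dir` twice, or from two processes at once, is a no-op the second time instead of an error.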
   
   ```
   [2025-09-20, 03:59:02 UTC] {pod.py:1134} INFO - Building pod 
quotebar-eod-etl-hgq3qfc5 with labels: {'dag_id': 'intraday_mktdata_eod_etl', 
'task_id': 'process_quotes', 'run_id': 
'scheduled__2025-09-20T0359000000-c29cee297', 'kubernetes_pod_operator': 
'True', 'try_number': '1'}
   [2025-09-20, 03:59:02 UTC] {base.py:84} INFO - Retrieving connection 
'kubernetes_default'
   [2025-09-20, 03:59:05 UTC] {kube_config.py:520} ERROR - exec: process 
returned 255. [Errno 17] File exists: '/home/astro/.aws/cli/cache'
   [2025-09-20, 03:59:05 UTC] {taskinstance.py:3313} ERROR - Task failed with 
exception
   Traceback (most recent call last):
   File 
"/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 
768, in _execute_task
   result = _execute_callable(context=context, **execute_callable_kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 
734, in _execute_callable
   return ExecutionCallableRunner(
   ^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", 
line 252, in run
   return self.func(*args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 
424, in wrapper
   return func(self, *args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes.py",
 line 128, in execute
   return super().execute(context)
   ^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 
424, in wrapper
   return func(self, *args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/airflow/decorators/base.py", 
line 266, in execute
   return_value = super().execute(context)
   ^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 
424, in wrapper
   return func(self, *args, **kwargs)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 594, in execute
   return self.execute_sync(context)
   ^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 604, in execute_sync
   self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 562, in get_or_create_pod
   pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, 
context=context)
   
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 535, in find_pod
   pod_list = self.client.list_namespaced_pod(
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", 
line 15823, in list_namespaced_pod
   return self.list_namespaced_pod_with_http_info(namespace, **kwargs) # noqa: 
E501
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/kubernetes/client/api/core_v1_api.py", 
line 15942, in list_namespaced_pod_with_http_info
   return self.api_client.call_api(
   ^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 
348, in call_api
   return self.__call_api(resource_path, method,
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 
180, in __call_api
   response_data = self.request(
   ^^^^^^^^^^^^^
   File 
"/usr/local/lib/python3.12/site-packages/kubernetes/client/api_client.py", line 
373, in request
   return self.rest_client.GET(url,
   ^^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/kubernetes/client/rest.py", 
line 244, in GET
   return self.request("GET", url,
   ^^^^^^^^^^^^^^^^^^^^^^^^
   File "/usr/local/lib/python3.12/site-packages/kubernetes/client/rest.py", 
line 238, in request
   raise ApiException(http_resp=r)
   kubernetes.client.exceptions.ApiException: (403)
   Reason: Forbidden
   HTTP response headers: HTTPHeaderDict({'Audit-Id': 
'6a0a7506-a6f0-42bc-9519-22c0a1a848bd', 'Cache-Control': 'no-cache, private', 
'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 
'X-Kubernetes-Pf-Flowschema-Uid': 'ac181d0d-6f1d-4ff8-88c3-09378d311aca', 
'X-Kubernetes-Pf-Prioritylevel-Uid': 'e54b4844-0114-47f7-9e80-a2462ac663c9', 
'Date': 'Sat, 20 Sep 2025 03:59:05 GMT', 'Content-Length': '262'})
   
   ```
   
   **Environment(s) Affected**
   
   PROD 
   
   **🪜 Steps To Reproduce**
   1. Configure Airflow to use:
      * `CeleryExecutor`
      * `KubernetesPodOperator`
      * EKS authentication through `aws eks get-token`
   2. Launch multiple KPO tasks that start at the same time and share the same worker.
   3. Observe that some tasks intermittently fail before pod creation with a `FileExistsError` referencing `~/.aws/cli/cache`.
   
   Example error:
   FileExistsError: [Errno 17] File exists: '/home/astro/.aws/cli/cache'
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   
   ### 1. Set up Airflow with CeleryExecutor and EKS authentication
   
   * Use `CeleryExecutor` with long-lived worker processes.
   * Configure the `kubernetes_default` connection to authenticate to an Amazon 
EKS cluster using the AWS exec plugin:
   
     ```bash
     aws eks get-token
     ```
   * Ensure AWS CLI v2 is installed and available on the Airflow worker image.
   
   ---
   
   ### 2. Create a DAG with multiple parallel `KubernetesPodOperator` tasks
   
   * Define a DAG containing multiple `KubernetesPodOperator` tasks that can 
run in parallel (e.g., no task dependencies and sufficient DAG/task 
concurrency).
   * Ensure that multiple KPO tasks can be scheduled onto the **same Celery 
worker**.
   
   ---
   
   ### 3. Trigger concurrent execution
   
   * Trigger the DAG manually or allow it to run on schedule so that multiple 
KPO tasks start at the same time.
   * Under parallel start conditions, one or more tasks will intermittently 
fail during Kubernetes API authentication with:
   
     ```
     FileExistsError: [Errno 17] File exists: '/home/astro/.aws/cli/cache'
     ```
   * The failure occurs **before pod creation** and surfaces as a Kubernetes 
API authentication failure (`403 Forbidden`).
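
   A possible mitigation (my own suggestion, not an official fix) is to pre-create the cache directory in the worker image or entrypoint, so concurrent `aws eks get-token` calls never race on directory creation:

   ```shell
   # Pre-create the AWS CLI cache directory on worker start so that concurrent
   # `aws eks get-token` calls never race on creating it.
   # `mkdir -p` is idempotent and safe to run on every worker start.
   mkdir -p "${HOME}/.aws/cli/cache"
   ```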
   
   ---
   
   
   ### Operating System
   
   N/A
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

