pmcquighan-camus opened a new issue, #56596: URL: https://github.com/apache/airflow/issues/56596
### Apache Airflow Provider(s)

cncf-kubernetes

### Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==10.7.0

### Apache Airflow version

3.0.6

### Operating System

Debian 12

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

Running on GKE, Kubernetes version 1.33

### What happened

Occasionally, the KubernetesJobOperator ends up creating both a Job object and a standalone Pod object. Kubernetes' controller-manager also creates a pod for the Job, so there are then two pods running for the same Airflow task. One will have a prefix like `job-`, created by the Job from the KubernetesJobOperator, and the other will not.

This *could* be okay, except that if you have `do_xcom_push=True` on the job, the XCom sidecar container will run indefinitely until it is [terminated by Airflow](https://github.com/apache/airflow/blob/ca9f3f0557bb93e63fc2403a4fd5c17d816f5f8f/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py#L973) by doing an `exec` and sending a signal to the main process. When two pods exist, the query to find the appropriate pod to exec into may choose the wrong pod, and the main job will run indefinitely (or until Kubernetes kills it due to `active_deadline_seconds` or another Airflow timeout) and ultimately fail the task.

Side note: the operator [does not sleep in between retries](https://github.com/apache/airflow/blob/ca9f3f0557bb93e63fc2403a4fd5c17d816f5f8f/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py#L457-L462) when trying to find pods for the Job, and it checks `len(pod_list) != self.parallelism` instead of something like `len(pod_list) > self.parallelism`. Since a Kubernetes Job can retry its Pods, there might be 3 pods for a Job with parallelism 1 (if the first 2 are `Failed`).
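For illustration only, a minimal sketch of what a more tolerant pod-listing loop could look like, with a sleep between attempts and a `>=` check instead of `!=`. The function name and its `list_job_pods` callable are hypothetical, not the provider's actual API:

```python
import time


def wait_for_job_pods(list_job_pods, parallelism, retries=5, delay=2.0):
    """Poll for the Job's pods, sleeping between attempts.

    Uses >= instead of != so extra pods left over from Job-level retries
    (e.g. 3 pods for parallelism=1 after 2 Failed attempts) don't stall
    the loop. list_job_pods is a hypothetical callable returning the
    current list of pods matching the Job's label selector.
    """
    pods = []
    for attempt in range(retries):
        pods = list_job_pods()
        if len(pods) >= parallelism:
            return pods
        if attempt < retries - 1:
            # The current operator retries immediately with no delay.
            time.sleep(delay)
    return pods
```

This is only meant to show the two changes suggested above (a backoff between lookups and a comparison that tolerates retried pods), not a drop-in patch for `job.py`.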
### What you think should happen instead

The problem, I believe, is that *if `parallelism` is None* the operator calls [get_or_create_pod](https://github.com/apache/airflow/blob/ca9f3f0557bb93e63fc2403a4fd5c17d816f5f8f/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py#L208-L217), which looks for an existing pod (by label). However, the Job controller in Kubernetes might not have created a matching pod yet, so this returns `None` and a second pod gets created (not associated with the Job). I have been able to work around this race condition by always setting the `parallelism` flag on the Job, but this seems like unexpected behavior: the job operator should never end up launching a `pod` directly, and should always launch a `job`.

### How to reproduce

Using a simple DAG that just writes something to the XCom file and then tries to read it, if I trigger this 10 times it will typically fail about 3-4 of them (i.e. the task will run indefinitely until being killed by k8s):

```python
from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartJobOperator

with DAG(
    dag_id="k8s-test",
    schedule=None,
    catchup=False,
    start_date=datetime(2025, 8, 1, 0, 0, 0, 0, timezone.utc),
    max_active_runs=2,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
    },
) as dag:
    k8s_job = GKEStartJobOperator(
        # Task config
        task_id="k8s_output",
        cluster_name="xx",
        location="xx",
        deferrable=True,
        poll_interval=30.0,
        backoff_limit=3,  # Number of times pod will be retried (independent of task being retried)
        do_xcom_push=True,  # Need to push the output file paths on to later stages
        base_container_name="k8s-output",
        # Don't mark this stage as complete until the job is actually done
        wait_until_job_complete=True,
        # Job config - write out a file with a string-wrapped dictionary
        arguments=[
            "python3",
            "-c",
            """from pathlib import Path
import time
import json

Path("/airflow/xcom/return.json").parent.mkdir(parents=True, exist_ok=True)
with open("/airflow/xcom/return.json", "w") as f:
    f.write(json.dumps({"hello": "world"}))
""",
        ],
        image="python:3.12.10-alpine",
        namespace="default",
    )
    parse_xcom = GKEStartJobOperator(
        # Task config
        task_id="parse_xcom",
        cluster_name="xx",
        location="xx",
        deferrable=True,
        poll_interval=30.0,
        backoff_limit=3,
        base_container_name="parse-xcom",
        wait_until_job_complete=True,
        arguments=[
            "echo",
            "{{ ti.xcom_pull('k8s_output') }}",
        ],
        image="python:3.12.10-alpine",
        namespace="default",
        ttl_seconds_after_finished=180,
    )
    _ = parse_xcom << k8s_job
```

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
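The race described under "What you think should happen instead" can be sketched abstractly. Assuming hypothetical `find_pod` / `create_pod` callables (not the provider's real API), the current behavior effectively falls back to creating a loose pod on the very first `None`, whereas a fix could retry the label lookup to give the Job controller time to create its pod:

```python
import time


def get_or_create_pod_sketch(find_pod, create_pod, retries=5, delay=1.0):
    """Hypothetical sketch of a race-free lookup.

    find_pod() returns the Job's pod, or None if the Job controller has
    not created one yet; create_pod() launches a standalone pod that is
    NOT owned by the Job. Retrying the lookup before falling back avoids
    creating the duplicate pod in the common case.
    """
    for _ in range(retries):
        pod = find_pod()
        if pod is not None:
            return pod
        time.sleep(delay)
    # Only fall back after the controller has had ample time; arguably
    # this should raise instead of ever creating an unowned pod.
    return create_pod()
```

This is just a sketch of the shape of the fix, not the actual code paths in `operators/job.py`.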
