rachthree opened a new issue, #58496: URL: https://github.com/apache/airflow/issues/58496
### Apache Airflow Provider(s)

cncf-kubernetes

### Versions of Apache Airflow Providers

`apache-airflow-providers-cncf-kubernetes`: 10.5.0

However, this affects other versions as well, as mentioned in these issues:

* https://github.com/apache/airflow/issues/56596 (10.7.0)
* https://github.com/apache/airflow/issues/47780 (10.1.0)

### Apache Airflow version

2.10.5+composer

### Operating System

Linux (Google Cloud cos_containerd)

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

We are using Cloud Composer on Google Cloud GKE. We also use Cloud Composer / Airflow to task external GPU clusters.

### What happened

When using a `KubernetesJobOperator` with `wait_until_job_complete=True` to task a Kueue-enabled external GPU cluster via `kubernetes_conn_id`, we consistently saw the following:

1. A k8s job with a "job-" prefix gets created.
2. A k8s pod with a "job-" prefix gets deployed, controlled by the above job.
3. A k8s pod without a "job-" prefix gets deployed, running the same process as the above pod and not controlled by any job.

### What you think should happen instead

The duplicate pod should never have been deployed. I believe `KubernetesJobOperator` should always wait for the k8s job to deploy a pod; it should not deploy a pod directly when one is not found in time, since that is already the responsibility of the k8s job.

I believe this is due to a race condition, and the fundamental issue is the same as in the issues referenced above. It is caused by `KubernetesJobOperator.get_or_create_pod` and `KubernetesJobOperator.find_pod`, which deploy a pod directly when no pod created by the k8s job is found. This is especially a problem when a cluster uses Kueue.

When using `KubernetesPodOperator`, this never happens. However, that operator bypasses Kueue, so we do need to use `KubernetesJobOperator`. I understand `KubernetesStartKueueJobOperator` exists, but we opted to use `KubernetesJobOperator` and applied the "kueue.x-k8s.io/queue-name" labels/annotations ourselves to rule out `KubernetesStartKueueJobOperator` as the cause.

As for the duplicate pod's name lacking the "job-" prefix: the original pod spec is stored in `KubernetesJobOperator.pod_request_obj`, and when the pod corresponding to the job isn't found, a pod is deployed directly from that original spec. It is not renamed the way the job spec is in `KubernetesJobOperator.build_job_request_obj`. I don't know why renaming the job (which results in the prefixed pod) was necessary, but that can be a separate discussion.

### How to reproduce

I believe any DAG can run into this issue, as mentioned in the referenced issues. However, it may be more likely to show up with the following (a minimal sketch follows this list):

1. Use `wait_until_job_complete=True` (this is likely a red herring, though).
2. Use a Kueue-enabled cluster and apply the "kueue.x-k8s.io/queue-name" labels/annotations in the pod spec (via `pod_template_dict`) for the operator.
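For concreteness, a minimal sketch of the kind of task that triggers this for us is below. The DAG id, connection id, namespace, image, command, and queue name are placeholders rather than our real values:

```python
import pendulum

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

with DAG(
    dag_id="kueue_job_duplicate_pod_repro",  # placeholder DAG id
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    KubernetesJobOperator(
        task_id="run_gpu_job",
        # Connection to the external, Kueue-enabled GPU cluster (placeholder id).
        kubernetes_conn_id="external_gpu_cluster",
        namespace="default",
        image="busybox:latest",
        cmds=["sh", "-c", "sleep 300"],
        # Kueue admission label applied manually via pod_template_dict;
        # the queue name is a placeholder.
        pod_template_dict={
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"labels": {"kueue.x-k8s.io/queue-name": "gpu-queue"}},
        },
        # Waiting for job completion is when we observe the duplicate pod appear.
        wait_until_job_complete=True,
    )
```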
When I subclass `KubernetesJobOperator` as below, the duplicate pod no longer appears:

```python
import time

from kubernetes.client import models as k8s

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from airflow.utils.context import Context

POD_POLLING_INTERVAL_SEC = 10


class MyKubernetesJobOperator(KubernetesJobOperator):
    def find_pod(self, namespace: str, context: Context, *, exclude_checked: bool = True) -> k8s.V1Pod | None:
        """Return an already-running pod for this task instance if one exists."""
        label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)

        #### MODIFIED CODE STARTS HERE ####
        # pod_list = self.client.list_namespaced_pod(
        #     namespace=namespace,
        #     label_selector=label_selector,
        # ).items
        pod_list = []
        while not pod_list:
            pod_list = self.client.list_namespaced_pod(
                namespace=namespace,
                label_selector=label_selector,
            ).items
            time.sleep(POD_POLLING_INTERVAL_SEC)
        #### MODIFIED CODE ENDS HERE ####

        pod = None
        num_pods = len(pod_list)
        if num_pods == 1:
            pod = pod_list[0]
            self.log_matching_pod(pod=pod, context=context)
        elif num_pods > 1:
            if self.reattach_on_restart:
                raise AirflowException(f"More than one pod running with labels {label_selector}")
            self.log.warning("Found more than one pod running with labels %s, resolving ...", label_selector)
            pod = self.process_duplicate_label_pods(pod_list)
            self.log_matching_pod(pod=pod, context=context)

        return pod

    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
        if self.reattach_on_restart:
            pod = self.find_pod(pod_request_obj.metadata.namespace, context=context)
            if pod:
                return pod

        ### MODIFIED CODE STARTS HERE ###
        # self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
        # self.pod_manager.create_pod(pod=pod_request_obj)
        ### MODIFIED CODE ENDS HERE ###

        return pod_request_obj
```

### Anything else

This happens almost every time on a Kueue-enabled cluster. That being said, thank you for implementing `KubernetesJobOperator`! It makes it much easier for our pipelines to conform to our Kueue-enabled clusters.

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
