rachthree opened a new issue, #58496:
URL: https://github.com/apache/airflow/issues/58496

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-cncf-kubernetes`: 10.5.0
   
   However, this affects other versions as well, as mentioned in these issues:
   * https://github.com/apache/airflow/issues/56596 (10.7.0)
   * https://github.com/apache/airflow/issues/47780 (10.1.0)
   
   ### Apache Airflow version
   
   2.10.5+composer
   
   ### Operating System
   
   Linux (Google Cloud cos_containerd)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   We are using Cloud Composer on Google Cloud GKE. We also use Cloud Composer / Airflow to dispatch tasks to external GPU clusters.
   
   ### What happened
   
   When using a `KubernetesJobOperator` with `wait_until_job_complete=True` to target a Kueue-enabled external GPU cluster via `kubernetes_conn_id`, we consistently saw the following:
   
   1. A k8s job with a "job-" prefix gets created.
   2. A k8s pod with a "job-" prefix gets deployed, controlled by the above job.
   3. A k8s pod without a "job-" prefix gets deployed, running the same process as the above pod and not controlled by any job.
   
   ### What you think should happen instead
   
   The duplicate pod should never have been deployed. I believe `KubernetesJobOperator` should always wait for the k8s job to deploy a pod; it should not deploy a pod directly when one is not found in time, since creating the pod is already the responsibility of the k8s job.
   
   I believe this is due to a race condition, and the fundamental issue is the same as in the aforementioned issues. It is caused by `KubernetesJobOperator.get_or_create_pod` and `KubernetesJobOperator.find_pod`, which deploy a pod directly when they do not find one created by the k8s job. This is especially a problem when a cluster uses Kueue, since Kueue can hold the job in a queue before admission, delaying the creation of its pod.
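To make the race concrete, here is a toy simulation (not provider code; `FakeCluster` and the pod names are made up). The Job controller's pod appears only after an admission delay, as with Kueue, so an immediate "find, else create" check deploys a duplicate:

```python
import time


class FakeCluster:
    """Toy stand-in for a Kueue-enabled cluster: the Job controller
    creates the job's pod only after Kueue admits the job, i.e. after a delay."""

    def __init__(self, admission_delay: float) -> None:
        self.admission_delay = admission_delay
        self.job_created_at: float | None = None
        self.pods: list[str] = []

    def create_job(self) -> None:
        self.job_created_at = time.monotonic()

    def list_pods(self) -> list[str]:
        # The job's pod appears only once the admission delay has elapsed.
        if (
            self.job_created_at is not None
            and time.monotonic() - self.job_created_at >= self.admission_delay
            and "job-mypod" not in self.pods
        ):
            self.pods.append("job-mypod")
        return list(self.pods)


def get_or_create_pod(cluster: FakeCluster) -> None:
    """Mimics the reported behavior: if no pod is found right away,
    deploy one directly instead of waiting for the Job controller."""
    if not cluster.list_pods():
        cluster.pods.append("mypod")  # duplicate pod, no "job-" prefix


cluster = FakeCluster(admission_delay=0.05)
cluster.create_job()
get_or_create_pod(cluster)  # nothing found yet -> deploys "mypod" directly
time.sleep(0.1)             # meanwhile, Kueue "admits" the job
print(sorted(cluster.list_pods()))  # ['job-mypod', 'mypod']
```

Any fallback that creates the pod after a zero (or too-short) wait can lose this race; waiting on the Job controller instead cannot.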
   
   When using `KubernetesPodOperator`, this never happens. However, that bypasses Kueue, so we do need to use `KubernetesJobOperator`. I understand `KubernetesStartKueueJobOperator` exists, but we opted to use `KubernetesJobOperator` and applied the "kueue.x-k8s.io/queue-name" labels/annotations ourselves to ensure `KubernetesStartKueueJobOperator` was not the cause.
   
   As for the duplicate pod's name lacking the "job-" prefix: the original pod spec is stored in `KubernetesJobOperator.pod_request_obj`, and when the pod corresponding to the job isn't found, a pod is deployed directly with that original spec. It is not renamed the way the job spec is in `KubernetesJobOperator.build_job_request_obj`. I don't know why renaming the job (which results in the prefixed pod) was necessary, but that can be a separate discussion.
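A toy illustration of the naming mismatch (every name below is made up): the job spec gets a "job-" prefix, pods created by the Job controller inherit the job's name, while the fallback deploys `pod_request_obj` under its original name:

```python
# Hypothetical names for illustration only.
original_pod_name = "my-task-abc123"       # name kept in pod_request_obj

# The job spec is renamed with a "job-" prefix (as build_job_request_obj does) ...
job_name = f"job-{original_pod_name}"
# ... and a pod created by the Job controller carries the job's name.
job_controlled_pod = f"{job_name}-xyz789"  # controller-added suffix (made up)

# The fallback path deploys pod_request_obj as-is, so the duplicate
# keeps the original, unprefixed name.
duplicate_pod = original_pod_name

print(job_controlled_pod)  # job-my-task-abc123-xyz789
print(duplicate_pod)       # my-task-abc123
```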
   
   ### How to reproduce
   
   I believe any DAG can run into this issue, as mentioned in the referenced issues. However, it may be more likely to show up with the following:
   1. Use `wait_until_job_complete=True` (though this is likely a red herring).
   2. Use a Kueue-enabled cluster and apply the "kueue.x-k8s.io/queue-name" labels/annotations in the pod spec (via `pod_template_dict`) for the operator.
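For reference, a reproduction task might look like the following sketch (the connection id, queue name, namespace, and image are placeholders; `pod_template_dict` carries the Kueue label as described above, and running this requires a live Kueue-enabled cluster):

```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

run_on_gpu_cluster = KubernetesJobOperator(
    task_id="run_on_gpu_cluster",
    kubernetes_conn_id="external_gpu_cluster",  # placeholder connection id
    wait_until_job_complete=True,
    pod_template_dict={
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "namespace": "default",
            # Kueue queue-routing label, applied ourselves as described above.
            "labels": {"kueue.x-k8s.io/queue-name": "gpu-queue"},
        },
        "spec": {
            "containers": [
                {
                    "name": "main",
                    "image": "busybox",  # placeholder image
                    "command": ["sh", "-c", "sleep 30"],
                }
            ],
            "restartPolicy": "Never",
        },
    },
)
```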
   
   When I subclass `KubernetesJobOperator` as below, this no longer happens:
   ```python
   import time
   
   from kubernetes.client import models as k8s
   
   from airflow.exceptions import AirflowException
   from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
   from airflow.utils.context import Context
   
   POD_POLLING_INTERVAL_SEC = 10
   
   
   class MyKubernetesJobOperator(KubernetesJobOperator):
   
       def find_pod(self, namespace: str, context: Context, *,
                    exclude_checked: bool = True) -> k8s.V1Pod | None:
           """Return an already-running pod for this task instance if one exists."""
           label_selector = self._build_find_pod_label_selector(
               context, exclude_checked=exclude_checked
           )
   
           #### MODIFIED CODE STARTS HERE ####
           # pod_list = self.client.list_namespaced_pod(
           #     namespace=namespace,
           #     label_selector=label_selector,
           # ).items
           pod_list = []
           while not pod_list:
               pod_list = self.client.list_namespaced_pod(
                   namespace=namespace,
                   label_selector=label_selector,
               ).items
               if not pod_list:
                   # NOTE: polls with no upper bound; the task's
                   # `execution_timeout` is what bounds this in practice.
                   time.sleep(POD_POLLING_INTERVAL_SEC)
           #### MODIFIED CODE ENDS HERE ####
   
           pod = None
           num_pods = len(pod_list)
   
           if num_pods == 1:
               pod = pod_list[0]
               self.log_matching_pod(pod=pod, context=context)
           elif num_pods > 1:
               if self.reattach_on_restart:
                   raise AirflowException(
                       f"More than one pod running with labels {label_selector}"
                   )
               self.log.warning(
                   "Found more than one pod running with labels %s, resolving ...",
                   label_selector,
               )
               pod = self.process_duplicate_label_pods(pod_list)
               self.log_matching_pod(pod=pod, context=context)
   
           return pod
   
       def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
           if self.reattach_on_restart:
               pod = self.find_pod(pod_request_obj.metadata.namespace, context=context)
               if pod:
                   return pod
           ### MODIFIED CODE STARTS HERE ###
           # self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
           # self.pod_manager.create_pod(pod=pod_request_obj)
           ### MODIFIED CODE ENDS HERE ###
           return pod_request_obj
   ```
   
   ### Anything else
   
   This happens almost every time on a Kueue-enabled cluster.
   
   That being said though, thank you for implementing `KubernetesJobOperator`! It makes it much easier for our pipelines to conform to our Kueue-enabled clusters.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

