JCoder01 commented on code in PR #35800:
URL: https://github.com/apache/airflow/pull/35800#discussion_r1404509646


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -642,39 +641,6 @@ def adopt_launched_task(
         del tis_to_flush_by_key[ti_key]
         self.running.add(ti_key)
 
-    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
-        """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
-
-        :param kube_client: kubernetes client for speaking to kube API
-        """
-        if TYPE_CHECKING:
-            assert self.scheduler_job_id
-
-        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
-        query_kwargs = {
-            "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
-        }
-        pod_list = self._list_pods(query_kwargs)
-        for pod in pod_list:
-            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
-            from kubernetes.client.rest import ApiException
-
-            try:
-                kube_client.patch_namespaced_pod(
-                    name=pod.metadata.name,
-                    namespace=pod.metadata.namespace,
-                    body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
-                )
-            except ApiException as e:
-                self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)

Review Comment:
   Apologies. I had read your comment, but due to the hazards of reading too fast, too early in the morning, I missed that you had already tried the `continue` approach. Reviewing your comment again, it seems adding the `continue` solves the majority of the issues. The remaining condition, if I understand it, is that it's possible for two schedulers to adopt the same completed pod, which means it's added to each of their `running` sets. Then, once the pod is deleted by one scheduler, there is nothing to signal to the other scheduler(s) that the pod needs to be removed from their `running` set. So we need to either:
   A. only adopt pods not owned by a running executor, or
   B. clean up all schedulers when a pod is deleted.
   The proper approach seems to be what @dstandish recommended: check whether the pod is owned by a live scheduler before adopting it. Alternatively, the KubernetesJobWatcher could react to `DELETE` events rather than ignoring them, but that seems far less desirable than simply not adopting the pods in the first place when running multiple schedulers.
   The CronJob approach just seems like the wrong direction. I don't think the option to create it exists in charts other than the official chart, and even in the official chart it's disabled by default.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
