dirrao commented on code in PR #35800:
URL: https://github.com/apache/airflow/pull/35800#discussion_r1433383053


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -642,38 +642,37 @@ def adopt_launched_task(
         del tis_to_flush_by_key[ti_key]
         self.running.add(ti_key)
 
-    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
+    @provide_session
+    def _delete_orphaned_completed_pods(self, session: Session = NEW_SESSION) 
-> None:
         """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
+        Delete orphaned completed pods with completed TaskInstances.
 
-        :param kube_client: kubernetes client for speaking to kube API
+        Pods that have reached the Completed status are usually deleted by the 
scheduler to which
+        they are attached. In case when the scheduler crashes, there is no one 
to delete these
+        pods. Therefore, they are deleted from another scheduler using this 
function.
         """
+        from airflow.jobs.job import Job, JobState
+
         if TYPE_CHECKING:
-            assert self.scheduler_job_id
+            assert self.kube_scheduler
 
-        new_worker_id_label = 
self._make_safe_label_value(self.scheduler_job_id)
-        query_kwargs = {
-            "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                
f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
-        }
+        alive_schedulers_ids = session.scalars(
+            select(Job.id).where(Job.job_type == "SchedulerJob", Job.state == 
JobState.RUNNING)
+        ).all()
+        labels = ["kubernetes_executor=True", f"{POD_EXECUTOR_DONE_KEY}!=True"]
+        for alive_scheduler_id in alive_schedulers_ids:
+            
labels.append(f"airflow-worker!={self._make_safe_label_value(str(alive_scheduler_id))}")
+
+        query_kwargs = {"field_selector": "status.phase=Succeeded", 
"label_selector": ",".join(labels)}
         pod_list = self._list_pods(query_kwargs)
         for pod in pod_list:
-            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
             from kubernetes.client.rest import ApiException
 
             try:
-                kube_client.patch_namespaced_pod(
-                    name=pod.metadata.name,
-                    namespace=pod.metadata.namespace,
-                    body={"metadata": {"labels": {"airflow-worker": 
new_worker_id_label}}},
-                )
+                self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, 
namespace=pod.metadata.namespace)
+                self.log.info("Orphaned completed pod %s has been deleted", 
pod.metadata.name)
             except ApiException as e:
-                self.log.info("Failed to adopt pod %s. Reason: %s", 
pod.metadata.name, e)
-            ti_id = annotations_to_key(pod.metadata.annotations)
-            self.running.add(ti_id)

Review Comment:
   We need to gracefully free the executor running slot from the concerned 
scheduler. Right now, this happens through the pod adoption of completed pods. 
   
   ```
           elif status == "Succeeded":
               # We get multiple events once the pod hits a terminal state, and 
we only want to
               # send it along to the scheduler once.
               # If our event type is DELETED, we have the 
POD_EXECUTOR_DONE_KEY, or the pod has
               # a deletion timestamp, we've already seen the initial Succeeded 
event and sent it
               # along to the scheduler.
               if (
                   event["type"] == "DELETED"
                   or POD_EXECUTOR_DONE_KEY in pod.metadata.labels
                   or pod.metadata.deletion_timestamp
               ):
                   self.log.info(
                       "Skipping event for Succeeded pod %s - event for this 
pod already sent to executor",
                       pod_name,
                   )
                   return
               self.log.info("Event: %s Succeeded, annotations: %s", pod_name, 
annotations_string)
               self.watcher_queue.put((pod_name, namespace, None, annotations, 
resource_version))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to