droppoint commented on code in PR #35800:
URL: https://github.com/apache/airflow/pull/35800#discussion_r1433671156


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -642,38 +642,37 @@ def adopt_launched_task(
         del tis_to_flush_by_key[ti_key]
         self.running.add(ti_key)
 
-    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
+    @provide_session
+    def _delete_orphaned_completed_pods(self, session: Session = NEW_SESSION) 
-> None:
         """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
+        Delete orphaned completed pods with completed TaskInstances.
 
-        :param kube_client: kubernetes client for speaking to kube API
+        Pods that have reached the Completed status are usually deleted by the 
scheduler to which
+        they are attached. In case when the scheduler crashes, there is no one 
to delete these
+        pods. Therefore, they are deleted from another scheduler using this 
function.
         """
+        from airflow.jobs.job import Job, JobState
+
         if TYPE_CHECKING:
-            assert self.scheduler_job_id
+            assert self.kube_scheduler
 
-        new_worker_id_label = 
self._make_safe_label_value(self.scheduler_job_id)
-        query_kwargs = {
-            "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                
f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
-        }
+        alive_schedulers_ids = session.scalars(
+            select(Job.id).where(Job.job_type == "SchedulerJob", Job.state == 
JobState.RUNNING)
+        ).all()
+        labels = ["kubernetes_executor=True", f"{POD_EXECUTOR_DONE_KEY}!=True"]
+        for alive_scheduler_id in alive_schedulers_ids:
+            
labels.append(f"airflow-worker!={self._make_safe_label_value(str(alive_scheduler_id))}")

Review Comment:
   I'm afraid I don't fully understand what are you suggesting. Can you please 
explain it more?
   
   If your concern is about performance, then I think you shouldn't be 
concerned at all. The situation where a completed pod becomes an orphan is 
pretty rare. It only occurs in Airflow setups that fulfill two conditions: 1) 
One of the schedulers died, and 2) Your Airflow setup has a lot of 
simultaneously working dags/pods. The time window when a pod becomes completed 
and before it's normally deleted is quite small. My Airflow setup has about 2k 
simultaneously working dags, but I have only several completed pods that were 
orphaned after one of the schedulers was down. You can make delete calls for 
several pods quickly.
   
   In addition to that, I don't see how we can distribute the deletion of 
orphaned completed pods between the rest of the schedulers. To find which pods 
should be deleted, you need to send a list_pods request through the Kubernetes 
client and supply the list of all ids of the schedulers that are still alive. 
You can't submit a part of the list, as it would lead to the deletion of 
completed pods from normally working schedulers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to