droppoint commented on code in PR #35800:
URL: https://github.com/apache/airflow/pull/35800#discussion_r1433692335
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -642,38 +642,37 @@ def adopt_launched_task(
del tis_to_flush_by_key[ti_key]
self.running.add(ti_key)
- def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
+ @provide_session
+ def _delete_orphaned_completed_pods(self, session: Session = NEW_SESSION)
-> None:
"""
- Patch completed pods so that the KubernetesJobWatcher can delete them.
+ Delete orphaned completed pods with completed TaskInstances.
- :param kube_client: kubernetes client for speaking to kube API
+ Pods that have reached the Completed status are usually deleted by the
scheduler to which
+ they are attached. In case when the scheduler crashes, there is no one
to delete these
+ pods. Therefore, they are deleted from another scheduler using this
function.
"""
+ from airflow.jobs.job import Job, JobState
+
if TYPE_CHECKING:
- assert self.scheduler_job_id
+ assert self.kube_scheduler
- new_worker_id_label =
self._make_safe_label_value(self.scheduler_job_id)
- query_kwargs = {
- "field_selector": "status.phase=Succeeded",
- "label_selector": (
- "kubernetes_executor=True,"
-
f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
- ),
- }
+ alive_schedulers_ids = session.scalars(
+ select(Job.id).where(Job.job_type == "SchedulerJob", Job.state ==
JobState.RUNNING)
+ ).all()
+ labels = ["kubernetes_executor=True", f"{POD_EXECUTOR_DONE_KEY}!=True"]
+ for alive_scheduler_id in alive_schedulers_ids:
+
labels.append(f"airflow-worker!={self._make_safe_label_value(str(alive_scheduler_id))}")
+
+ query_kwargs = {"field_selector": "status.phase=Succeeded",
"label_selector": ",".join(labels)}
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
- self.log.info("Attempting to adopt pod %s", pod.metadata.name)
from kubernetes.client.rest import ApiException
try:
- kube_client.patch_namespaced_pod(
- name=pod.metadata.name,
- namespace=pod.metadata.namespace,
- body={"metadata": {"labels": {"airflow-worker":
new_worker_id_label}}},
- )
+ self.kube_scheduler.delete_pod(pod_name=pod.metadata.name,
namespace=pod.metadata.namespace)
+ self.log.info("Orphaned completed pod %s has been deleted",
pod.metadata.name)
except ApiException as e:
- self.log.info("Failed to adopt pod %s. Reason: %s",
pod.metadata.name, e)
- ti_id = annotations_to_key(pod.metadata.annotations)
- self.running.add(ti_id)
Review Comment:
Can you clarify what you mean by a 'concerned scheduler'? I understand that
_adopt_completed_pods function is needed when one of the schedulers is down
(i.e., killed by OOM). When your pod is completed, and if the scheduler that it
was attached to is down, the TaskInstance corresponding to this pod is not in
any running set of the rest of the schedulers. And you don't have to adopt it
(or add it to the running set of any alive scheduler) because it's already been
completed. Perhaps this comment will provide more information for you: link
(https://github.com/apache/airflow/pull/35800#issuecomment-1845545959).
Are you referring to the situation when the scheduler missed its heartbeat
and was announced as 'failed' by another scheduler? Shouldn't we fail loudly in
this situation? Should we really mask the problem with skipping heartbeats this
way?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]