potiuk commented on code in PR #35800:
URL: https://github.com/apache/airflow/pull/35800#discussion_r1434054803
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -642,38 +642,37 @@ def adopt_launched_task(
                 del tis_to_flush_by_key[ti_key]
                 self.running.add(ti_key)
 
-    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
+    @provide_session
+    def _delete_orphaned_completed_pods(self, session: Session = NEW_SESSION) -> None:
         """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
+        Delete orphaned completed pods with completed TaskInstances.
 
-        :param kube_client: kubernetes client for speaking to kube API
+        Pods that have reached the Completed status are usually deleted by the scheduler to which
+        they are attached. In case when the scheduler crashes, there is no one to delete these
+        pods. Therefore, they are deleted from another scheduler using this function.
         """
+        from airflow.jobs.job import Job, JobState
+
         if TYPE_CHECKING:
-            assert self.scheduler_job_id
+            assert self.kube_scheduler
 
-        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
-        query_kwargs = {
-            "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
-        }
+        alive_schedulers_ids = session.scalars(
+            select(Job.id).where(Job.job_type == "SchedulerJob", Job.state == JobState.RUNNING)
+        ).all()
+        labels = ["kubernetes_executor=True", f"{POD_EXECUTOR_DONE_KEY}!=True"]
+        for alive_scheduler_id in alive_schedulers_ids:
+            labels.append(f"airflow-worker!={self._make_safe_label_value(str(alive_scheduler_id))}")
+
+        query_kwargs = {"field_selector": "status.phase=Succeeded", "label_selector": ",".join(labels)}
         pod_list = self._list_pods(query_kwargs)
         for pod in pod_list:
-            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
             from kubernetes.client.rest import ApiException
 
             try:
-                kube_client.patch_namespaced_pod(
-                    name=pod.metadata.name,
-                    namespace=pod.metadata.namespace,
-                    body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
-                )
+                self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, namespace=pod.metadata.namespace)
+                self.log.info("Orphaned completed pod %s has been deleted", pod.metadata.name)
             except ApiException as e:
-                self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
-            ti_id = annotations_to_key(pod.metadata.annotations)
-            self.running.add(ti_id)
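For illustration only (not part of the PR): a minimal standalone sketch of what the new selector construction above ends up asking the Kubernetes API for. The scheduler ids and the `airflow_executor_done` label key used here are assumptions standing in for the real values and for `POD_EXECUTOR_DONE_KEY`:

```python
def build_query_kwargs(alive_scheduler_ids: list[str]) -> dict[str, str]:
    # Mirror of the selector building in the diff: match Succeeded pods that are
    # not yet marked done and are not owned by any scheduler that is still alive.
    labels = ["kubernetes_executor=True", "airflow_executor_done!=True"]  # stand-in for POD_EXECUTOR_DONE_KEY
    for scheduler_id in alive_scheduler_ids:
        labels.append(f"airflow-worker!={scheduler_id}")
    return {"field_selector": "status.phase=Succeeded", "label_selector": ",".join(labels)}


# With two live schedulers (hypothetical ids "11" and "12"):
print(build_query_kwargs(["11", "12"])["label_selector"])
# kubernetes_executor=True,airflow_executor_done!=True,airflow-worker!=11,airflow-worker!=12
```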
Review Comment:
> Adoption should be performed only on pods that are bound to failed schedulers. It should not be performed on pods that are bound to normally working schedulers.

Why would that apply to completed PODs as well? What problem would it cause?

I understand that what we are trying to do here is not to `adopt` those pods, but to `delete` completed ones (this is the gist of the change as I see it). There is a bit of a difference between `adopting` a POD (taking care of it) and `deleting` it (which is the last thing that ever happens to it). As I see it, yes, adoption means "I will take care of this POD in the future", whereas deletion means "let's delete this pod and not care about it any more". Those are two completely different cases.

Or am I missing something?
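To make the `adopt` vs `delete` distinction concrete, here is a minimal sketch at the raw Kubernetes client level (assumptions: a reachable cluster config, hypothetical pod name/namespace and label value; this is not the executor's actual code path):

```python
from kubernetes import client, config
from kubernetes.client.rest import ApiException

config.load_kube_config()  # or config.load_incluster_config() when running inside the cluster
kube = client.CoreV1Api()


def adopt_pod(name: str, namespace: str, new_worker_id_label: str) -> None:
    # "Adoption": keep the pod and re-label it, so the adopting scheduler's
    # KubernetesJobWatcher takes care of it from now on.
    kube.patch_namespaced_pod(
        name=name,
        namespace=namespace,
        body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
    )


def delete_pod(name: str, namespace: str) -> None:
    # "Deletion": the terminal action; after this nobody needs to care about the pod.
    try:
        kube.delete_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        if e.status != 404:  # already gone, e.g. cleaned up by another scheduler
            raise
```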
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]