droppoint commented on code in PR #35800:
URL: https://github.com/apache/airflow/pull/35800#discussion_r1404083513
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -642,39 +641,6 @@ def adopt_launched_task(
del tis_to_flush_by_key[ti_key]
self.running.add(ti_key)
- def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
- """
- Patch completed pods so that the KubernetesJobWatcher can delete them.
-
- :param kube_client: kubernetes client for speaking to kube API
- """
- if TYPE_CHECKING:
- assert self.scheduler_job_id
-
- new_worker_id_label =
self._make_safe_label_value(self.scheduler_job_id)
- query_kwargs = {
- "field_selector": "status.phase=Succeeded",
- "label_selector": (
- "kubernetes_executor=True,"
-
f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
- ),
- }
- pod_list = self._list_pods(query_kwargs)
- for pod in pod_list:
- self.log.info("Attempting to adopt pod %s", pod.metadata.name)
- from kubernetes.client.rest import ApiException
-
- try:
- kube_client.patch_namespaced_pod(
- name=pod.metadata.name,
- namespace=pod.metadata.namespace,
- body={"metadata": {"labels": {"airflow-worker":
new_worker_id_label}}},
- )
- except ApiException as e:
- self.log.info("Failed to adopt pod %s. Reason: %s",
pod.metadata.name, e)
Review Comment:
It's not that simple. While it's true that placing "continue" in the except
block will fix part of the problem, it won't resolve the race condition. Slots
will continue to leak after that. Please read this
[comment](https://github.com/apache/airflow/issues/32928#issuecomment-1820413530)
from issue #32928. In short, the race happens because this function attempts
to adopt completed pods every 5 minutes (by default) from other fully
operational schedulers. When this 'adoption' occurs, the TaskInstance key of a
normally completed pod is added to the KubernetesExecutor.running set. Since both
the pod and TaskInstance are complete, no change_state will occur, leaving the
TaskInstance key in the KubernetesExecutor.running set indefinitely (or until
the scheduler is restarted). Once the count of KubernetesExecutor.running
exceeds KubernetesExecutor.parallelism, the executor will cease scheduling new
tasks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]