This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9ad5779a54e3a2a25449336e1e1f91b0048ef17c Author: Jed Cunningham <[email protected]> AuthorDate: Wed Jan 11 15:15:21 2023 -0600 Annotate KubernetesExecutor pods that we don't delete (#28844) We weren't keeping track of which pods we'd finished with yet, so if you had `[kubernetes_executor] delete_worker_pods` false, your KubeExecutor would adopt every single remaining pod when starting up. Every time. We now annotate them with `airflow_executor_done` when processing a pods event from the watcher, so we can ignore the pod when doing adoption. (cherry picked from commit 72da8bff12e3133045b61936952599a84d3f53a2) --- airflow/executors/kubernetes_executor.py | 22 +++++++++++++++++++++- tests/executors/test_kubernetes_executor.py | 18 ++++++++++++------ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 7dcba12968..8098486ded 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -52,6 +52,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import provide_session from airflow.utils.state import State +POD_EXECUTOR_DONE_KEY = "airflow_executor_done" + # TaskInstance key, command, configuration, pod_template_file KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]] @@ -360,6 +362,18 @@ class AirflowKubernetesScheduler(LoggingMixin): if e.status != 404: raise + def patch_pod_executor_done(self, *, pod_id: str, namespace: str): + """Add a "done" annotation to ensure we don't continually adopt pods""" + self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_id, namespace) + try: + self.kube_client.patch_namespaced_pod( + name=pod_id, + namespace=namespace, + body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}}, + ) + except ApiException as e: + self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_id, e) + def sync(self) -> None: """ The sync function checks the status of all currently running kubernetes jobs. @@ -710,6 +724,9 @@ class KubernetesExecutor(BaseExecutor): if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure: self.kube_scheduler.delete_pod(pod_id, namespace) self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace)) + else: + self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace) + self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace)) try: self.running.remove(key) except KeyError: @@ -776,7 +793,10 @@ class KubernetesExecutor(BaseExecutor): new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id) kwargs = { "field_selector": "status.phase=Succeeded", - "label_selector": f"kubernetes_executor=True,airflow-worker!={new_worker_id_label}", + "label_selector": ( + "kubernetes_executor=True," + f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True" + ), } pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs) for pod in pod_list.items: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index d1210765c0..9f5304ba4a 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -549,10 +549,12 @@ class TestKubernetesExecutor: @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") - @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod") + @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler") def test_change_state_skip_pod_deletion( - self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher + self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher ): + mock_delete_pod = mock_kubescheduler.return_value.delete_pod + mock_patch_pod = mock_kubescheduler.return_value.patch_pod_executor_done executor = self.kubernetes_executor executor.kube_config.delete_worker_pods = False executor.kube_config.delete_worker_pods_on_failure = False @@ -560,18 +562,21 @@ class TestKubernetesExecutor: executor.start() try: key = ("dag_id", "task_id", "run_id", "try_number2") - executor._change_state(key, State.SUCCESS, "pod_id", "default") + executor._change_state(key, State.SUCCESS, "pod_id", "test-namespace") assert executor.event_buffer[key][0] == State.SUCCESS mock_delete_pod.assert_not_called() + mock_patch_pod.assert_called_once_with(pod_id="pod_id", namespace="test-namespace") finally: executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") - @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod") + @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler") def test_change_state_failed_pod_deletion( - self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher + self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher ): + mock_delete_pod = mock_kubescheduler.return_value.delete_pod + mock_patch_pod = mock_kubescheduler.return_value.patch_pod_executor_done executor = self.kubernetes_executor executor.kube_config.delete_worker_pods_on_failure = True @@ -581,6 +586,7 @@ class TestKubernetesExecutor: executor._change_state(key, State.FAILED, "pod_id", "test-namespace") assert executor.event_buffer[key][0] == State.FAILED mock_delete_pod.assert_called_once_with("pod_id", "test-namespace") + mock_patch_pod.assert_not_called() finally: executor.end() @@ -743,7 +749,7 @@ class TestKubernetesExecutor: mock_kube_client.list_namespaced_pod.assert_called_once_with( namespace="somens", field_selector="status.phase=Succeeded", - label_selector="kubernetes_executor=True,airflow-worker!=modified", + label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True", ) assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count mock_kube_client.patch_namespaced_pod.assert_has_calls(
