This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1eba989f1bd077d563e3a1b07f950367398a8281 Author: Jed Cunningham <[email protected]> AuthorDate: Fri Jan 13 00:35:16 2023 -0600 KubenetesExecutor sends state even when successful (#28871) Co-authored-by: Tzu-ping Chung <[email protected]> (cherry picked from commit 8a9959cc1ead49785b10b9b28c101e3d94cb4176) --- airflow/executors/kubernetes_executor.py | 37 ++++++++++++++++++----------- tests/executors/test_kubernetes_executor.py | 25 +++++++++++++++++-- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 8098486ded..34204539fa 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -214,7 +214,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version)) elif status == "Succeeded": self.log.info("Event: %s Succeeded", pod_id) - self.watcher_queue.put((pod_id, namespace, None, annotations, resource_version)) + self.watcher_queue.put((pod_id, namespace, State.SUCCESS, annotations, resource_version)) elif status == "Running": if event["type"] == "DELETED": self.log.info("Event: Pod %s deleted before it could complete", pod_id) @@ -719,19 +719,26 @@ class KubernetesExecutor(BaseExecutor): if TYPE_CHECKING: assert self.kube_scheduler - if state != State.RUNNING: - if self.kube_config.delete_worker_pods: - if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure: - self.kube_scheduler.delete_pod(pod_id, namespace) - self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace)) - else: - self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace) - self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace)) - try: - self.running.remove(key) - except KeyError: - self.log.debug("Could not find key: %s", str(key)) - self.event_buffer[key] = state, None + if state == State.RUNNING: + self.event_buffer[key] = state, None + return + + if self.kube_config.delete_worker_pods: + if state != State.FAILED or self.kube_config.delete_worker_pods_on_failure: + self.kube_scheduler.delete_pod(pod_id, namespace) + self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace)) + else: + self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace) + self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace)) + + try: + self.running.remove(key) + except KeyError: + self.log.debug("TI key not in running, not adding to event_buffer: %s", key) + else: + # We get multiple events once the pod hits a terminal state, and we only want to + # do this once, so only do it when we remove the task from running + self.event_buffer[key] = state, None def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id] @@ -809,6 +816,8 @@ class KubernetesExecutor(BaseExecutor): ) except ApiException as e: self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e) + pod_id = annotations_to_key(pod.metadata.annotations) + self.running.add(pod_id) def _flush_task_queue(self) -> None: if TYPE_CHECKING: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 9f5304ba4a..d24f3e8fd9 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -510,8 +510,10 @@ class TestKubernetesExecutor: executor.start() try: key = ("dag_id", "task_id", "run_id", "try_number1") + executor.running = {key} executor._change_state(key, State.RUNNING, "pod_id", "default") assert executor.event_buffer[key][0] == State.RUNNING + assert executor.running == {key} finally: executor.end() @@ -523,8 +525,10 @@ class TestKubernetesExecutor: executor.start() try: key = ("dag_id", "task_id", "run_id", "try_number2") + executor.running = {key} executor._change_state(key, State.SUCCESS, "pod_id", "default") assert executor.event_buffer[key][0] == State.SUCCESS + assert executor.running == set() mock_delete_pod.assert_called_once_with("pod_id", "default") finally: executor.end() @@ -541,8 +545,10 @@ class TestKubernetesExecutor: executor.start() try: key = ("dag_id", "task_id", "run_id", "try_number3") + executor.running = {key} executor._change_state(key, State.FAILED, "pod_id", "default") assert executor.event_buffer[key][0] == State.FAILED + assert executor.running == set() mock_delete_pod.assert_not_called() finally: executor.end() @@ -562,8 +568,10 @@ class TestKubernetesExecutor: executor.start() try: key = ("dag_id", "task_id", "run_id", "try_number2") + executor.running = {key} executor._change_state(key, State.SUCCESS, "pod_id", "test-namespace") assert executor.event_buffer[key][0] == State.SUCCESS + assert executor.running == set() mock_delete_pod.assert_not_called() mock_patch_pod.assert_called_once_with(pod_id="pod_id", namespace="test-namespace") finally: @@ -583,8 +591,10 @@ class TestKubernetesExecutor: executor.start() try: key = ("dag_id", "task_id", "run_id", "try_number2") + executor.running = {key} executor._change_state(key, State.FAILED, "pod_id", "test-namespace") assert executor.event_buffer[key][0] == State.FAILED + assert executor.running == set() mock_delete_pod.assert_called_once_with("pod_id", "test-namespace") mock_patch_pod.assert_not_called() finally: @@ -733,17 +743,27 @@ class TestKubernetesExecutor: executor.kube_client = mock_kube_client executor.kube_config.kube_namespace = "somens" pod_names = ["one", "two"] + + def get_annotations(pod_name): + return { + "dag_id": "dag", + "run_id": "run_id", + "task_id": pod_name, + "try_number": "1", + } + mock_kube_client.list_namespaced_pod.return_value.items = [ k8s.V1Pod( metadata=k8s.V1ObjectMeta( name=pod_name, labels={"airflow-worker": pod_name}, - annotations={"some_annotation": "hello"}, + annotations=get_annotations(pod_name), namespace="somens", ) ) for pod_name in pod_names ] + expected_running_ti_keys = {annotations_to_key(get_annotations(pod_name)) for pod_name in pod_names} executor._adopt_completed_pods(mock_kube_client) mock_kube_client.list_namespaced_pod.assert_called_once_with( @@ -763,6 +783,7 @@ class TestKubernetesExecutor: ], any_order=True, ) + assert executor.running == expected_running_ti_keys @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") def test_not_adopt_unassigned_task(self, mock_kube_client): @@ -1144,7 +1165,7 @@ class TestKubernetesJobWatcher: self.events.append({"type": "MODIFIED", "object": self.pod}) self._run() - self.assert_watcher_queue_called_once_with_state(None) + self.assert_watcher_queue_called_once_with_state(State.SUCCESS) def test_process_status_running_deleted(self): self.pod.status.phase = "Running"
