This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 389c074b82758ff56c0785ad19ee06ed1b9e860a Author: Jarek Potiuk <[email protected]> AuthorDate: Sat Dec 10 17:09:06 2022 +0100 More robust cleanup of executors in test_kubernetes_executor (#28281) As a follow-up to #28047, this PR will make the test cleanup more robust and resilient to any errors that might have caused kubernetes_executors to be left behind. Wrapping start()/end() in try/finally will make the tests completely resilient to cases where the asserts start to fail - without those, any failure in tests would cause the same resource leakage as we initially had when #28047 was iterated on. (cherry picked from commit 3b203bcb676853bd642a01121988b1cbe929307d) --- tests/executors/test_kubernetes_executor.py | 348 +++++++++++++++------------- 1 file changed, 193 insertions(+), 155 deletions(-) diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 97619225e6..d1210765c0 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -162,9 +162,11 @@ class TestAirflowKubernetesScheduler: kube_executor = KubernetesExecutor() kube_executor.job_id = 1 kube_executor.start() - kube_executor.kube_scheduler.delete_pod(pod_id, namespace) - - mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + try: + kube_executor.kube_scheduler.delete_pod(pod_id, namespace) + mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + finally: + kube_executor.end() @unittest.skipIf(AirflowKubernetesScheduler is None, "kubernetes python package is not installed") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -203,9 +205,11 @@ class TestAirflowKubernetesScheduler: kube_executor = KubernetesExecutor() kube_executor.job_id = 1 kube_executor.start() - - kube_executor.kube_scheduler.delete_pod(pod_id, namespace) - mock_delete_namespace.assert_called_with(pod_id, namespace, 
body=mock_client.V1DeleteOptions()) + try: + kube_executor.kube_scheduler.delete_pod(pod_id, namespace) + mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + finally: + kube_executor.end() class TestKubernetesExecutor: @@ -266,32 +270,35 @@ class TestKubernetesExecutor: with conf_vars(config): kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() - # Execute a task while the Api Throws errors - try_number = 1 - task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) - kubernetes_executor.execute_async( - key=task_instance_key, - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - ) - kubernetes_executor.sync() + try: + # Execute a task while the Api Throws errors + try_number = 1 + task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + ) + kubernetes_executor.sync() - assert mock_kube_client.create_namespaced_pod.call_count == 1 + assert mock_kube_client.create_namespaced_pod.call_count == 1 - if should_requeue: - assert not kubernetes_executor.task_queue.empty() + if should_requeue: + assert not kubernetes_executor.task_queue.empty() - # Disable the ApiException - mock_kube_client.create_namespaced_pod.side_effect = None + # Disable the ApiException + mock_kube_client.create_namespaced_pod.side_effect = None - # Execute the task without errors should empty the queue - mock_kube_client.create_namespaced_pod.reset_mock() - kubernetes_executor.sync() - assert mock_kube_client.create_namespaced_pod.called - assert kubernetes_executor.task_queue.empty() - else: - assert kubernetes_executor.task_queue.empty() - assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + # Execute the task without errors should empty the queue + mock_kube_client.create_namespaced_pod.reset_mock() + 
kubernetes_executor.sync() + assert mock_kube_client.create_namespaced_pod.called + assert kubernetes_executor.task_queue.empty() + else: + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + finally: + kubernetes_executor.end() @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" @@ -311,23 +318,26 @@ class TestKubernetesExecutor: kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() - try_number = 1 - task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) - kubernetes_executor.execute_async( - key=task_instance_key, - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - ) - kubernetes_executor.sync() + try: + try_number = 1 + task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + ) + kubernetes_executor.sync() - # The pod_mutation_hook should have been called once. - assert mock_pmh.call_count == 1 - # There should be no pod creation request sent - assert mock_kube_client.create_namespaced_pod.call_count == 0 - # The task is not re-queued and there is the failed record in event_buffer - assert kubernetes_executor.task_queue.empty() - assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED - assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh + # The pod_mutation_hook should have been called once. 
+ assert mock_pmh.call_count == 1 + # There should be no pod creation request sent + assert mock_kube_client.create_namespaced_pod.call_count == 0 + # The task is not re-queued and there is the failed record in event_buffer + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == exception_in_pmh + finally: + kubernetes_executor.end() @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" @@ -351,19 +361,22 @@ class TestKubernetesExecutor: with conf_vars(config): kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() - # Execute a task while the Api Throws errors - try_number = 1 - task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) - kubernetes_executor.execute_async( - key=task_instance_key, - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - ) - kubernetes_executor.sync() + try: + # Execute a task while the Api Throws errors + try_number = 1 + task_instance_key = TaskInstanceKey("dag", "task", "run_id", try_number) + kubernetes_executor.execute_async( + key=task_instance_key, + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + ) + kubernetes_executor.sync() - assert kubernetes_executor.task_queue.empty() - assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED - assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg + assert kubernetes_executor.task_queue.empty() + assert kubernetes_executor.event_buffer[task_instance_key][0] == State.FAILED + assert kubernetes_executor.event_buffer[task_instance_key][1].args[0] == fail_msg + finally: + kubernetes_executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubeConfig") @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.sync") @@ -384,20 +397,22 
@@ class TestKubernetesExecutor: def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher): executor = self.kubernetes_executor executor.start() + try: + assert executor.event_buffer == {} + executor.execute_async( + key=("dag", "task", datetime.utcnow(), 1), + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + executor_config=k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")] + ) + ), + ) - assert executor.event_buffer == {} - executor.execute_async( - key=("dag", "task", datetime.utcnow(), 1), - queue=None, - command=["airflow", "tasks", "run", "true", "some_parameter"], - executor_config=k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")] - ) - ), - ) - - assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed" + assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed" + finally: + executor.end() @pytest.mark.execution_timeout(10) @pytest.mark.skipif( @@ -417,83 +432,88 @@ class TestKubernetesExecutor: with conf_vars({("kubernetes", "pod_template_file"): ""}): executor = self.kubernetes_executor executor.start() + try: + assert executor.event_buffer == {} + assert executor.task_queue.empty() + + executor.execute_async( + key=TaskInstanceKey("dag", "task", "run_id", 1), + queue=None, + command=["airflow", "tasks", "run", "true", "some_parameter"], + executor_config={ + "pod_template_file": template_file, + "pod_override": k8s.V1Pod( + metadata=k8s.V1ObjectMeta(labels={"release": "stable"}), + spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="base", image="airflow:3.6")], + ), + ), + }, + ) - assert executor.event_buffer == {} - assert executor.task_queue.empty() - - executor.execute_async( - key=TaskInstanceKey("dag", "task", "run_id", 1), - queue=None, - command=["airflow", "tasks", "run", "true", 
"some_parameter"], - executor_config={ - "pod_template_file": template_file, - "pod_override": k8s.V1Pod( - metadata=k8s.V1ObjectMeta(labels={"release": "stable"}), + assert not executor.task_queue.empty() + task = executor.task_queue.get_nowait() + _, _, expected_executor_config, expected_pod_template_file = task + executor.task_queue.task_done() + # Test that the correct values have been put to queue + assert expected_executor_config.metadata.labels == {"release": "stable"} + assert expected_pod_template_file == template_file + + self.kubernetes_executor.kube_scheduler.run_next(task) + mock_run_pod_async.assert_called_once_with( + k8s.V1Pod( + api_version="v1", + kind="Pod", + metadata=k8s.V1ObjectMeta( + name=mock.ANY, + namespace="default", + annotations={ + "dag_id": "dag", + "run_id": "run_id", + "task_id": "task", + "try_number": "1", + }, + labels={ + "airflow-worker": "5", + "airflow_version": mock.ANY, + "dag_id": "dag", + "run_id": "run_id", + "kubernetes_executor": "True", + "mylabel": "foo", + "release": "stable", + "task_id": "task", + "try_number": "1", + }, + ), spec=k8s.V1PodSpec( - containers=[k8s.V1Container(name="base", image="airflow:3.6")], + containers=[ + k8s.V1Container( + name="base", + image="airflow:3.6", + args=["airflow", "tasks", "run", "true", "some_parameter"], + env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")], + ) + ], + image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")], + scheduler_name="default-scheduler", + security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000), ), - ), - }, - ) - - assert not executor.task_queue.empty() - task = executor.task_queue.get_nowait() - _, _, expected_executor_config, expected_pod_template_file = task - - # Test that the correct values have been put to queue - assert expected_executor_config.metadata.labels == {"release": "stable"} - assert expected_pod_template_file == template_file - - 
self.kubernetes_executor.kube_scheduler.run_next(task) - mock_run_pod_async.assert_called_once_with( - k8s.V1Pod( - api_version="v1", - kind="Pod", - metadata=k8s.V1ObjectMeta( - name=mock.ANY, - namespace="default", - annotations={ - "dag_id": "dag", - "run_id": "run_id", - "task_id": "task", - "try_number": "1", - }, - labels={ - "airflow-worker": "5", - "airflow_version": mock.ANY, - "dag_id": "dag", - "run_id": "run_id", - "kubernetes_executor": "True", - "mylabel": "foo", - "release": "stable", - "task_id": "task", - "try_number": "1", - }, - ), - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - image="airflow:3.6", - args=["airflow", "tasks", "run", "true", "some_parameter"], - env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")], - ) - ], - image_pull_secrets=[k8s.V1LocalObjectReference(name="airflow-registry")], - scheduler_name="default-scheduler", - security_context=k8s.V1PodSecurityContext(fs_group=50000, run_as_user=50000), - ), + ) ) - ) + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher): executor = self.kubernetes_executor executor.start() - key = ("dag_id", "task_id", "run_id", "try_number1") - executor._change_state(key, State.RUNNING, "pod_id", "default") - assert executor.event_buffer[key][0] == State.RUNNING + try: + key = ("dag_id", "task_id", "run_id", "try_number1") + executor._change_state(key, State.RUNNING, "pod_id", "default") + assert executor.event_buffer[key][0] == State.RUNNING + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -501,10 +521,13 @@ class TestKubernetesExecutor: def test_change_state_success(self, mock_delete_pod, mock_get_kube_client, 
mock_kubernetes_job_watcher): executor = self.kubernetes_executor executor.start() - key = ("dag_id", "task_id", "run_id", "try_number2") - executor._change_state(key, State.SUCCESS, "pod_id", "default") - assert executor.event_buffer[key][0] == State.SUCCESS - mock_delete_pod.assert_called_once_with("pod_id", "default") + try: + key = ("dag_id", "task_id", "run_id", "try_number2") + executor._change_state(key, State.SUCCESS, "pod_id", "default") + assert executor.event_buffer[key][0] == State.SUCCESS + mock_delete_pod.assert_called_once_with("pod_id", "default") + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -516,10 +539,13 @@ class TestKubernetesExecutor: executor.kube_config.delete_worker_pods = False executor.kube_config.delete_worker_pods_on_failure = False executor.start() - key = ("dag_id", "task_id", "run_id", "try_number3") - executor._change_state(key, State.FAILED, "pod_id", "default") - assert executor.event_buffer[key][0] == State.FAILED - mock_delete_pod.assert_not_called() + try: + key = ("dag_id", "task_id", "run_id", "try_number3") + executor._change_state(key, State.FAILED, "pod_id", "default") + assert executor.event_buffer[key][0] == State.FAILED + mock_delete_pod.assert_not_called() + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -532,10 +558,13 @@ class TestKubernetesExecutor: executor.kube_config.delete_worker_pods_on_failure = False executor.start() - key = ("dag_id", "task_id", "run_id", "try_number2") - executor._change_state(key, State.SUCCESS, "pod_id", "default") - assert executor.event_buffer[key][0] == State.SUCCESS - mock_delete_pod.assert_not_called() + try: + key = ("dag_id", "task_id", "run_id", "try_number2") + executor._change_state(key, State.SUCCESS, "pod_id", "default") + 
assert executor.event_buffer[key][0] == State.SUCCESS + mock_delete_pod.assert_not_called() + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") @@ -547,10 +576,13 @@ class TestKubernetesExecutor: executor.kube_config.delete_worker_pods_on_failure = True executor.start() - key = ("dag_id", "task_id", "run_id", "try_number2") - executor._change_state(key, State.FAILED, "pod_id", "test-namespace") - assert executor.event_buffer[key][0] == State.FAILED - mock_delete_pod.assert_called_once_with("pod_id", "test-namespace") + try: + key = ("dag_id", "task_id", "run_id", "try_number2") + executor._change_state(key, State.FAILED, "pod_id", "test-namespace") + assert executor.event_buffer[key][0] == State.FAILED + mock_delete_pod.assert_called_once_with("pod_id", "test-namespace") + finally: + executor.end() @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task") @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods") @@ -789,8 +821,11 @@ class TestKubernetesExecutor: executor = KubernetesExecutor() executor.job_id = 123 executor.start() - assert 2 == len(executor.event_scheduler.queue) - executor._check_worker_pods_pending_timeout() + try: + assert 2 == len(executor.event_scheduler.queue) + executor._check_worker_pods_pending_timeout() + finally: + executor.end() mock_kube_client.list_namespaced_pod.assert_called_once_with( "mynamespace", @@ -831,7 +866,10 @@ class TestKubernetesExecutor: executor = KubernetesExecutor() executor.job_id = 123 executor.start() - executor._check_worker_pods_pending_timeout() + try: + executor._check_worker_pods_pending_timeout() + finally: + executor.end() mock_kube_client.list_pod_for_all_namespaces.assert_called_once_with( field_selector="status.phase=Pending",
