This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4da3617431d77f6f7bf33ce9304943fe2323f7dc
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Nov 3 15:28:51 2020 +0000

    Add Kubernetes cleanup-pods CLI command for Helm Chart (#11802)

    closes: https://github.com/apache/airflow/issues/11146

    (cherry picked from commit 980c7252c0f28c251e9f87d736cd88d6027f3da3)
---
 airflow/bin/cli.py    |  81 +++++++++++++++++++++++++++++++++
 tests/cli/test_cli.py | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 203 insertions(+)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index c22e847..d5490f5 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1464,6 +1464,74 @@ Happy Airflowing!
     print(output_string)
 
 
+@cli_utils.action_logging
+def cleanup_pods(args):
+    """Clean up k8s pods in evicted/failed/succeeded states"""
+    from kubernetes.client.rest import ApiException
+
+    from airflow.kubernetes.kube_client import get_kube_client
+
+    namespace = args.namespace
+
+    # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+    # All Containers in the Pod have terminated in success, and will not be restarted.
+    pod_succeeded = 'succeeded'
+
+    # All Containers in the Pod have terminated, and at least one Container has terminated in failure.
+    # That is, the Container either exited with non-zero status or was terminated by the system.
+    pod_failed = 'failed'
+
+    # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
+    pod_reason_evicted = 'evicted'
+    # If pod is failed and restartPolicy is:
+    # * Always: Restart Container; Pod phase stays Running.
+    # * OnFailure: Restart Container; Pod phase stays Running.
+    # * Never: Pod phase becomes Failed.
+    pod_restart_policy_never = 'never'
+
+    print('Loading Kubernetes configuration')
+    kube_client = get_kube_client()
+    print('Listing pods in namespace {}'.format(namespace))
+    continue_token = None
+    while True:  # pylint: disable=too-many-nested-blocks
+        pod_list = kube_client.list_namespaced_pod(namespace=namespace, limit=500, _continue=continue_token)
+        for pod in pod_list.items:
+            pod_name = pod.metadata.name
+            print('Inspecting pod {}'.format(pod_name))
+            pod_phase = pod.status.phase.lower()
+            pod_reason = pod.status.reason.lower() if pod.status.reason else ''
+            pod_restart_policy = pod.spec.restart_policy.lower()
+
+            if (
+                pod_phase == pod_succeeded
+                or (pod_phase == pod_failed and pod_restart_policy == pod_restart_policy_never)
+                or (pod_reason == pod_reason_evicted)
+            ):
+                print('Deleting pod "{}" phase "{}" and reason "{}", restart policy "{}"'.format(
+                    pod_name, pod_phase, pod_reason, pod_restart_policy)
+                )
+                try:
+                    _delete_pod(pod.metadata.name, namespace)
+                except ApiException as e:
+                    print("can't remove POD: {}".format(e), file=sys.stderr)
+                continue
+            print('No action taken on pod {}'.format(pod_name))
+        continue_token = pod_list.metadata._continue  # pylint: disable=protected-access
+        if not continue_token:
+            break
+
+
+def _delete_pod(name, namespace):
+    """Helper function for cleanup_pods"""
+    from kubernetes import client
+
+    core_v1 = client.CoreV1Api()
+    delete_options = client.V1DeleteOptions()
+    print('Deleting POD "{}" from "{}" namespace'.format(name, namespace))
+    api_response = core_v1.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options)
+    print(api_response)
+
+
 @cli_utils.deprecated_action(new_name='celery worker')
 @cli_utils.action_logging
 def worker(args):
@@ -2705,6 +2773,13 @@ ARG_SKIP_SERVE_LOGS = Arg(
     action="store_true",
 )
 
+# kubernetes cleanup-pods
+ARG_NAMESPACE = Arg(
+    ("--namespace",),
+    default='default',
+    help="Kubernetes Namespace",
+)
+
 ALTERNATIVE_CONN_SPECS_ARGS = [
     ARG_CONN_TYPE,
     ARG_CONN_HOST,
@@ -3154,6 +3229,12 @@ CONFIG_COMMANDS = (
 KUBERNETES_COMMANDS = (
     ActionCommand(
+        name='cleanup-pods',
+        help="Clean up Kubernetes pods in evicted/failed/succeeded states",
+        func=cleanup_pods,
+        args=(ARG_NAMESPACE, ),
+    ),
+    ActionCommand(
         name='generate-dag-yaml',
         help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
              "launching into a cluster",
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 048f802..bb39869 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -23,6 +23,8 @@ import io
 import logging
 import os
 
+import kubernetes
+
 from airflow.configuration import conf
 from parameterized import parameterized
 from six import StringIO, PY2
@@ -1026,3 +1028,123 @@ class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
 
         with mock.patch('psutil.Process', return_value=self.process):
             self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
+
+
+class TestCleanUpPodsCommand(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.parser = cli.get_parser()
+
+    @mock.patch('kubernetes.client.CoreV1Api.delete_namespaced_pod')
+    def test_delete_pod(self, delete_namespaced_pod):
+        cli._delete_pod('dummy', 'awesome-namespace')
+        delete_namespaced_pod.assert_called_with(body=mock.ANY, name='dummy', namespace='awesome-namespace')
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy'
+        pod1.status.phase = 'Running'
+        pod1.status.reason = None
+        pods = list_namespaced_pod()
+        pods.metadata._continue = None
+        pods.items = [pod1]
+        cli.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_not_called()
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('airflow.kubernetes.kube_client.config.load_incluster_config')
+    def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy'
+        pod1.status.phase = 'Succeeded'
+        pod1.status.reason = None
+        pods = list_namespaced_pod()
+        pods.metadata._continue = None
+        pods.items = [pod1]
+        cli.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_called_with('dummy', 'awesome-namespace')
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_no_cleanup_failed_pods_wo_restart_policy_never(
+        self, load_incluster_config, list_namespaced_pod, delete_pod
+    ):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy2'
+        pod1.status.phase = 'Failed'
+        pod1.status.reason = None
+        pod1.spec.restart_policy = 'Always'
+        pods = list_namespaced_pod()
+        pods.metadata._continue = None
+        pods.items = [pod1]
+        cli.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_not_called()
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_cleanup_failed_pods_w_restart_policy_never(
+        self, load_incluster_config, list_namespaced_pod, delete_pod
+    ):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy3'
+        pod1.status.phase = 'Failed'
+        pod1.status.reason = None
+        pod1.spec.restart_policy = 'Never'
+        pods = list_namespaced_pod()
+        pods.metadata._continue = None
+        pods.items = [pod1]
+        cli.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_called_with('dummy3', 'awesome-namespace')
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy4'
+        pod1.status.phase = 'Failed'
+        pod1.status.reason = 'Evicted'
+        pod1.spec.restart_policy = 'Never'
+        pods = list_namespaced_pod()
+        pods.metadata._continue = None
+        pods.items = [pod1]
+        cli.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_called_with('dummy4', 'awesome-namespace')
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.bin.cli._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_cleanup_api_exception_continue(self, load_incluster_config, list_namespaced_pod, delete_pod):
+        delete_pod.side_effect = kubernetes.client.rest.ApiException(status=0)
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy'
+        pod1.status.phase = 'Succeeded'
+        pod1.status.reason = None
+        pods = list_namespaced_pod()
+        pods.metadata._continue = None
+        pods.items = [pod1]
+        cli.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace'])
+        )
+        load_incluster_config.assert_called_once()
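
For reference, a usage sketch of the command wired up above, based on the argument definitions and the tests' parse_args calls; the namespace value here is illustrative, and omitting --namespace falls back to 'default':

    airflow kubernetes cleanup-pods --namespace airflow

As implemented, the command pages through pods 500 at a time via the list API's continue token and deletes only pods that have Succeeded, that have Failed with a restartPolicy of Never, or that were Evicted; a failed deletion is reported on stderr and the sweep continues with the next pod.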
