This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 315005b2ffe7b180f5d4f873278606fd8ffaf1ca Author: Xiaodong DENG <[email protected]> AuthorDate: Thu Apr 8 12:11:08 2021 +0200 BugFix: CLI 'kubernetes cleanup-pods' should only clean up Airflow-created Pods (#15204) closes: #15193 Currently, whether the pod was created by Airflow is not taken into account. This commit fixes this. We decide whether the Pod was created by Airflow by checking that it has all the labels added in PodGenerator.construct_pod() or KubernetesPodOperator.create_labels_for_pod(). (cherry picked from commit c594d9cfb32bbcfe30af3f5dcb452c6053cacc95) --- airflow/cli/cli_parser.py | 6 +++- airflow/cli/commands/kubernetes_command.py | 18 +++++++++++- tests/cli/commands/test_kubernetes_command.py | 40 +++++++++++++++++++++------ 3 files changed, 54 insertions(+), 10 deletions(-) diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index c33a854..b5384b4 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1341,7 +1341,11 @@ CONFIG_COMMANDS = ( KUBERNETES_COMMANDS = ( ActionCommand( name='cleanup-pods', - help="Clean up Kubernetes pods in evicted/failed/succeeded states", + help=( + "Clean up Kubernetes pods " + "(created by KubernetesExecutor/KubernetesPodOperator) " + "in evicted/failed/succeeded states" + ), func=lazy_load_command('airflow.cli.commands.kubernetes_command.cleanup_pods'), args=(ARG_NAMESPACE,), ), diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py index f98c45e..daf11a3 100644 --- a/airflow/cli/commands/kubernetes_command.py +++ b/airflow/cli/commands/kubernetes_command.py @@ -90,7 +90,23 @@ def cleanup_pods(args): print('Loading Kubernetes configuration') kube_client = get_kube_client() print(f'Listing pods in namespace {namespace}') - list_kwargs = {"namespace": namespace, "limit": 500} + airflow_pod_labels = [ + 'dag_id', + 'task_id', + 'execution_date', + 'try_number', + 'airflow_version', + ] + list_kwargs = { + "namespace": namespace, + "limit": 
500, + "label_selector": client.V1LabelSelector( + match_expressions=[ + client.V1LabelSelectorRequirement(key=label, operator="Exists") + for label in airflow_pod_labels + ] + ), + } while True: # pylint: disable=too-many-nested-blocks pod_list = kube_client.list_namespaced_pod(**list_kwargs) for pod in pod_list.items: diff --git a/tests/cli/commands/test_kubernetes_command.py b/tests/cli/commands/test_kubernetes_command.py index 8ae2eef..707eb55 100644 --- a/tests/cli/commands/test_kubernetes_command.py +++ b/tests/cli/commands/test_kubernetes_command.py @@ -55,6 +55,13 @@ class TestGenerateDagYamlCommand(unittest.TestCase): class TestCleanUpPodsCommand(unittest.TestCase): + label_selector = kubernetes.client.V1LabelSelector( + match_expressions=[ + kubernetes.client.V1LabelSelectorRequirement(key=label, operator="Exists") + for label in ['dag_id', 'task_id', 'execution_date', 'try_number', 'airflow_version'] + ] + ) + @classmethod def setUpClass(cls): cls.parser = cli_parser.get_parser() @@ -79,7 +86,9 @@ class TestCleanUpPodsCommand(unittest.TestCase): kubernetes_command.cleanup_pods( self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace']) ) - list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500) + list_namespaced_pod.assert_called_once_with( + namespace='awesome-namespace', limit=500, label_selector=self.label_selector + ) delete_pod.assert_not_called() load_incluster_config.assert_called_once() @@ -98,7 +107,9 @@ class TestCleanUpPodsCommand(unittest.TestCase): kubernetes_command.cleanup_pods( self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace']) ) - list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500) + list_namespaced_pod.assert_called_once_with( + namespace='awesome-namespace', limit=500, label_selector=self.label_selector + ) delete_pod.assert_called_with('dummy', 'awesome-namespace') 
load_incluster_config.assert_called_once() @@ -120,7 +131,9 @@ class TestCleanUpPodsCommand(unittest.TestCase): kubernetes_command.cleanup_pods( self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace']) ) - list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500) + list_namespaced_pod.assert_called_once_with( + namespace='awesome-namespace', limit=500, label_selector=self.label_selector + ) delete_pod.assert_not_called() load_incluster_config.assert_called_once() @@ -142,7 +155,9 @@ class TestCleanUpPodsCommand(unittest.TestCase): kubernetes_command.cleanup_pods( self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace']) ) - list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500) + list_namespaced_pod.assert_called_once_with( + namespace='awesome-namespace', limit=500, label_selector=self.label_selector + ) delete_pod.assert_called_with('dummy3', 'awesome-namespace') load_incluster_config.assert_called_once() @@ -162,7 +177,9 @@ class TestCleanUpPodsCommand(unittest.TestCase): kubernetes_command.cleanup_pods( self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace']) ) - list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500) + list_namespaced_pod.assert_called_once_with( + namespace='awesome-namespace', limit=500, label_selector=self.label_selector + ) delete_pod.assert_called_with('dummy4', 'awesome-namespace') load_incluster_config.assert_called_once() @@ -182,7 +199,9 @@ class TestCleanUpPodsCommand(unittest.TestCase): kubernetes_command.cleanup_pods( self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace']) ) - list_namespaced_pod.assert_called_once_with(namespace='awesome-namespace', limit=500) + list_namespaced_pod.assert_called_once_with( + namespace='awesome-namespace', limit=500, label_selector=self.label_selector + ) 
load_incluster_config.assert_called_once() @mock.patch('airflow.cli.commands.kubernetes_command._delete_pod') @@ -204,8 +223,13 @@ class TestCleanUpPodsCommand(unittest.TestCase): self.parser.parse_args(['kubernetes', 'cleanup-pods', '--namespace', 'awesome-namespace']) ) calls = [ - call.first(namespace='awesome-namespace', limit=500), - call.second(namespace='awesome-namespace', limit=500, _continue='dummy-token'), + call.first(namespace='awesome-namespace', limit=500, label_selector=self.label_selector), + call.second( + namespace='awesome-namespace', + limit=500, + label_selector=self.label_selector, + _continue='dummy-token', + ), ] list_namespaced_pod.assert_has_calls(calls) delete_pod.assert_called_with('dummy', 'awesome-namespace')
