kaxil commented on a change in pull request #11802:
URL: https://github.com/apache/airflow/pull/11802#discussion_r516276831



##########
File path: tests/cli/commands/test_kubernetes_command.py
##########
@@ -40,3 +43,90 @@ def test_generate_dag_yaml(self):
             self.assertEqual(len(os.listdir(out_dir)), 6)
             self.assertTrue(os.path.isfile(out_dir + file_name))
             self.assertGreater(os.stat(out_dir + file_name).st_size, 0)
+
+    @mock.patch('kubernetes.client.CoreV1Api.delete_namespaced_pod')
+    def test_delete_pod(self, delete_namespaced_pod):
+        kubernetes_command._delete_pod('dummy', 'awesome-namespace')
+        delete_namespaced_pod.assert_called_with(body=mock.ANY, name='dummy', 
namespace='awesome-namespace')
+
+    @mock.patch('airflow.cli.commands.kubernetes_command._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_cleanup_succeeded_pods(self, load_incluster_config, 
list_namespaced_pod, delete_pod):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy'
+        pod1.status.phase = 'Succeeded'
+        pod1.status.reason = None
+        list_namespaced_pod().items = [pod1]
+        kubernetes_command.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', 
'--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_called_with('dummy', 'awesome-namespace')
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.cli.commands.kubernetes_command._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_no_cleanup_failed_pods_wo_restart_policy_never(
+        self, load_incluster_config, list_namespaced_pod, delete_pod
+    ):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy2'
+        pod1.status.phase = 'Failed'
+        pod1.status.reason = None
+        pod1.spec.restart_policy = 'Always'
+        list_namespaced_pod().items = [pod1]
+        kubernetes_command.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', 
'--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_not_called()
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.cli.commands.kubernetes_command._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_cleanup_failed_pods_w_restart_policy_never(
+        self, load_incluster_config, list_namespaced_pod, delete_pod
+    ):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy3'
+        pod1.status.phase = 'Failed'
+        pod1.status.reason = None
+        pod1.spec.restart_policy = 'Never'
+        list_namespaced_pod().items = [pod1]
+        kubernetes_command.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', 
'--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_called_with('dummy3', 'awesome-namespace')
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.cli.commands.kubernetes_command._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_cleanup_evicted_pods(self, load_incluster_config, 
list_namespaced_pod, delete_pod):
+        pod1 = MagicMock()
+        pod1.metadata.name = 'dummy4'
+        pod1.status.phase = 'Failed'
+        pod1.status.reason = 'Evicted'
+        pod1.spec.restart_policy = 'Never'
+        list_namespaced_pod().items = [pod1]
+        kubernetes_command.cleanup_pods(
+            self.parser.parse_args(['kubernetes', 'cleanup-pods', 
'--namespace', 'awesome-namespace'])
+        )
+        delete_pod.assert_called_with('dummy4', 'awesome-namespace')
+        load_incluster_config.assert_called_once()
+
+    @mock.patch('airflow.cli.commands.kubernetes_command._delete_pod')
+    @mock.patch('kubernetes.client.CoreV1Api.list_namespaced_pod')
+    @mock.patch('kubernetes.config.load_incluster_config')
+    def test_cleanup_api_exception_continue(self, load_incluster_config, 
list_namespaced_pod, delete_pod):

Review comment:
       Done in https://github.com/apache/airflow/pull/11802/commits/5e4a0785ea10d2cb9295423959e53f91b8a5469d
   
   
https://github.com/apache/airflow/pull/11802/commits/5e4a0785ea10d2cb9295423959e53f91b8a5469d#diff-a4427b7ffbc41c6c6e1fa8141b2b39029d9eeb44b897f8ab57fe9d7998557518R60-R73

##########
File path: tests/cli/commands/test_kubernetes_command.py
##########
@@ -40,3 +43,90 @@ def test_generate_dag_yaml(self):
             self.assertEqual(len(os.listdir(out_dir)), 6)
             self.assertTrue(os.path.isfile(out_dir + file_name))
             self.assertGreater(os.stat(out_dir + file_name).st_size, 0)
+
+    @mock.patch('kubernetes.client.CoreV1Api.delete_namespaced_pod')

Review comment:
       Done in https://github.com/apache/airflow/pull/11802/commits/5e4a0785ea10d2cb9295423959e53f91b8a5469d

##########
File path: airflow/cli/commands/kubernetes_command.py
##########
@@ -61,3 +63,64 @@ def generate_pod_yaml(args):
             sanitized_pod = api_client.sanitize_for_serialization(pod)
             output.write(yaml.dump(sanitized_pod))
     print(f"YAML output can be found at 
{yaml_output_path}/airflow_yaml_output/")
+
+
+@cli_utils.action_logging
+def cleanup_pods(args):
+    """Clean up k8s pods in evicted/failed/succeeded states"""
+    namespace = args.namespace
+
+    # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+    # All Containers in the Pod have terminated in success, and will not be 
restarted.
+    pod_succeeded = 'succeeded'
+
+    # All Containers in the Pod have terminated, and at least one Container 
has terminated in failure.
+    # That is, the Container either exited with non-zero status or was 
terminated by the system.
+    pod_failed = 'failed'
+
+    # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
+    pod_reason_evicted = 'evicted'
+    # If pod is failed and restartPolicy is:
+    # * Always: Restart Container; Pod phase stays Running.
+    # * OnFailure: Restart Container; Pod phase stays Running.
+    # * Never: Pod phase becomes Failed.
+    pod_restart_policy_never = 'never'
+
+    print('Loading Kubernetes configuration')
+    config.load_incluster_config()

Review comment:
       Done in https://github.com/apache/airflow/pull/11802/commits/5e4a0785ea10d2cb9295423959e53f91b8a5469d

##########
File path: airflow/cli/commands/kubernetes_command.py
##########
@@ -61,3 +63,64 @@ def generate_pod_yaml(args):
             sanitized_pod = api_client.sanitize_for_serialization(pod)
             output.write(yaml.dump(sanitized_pod))
     print(f"YAML output can be found at 
{yaml_output_path}/airflow_yaml_output/")
+
+
+@cli_utils.action_logging
+def cleanup_pods(args):
+    """Clean up k8s pods in evicted/failed/succeeded states"""
+    namespace = args.namespace
+
+    # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+    # All Containers in the Pod have terminated in success, and will not be 
restarted.
+    pod_succeeded = 'succeeded'
+
+    # All Containers in the Pod have terminated, and at least one Container 
has terminated in failure.
+    # That is, the Container either exited with non-zero status or was 
terminated by the system.
+    pod_failed = 'failed'
+
+    # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
+    pod_reason_evicted = 'evicted'
+    # If pod is failed and restartPolicy is:
+    # * Always: Restart Container; Pod phase stays Running.
+    # * OnFailure: Restart Container; Pod phase stays Running.
+    # * Never: Pod phase becomes Failed.
+    pod_restart_policy_never = 'never'
+
+    print('Loading Kubernetes configuration')
+    config.load_incluster_config()
+    core_v1 = client.CoreV1Api()
+    print(f'Listing pods in namespace {namespace}')
+    pod_list = core_v1.list_namespaced_pod(namespace)

Review comment:
       Done in https://github.com/apache/airflow/pull/11802/commits/5e4a0785ea10d2cb9295423959e53f91b8a5469d

##########
File path: airflow/cli/commands/kubernetes_command.py
##########
@@ -61,3 +63,64 @@ def generate_pod_yaml(args):
             sanitized_pod = api_client.sanitize_for_serialization(pod)
             output.write(yaml.dump(sanitized_pod))
     print(f"YAML output can be found at 
{yaml_output_path}/airflow_yaml_output/")
+
+
+@cli_utils.action_logging
+def cleanup_pods(args):
+    """Clean up k8s pods in evicted/failed/succeeded states"""
+    namespace = args.namespace
+
+    # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
+    # All Containers in the Pod have terminated in success, and will not be 
restarted.
+    pod_succeeded = 'succeeded'
+
+    # All Containers in the Pod have terminated, and at least one Container 
has terminated in failure.
+    # That is, the Container either exited with non-zero status or was 
terminated by the system.
+    pod_failed = 'failed'
+
+    # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
+    pod_reason_evicted = 'evicted'
+    # If pod is failed and restartPolicy is:
+    # * Always: Restart Container; Pod phase stays Running.
+    # * OnFailure: Restart Container; Pod phase stays Running.
+    # * Never: Pod phase becomes Failed.
+    pod_restart_policy_never = 'never'
+
+    print('Loading Kubernetes configuration')
+    config.load_incluster_config()
+    core_v1 = client.CoreV1Api()
+    print(f'Listing pods in namespace {namespace}')
+    pod_list = core_v1.list_namespaced_pod(namespace)
+
+    for pod in pod_list.items:
+        pod_name = pod.metadata.name
+        print(f'Inspecting pod {pod_name}')
+        pod_phase = pod.status.phase.lower()
+        pod_reason = pod.status.reason.lower() if pod.status.reason else ''
+        pod_restart_policy = pod.spec.restart_policy.lower()
+
+        if (
+            pod_phase == pod_succeeded
+            or (pod_phase == pod_failed and pod_restart_policy == 
pod_restart_policy_never)
+            or (pod_reason == pod_reason_evicted)
+        ):
+
+            print(
+                f'Deleting pod "{pod_name}" phase "{pod_phase}" and reason 
"{pod_reason}", '
+                f'restart policy "{pod_restart_policy}"'
+            )
+            try:
+                _delete_pod(pod.metadata.name, namespace)
+            except ApiException as e:
+                print(f"can't remove POD: {e}", file=sys.stderr)
+            continue
+        print(f'No action taken on pod {pod_name}')
+
+
+def _delete_pod(name, namespace):
+    """Helper Function for cleanup_pods"""
+    core_v1 = client.CoreV1Api()
+    delete_options = client.V1DeleteOptions()
+    print(f'Deleting POD "{name}" from "{namespace}" namespace')
+    api_response = core_v1.delete_namespaced_pod(name=name, 
namespace=namespace, body=delete_options)
+    print(api_response)

Review comment:
       Yeah, this will be very helpful when something goes wrong.

##########
File path: chart/templates/cleanup/cleanup-cronjob.yaml
##########
@@ -61,7 +61,7 @@ spec:
               image: {{ template "default_airflow_image" . }}
               imagePullPolicy: {{ .Values.images.airflow.pullPolicy }}
               # Don't use entry point here, we don't need to wait on 
pg-bouncer etc being available.
-              command: ["airflow-cleanup-pods", "--namespace={{ 
.Release.Namespace }}"]
+              args: ["kubernetes", "cleanup-pods", "--namespace={{ 
.Release.Namespace }}"]

Review comment:
       Yes, I am planning to backport this; I will add the 1.10.13 milestone.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to