This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 338b412  Add on_kill support for the KubernetesPodOperator (#10666)
338b412 is described below

commit 338b412c04abc3fef8126f9724b448d1a9fd0bbc
Author: Daniel Imberman <[email protected]>
AuthorDate: Wed Sep 2 07:48:29 2020 -0700

    Add on_kill support for the KubernetesPodOperator (#10666)
    
    This PR ensures that when a user kills a KubernetesPodOperator task
    in the Airflow UI, the associated pod is also killed via the
    on_kill method.
---
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 10 ++++++++
 kubernetes_tests/test_kubernetes_pod_operator.py   | 28 ++++++++++++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index cc1e20b..2785109 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -235,6 +235,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
         self.priority_class_name = priority_class_name
         self.pod_template_file = pod_template_file
         self.name = self._set_name(name)
+        self.client = None
 
     @staticmethod
     def create_labels_for_pod(context) -> dict:
@@ -270,6 +271,8 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 client = kube_client.get_kube_client(cluster_context=self.cluster_context,
                                                      config_file=self.config_file)
 
+            self.client = client
+
             # Add combination of labels to uniquely identify a running pod
             labels = self.create_labels_for_pod(context)
 
@@ -441,3 +444,10 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
                 'Pod returned a failure: {state}'.format(state=final_state)
             )
         return final_state, result
+
+    def on_kill(self) -> None:
+        if self.pod:
+            pod: k8s.V1Pod = self.pod
+            namespace = pod.metadata.namespace
+            name = pod.metadata.name
+            self.client.delete_namespaced_pod(name=name, namespace=namespace, grace_period_seconds=0)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 565703e..4a5e618 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -877,5 +877,33 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                 do_xcom_push=False,
             )
 
+    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+    def test_on_kill(self,
+                     monitor_mock):  # pylint: disable=unused-argument
+        from airflow.utils.state import State
+        client = kube_client.get_kube_client(in_cluster=False)
+        name = "test"
+        namespace = "default"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["sleep 1000"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id=name,
+            in_cluster=False,
+            do_xcom_push=False,
+        )
+        context = create_context(k)
+        monitor_mock.return_value = (State.SUCCESS, None)
+        k.execute(context)
+        name = k.pod.metadata.name
+        pod = client.read_namespaced_pod(name=name, namespace=namespace)
+        self.assertEqual(pod.status.phase, "Running")
+        k.on_kill()
+        with self.assertRaises(ApiException):
+            # pod should be deleted
+            client.read_namespaced_pod(name=name, namespace=namespace)
 
 # pylint: enable=unused-argument

Reply via email to