This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 338b412 Add on_kill support for the KubernetesPodOperator (#10666)
338b412 is described below
commit 338b412c04abc3fef8126f9724b448d1a9fd0bbc
Author: Daniel Imberman <[email protected]>
AuthorDate: Wed Sep 2 07:48:29 2020 -0700
Add on_kill support for the KubernetesPodOperator (#10666)
This PR ensures that when a user kills a KubernetesPodOperator task
in the Airflow UI, the associated pod is also killed via the
operator's on_kill method.
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 10 ++++++++
kubernetes_tests/test_kubernetes_pod_operator.py | 28 ++++++++++++++++++++++
2 files changed, 38 insertions(+)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index cc1e20b..2785109 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -235,6 +235,7 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
self.priority_class_name = priority_class_name
self.pod_template_file = pod_template_file
self.name = self._set_name(name)
+ self.client = None
@staticmethod
def create_labels_for_pod(context) -> dict:
@@ -270,6 +271,8 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
client =
kube_client.get_kube_client(cluster_context=self.cluster_context,
config_file=self.config_file)
+ self.client = client
+
# Add combination of labels to uniquely identify a running pod
labels = self.create_labels_for_pod(context)
@@ -441,3 +444,10 @@ class KubernetesPodOperator(BaseOperator): # pylint:
disable=too-many-instance-
'Pod returned a failure: {state}'.format(state=final_state)
)
return final_state, result
+
+ def on_kill(self) -> None:
+ if self.pod:
+ pod: k8s.V1Pod = self.pod
+ namespace = pod.metadata.namespace
+ name = pod.metadata.name
+ self.client.delete_namespaced_pod(name=name, namespace=namespace,
grace_period_seconds=0)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 565703e..4a5e618 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -877,5 +877,33 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
do_xcom_push=False,
)
+ @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
+ def test_on_kill(self,
+ monitor_mock): # pylint: disable=unused-argument
+ from airflow.utils.state import State
+ client = kube_client.get_kube_client(in_cluster=False)
+ name = "test"
+ namespace = "default"
+ k = KubernetesPodOperator(
+ namespace='default',
+ image="ubuntu:16.04",
+ cmds=["bash", "-cx"],
+ arguments=["sleep 1000"],
+ labels={"foo": "bar"},
+ name="test",
+ task_id=name,
+ in_cluster=False,
+ do_xcom_push=False,
+ )
+ context = create_context(k)
+ monitor_mock.return_value = (State.SUCCESS, None)
+ k.execute(context)
+ name = k.pod.metadata.name
+ pod = client.read_namespaced_pod(name=name, namespace=namespace)
+ self.assertEqual(pod.status.phase, "Running")
+ k.on_kill()
+ with self.assertRaises(ApiException):
+ # pod should be deleted
+ client.read_namespaced_pod(name=name, namespace=namespace)
# pylint: enable=unused-argument