This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ab5aabe50b Implement delete_on_status parameter for 
KubernetesDeleteJobOperator (#38458)
ab5aabe50b is described below

commit ab5aabe50b1023a7db0d256751eadd033091af63
Author: max <[email protected]>
AuthorDate: Tue Apr 2 18:19:03 2024 +0200

    Implement delete_on_status parameter for KubernetesDeleteJobOperator 
(#38458)
    
    * Implement on_status parameter for KubernetesDeleteJobOperator
    
    * rename parameters
---
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |  43 +++++---
 airflow/providers/cncf/kubernetes/operators/job.py |  45 ++++++++-
 .../cncf/kubernetes/hooks/test_kubernetes.py       |  57 +++++++++++
 .../cncf/kubernetes/operators/test_job.py          | 112 ++++++++++++++++++++-
 .../cncf/kubernetes/example_kubernetes_job.py      |   9 +-
 5 files changed, 241 insertions(+), 25 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py 
b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index ec67254e3c..1b3c4254e8 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -568,18 +568,19 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
 
         :return: Boolean indicating that the given job is complete.
         """
-        if conditions := job.status.conditions:
-            if final_condition_types := list(
-                c for c in conditions if c.type in 
JOB_FINAL_STATUS_CONDITION_TYPES and c.status
-            ):
-                s = "s" if len(final_condition_types) > 1 else ""
-                self.log.info(
-                    "The job '%s' state%s: %s",
-                    job.metadata.name,
-                    s,
-                    ", ".join(f"{c.type} at {c.last_transition_time}" for c in 
final_condition_types),
-                )
-                return True
+        if status := job.status:
+            if conditions := status.conditions:
+                if final_condition_types := list(
+                    c for c in conditions if c.type in 
JOB_FINAL_STATUS_CONDITION_TYPES and c.status
+                ):
+                    s = "s" if len(final_condition_types) > 1 else ""
+                    self.log.info(
+                        "The job '%s' state%s: %s",
+                        job.metadata.name,
+                        s,
+                        ", ".join(f"{c.type} at {c.last_transition_time}" for 
c in final_condition_types),
+                    )
+                    return True
         return False
 
     @staticmethod
@@ -588,9 +589,21 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
 
         :return: Error message if the job is failed, and False otherwise.
         """
-        conditions = job.status.conditions or []
-        if fail_condition := next((c for c in conditions if c.type == "Failed" 
and c.status), None):
-            return fail_condition.reason
+        if status := job.status:
+            conditions = status.conditions or []
+            if fail_condition := next((c for c in conditions if c.type == 
"Failed" and c.status), None):
+                return fail_condition.reason
+        return False
+
+    @staticmethod
+    def is_job_successful(job: V1Job) -> str | bool:
+        """Check whether the given job is completed successfully..
+
+        :return: Error message if the job is failed, and False otherwise.
+        """
+        if status := job.status:
+            conditions = status.conditions or []
+            return bool(next((c for c in conditions if c.type == "Complete" 
and c.status), None))
         return False
 
     def patch_namespaced_job(self, job_name: str, namespace: str, body: 
object) -> V1Job:
diff --git a/airflow/providers/cncf/kubernetes/operators/job.py 
b/airflow/providers/cncf/kubernetes/operators/job.py
index 41d260bc98..eb7c646146 100644
--- a/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/airflow/providers/cncf/kubernetes/operators/job.py
@@ -366,10 +366,17 @@ class KubernetesDeleteJobOperator(BaseOperator):
     :param in_cluster: run kubernetes client with in_cluster configuration.
     :param cluster_context: context that points to kubernetes cluster.
         Ignored when in_cluster is True. If None, current-context is used. 
(templated)
+    :param delete_on_status: Condition for performing delete operation 
depending on the job status. Values:
+        ``None`` - delete the job regardless of its status, "Complete" - 
delete only successfully completed
+        jobs, "Failed" - delete only failed jobs. (default: ``None``)
+    :param wait_for_completion: Whether to wait for the job to complete. 
(default: ``False``)
+    :param poll_interval: Interval in seconds between polling the job status. 
Used when the `delete_on_status`
+        parameter is set. (default: 10.0)
     """
 
     template_fields: Sequence[str] = (
         "config_file",
+        "name",
         "namespace",
         "cluster_context",
     )
@@ -383,6 +390,9 @@ class KubernetesDeleteJobOperator(BaseOperator):
         config_file: str | None = None,
         in_cluster: bool | None = None,
         cluster_context: str | None = None,
+        delete_on_status: str | None = None,
+        wait_for_completion: bool = False,
+        poll_interval: float = 10.0,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
@@ -392,6 +402,9 @@ class KubernetesDeleteJobOperator(BaseOperator):
         self.config_file = config_file
         self.in_cluster = in_cluster
         self.cluster_context = cluster_context
+        self.delete_on_status = delete_on_status
+        self.wait_for_completion = wait_for_completion
+        self.poll_interval = poll_interval
 
     @cached_property
     def hook(self) -> KubernetesHook:
@@ -408,9 +421,34 @@ class KubernetesDeleteJobOperator(BaseOperator):
 
     def execute(self, context: Context):
         try:
-            self.log.info("Deleting kubernetes Job: %s", self.name)
-            self.client.delete_namespaced_job(name=self.name, 
namespace=self.namespace)
-            self.log.info("Kubernetes job was deleted.")
+            if self.delete_on_status not in ("Complete", "Failed", None):
+                raise AirflowException(
+                    "The `delete_on_status` parameter must be one of 
'Complete', 'Failed' or None. "
+                    "The current value is %s",
+                    str(self.delete_on_status),
+                )
+
+            if self.wait_for_completion:
+                job = self.hook.wait_until_job_complete(
+                    job_name=self.name, namespace=self.namespace, 
job_poll_interval=self.poll_interval
+                )
+            else:
+                job = self.hook.get_job_status(job_name=self.name, 
namespace=self.namespace)
+
+            if (
+                self.delete_on_status is None
+                or (self.delete_on_status == "Complete" and 
self.hook.is_job_successful(job=job))
+                or (self.delete_on_status == "Failed" and 
self.hook.is_job_failed(job=job))
+            ):
+                self.log.info("Deleting kubernetes Job: %s", self.name)
+                self.client.delete_namespaced_job(name=self.name, 
namespace=self.namespace)
+                self.log.info("Kubernetes job was deleted.")
+            else:
+                self.log.info(
+                    "Deletion of the job %s was skipped due to settings of 
on_status=%s",
+                    self.name,
+                    self.delete_on_status,
+                )
         except ApiException as e:
             if e.status == 404:
                 self.log.info("The Kubernetes job %s does not exist.", 
self.name)
@@ -442,6 +480,7 @@ class KubernetesPatchJobOperator(BaseOperator):
 
     template_fields: Sequence[str] = (
         "config_file",
+        "name",
         "namespace",
         "body",
         "cluster_context",
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py 
b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index d23227bbc1..f02ccd1f3f 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -497,6 +497,52 @@ class TestKubernetesHook:
 
         assert actual_result == expected_result
 
+    @patch("kubernetes.config.kube_config.KubeConfigLoader")
+    @patch("kubernetes.config.kube_config.KubeConfigMerger")
+    def test_is_job_failed_no_status(self, mock_merger, mock_loader):
+        mock_job = mock.MagicMock()
+        mock_job.status = None
+
+        hook = KubernetesHook()
+        job_failed = hook.is_job_failed(mock_job)
+
+        assert not job_failed
+
+    @pytest.mark.parametrize(
+        "condition_type, status, expected_result",
+        [
+            ("Complete", False, False),
+            ("Complete", True, True),
+            ("Failed", False, False),
+            ("Failed", True, False),
+            ("Suspended", False, False),
+            ("Suspended", True, False),
+            ("Unknown", False, False),
+            ("Unknown", True, False),
+        ],
+    )
+    @patch("kubernetes.config.kube_config.KubeConfigLoader")
+    @patch("kubernetes.config.kube_config.KubeConfigMerger")
+    def test_is_job_successful(self, mock_merger, mock_loader, condition_type, 
status, expected_result):
+        mock_job = mock.MagicMock()
+        mock_job.status.conditions = [mock.MagicMock(type=condition_type, 
status=status)]
+
+        hook = KubernetesHook()
+        actual_result = hook.is_job_successful(mock_job)
+
+        assert actual_result == expected_result
+
+    @patch("kubernetes.config.kube_config.KubeConfigLoader")
+    @patch("kubernetes.config.kube_config.KubeConfigMerger")
+    def test_is_job_successful_no_status(self, mock_merger, mock_loader):
+        mock_job = mock.MagicMock()
+        mock_job.status = None
+
+        hook = KubernetesHook()
+        job_successful = hook.is_job_successful(mock_job)
+
+        assert not job_successful
+
     @pytest.mark.parametrize(
         "condition_type, status, expected_result",
         [
@@ -521,6 +567,17 @@ class TestKubernetesHook:
 
         assert actual_result == expected_result
 
+    @patch("kubernetes.config.kube_config.KubeConfigLoader")
+    @patch("kubernetes.config.kube_config.KubeConfigMerger")
+    def test_is_job_complete_no_status(self, mock_merger, mock_loader):
+        mock_job = mock.MagicMock()
+        mock_job.status = None
+
+        hook = KubernetesHook()
+        job_complete = hook.is_job_complete(mock_job)
+
+        assert not job_complete
+
     @patch("kubernetes.config.kube_config.KubeConfigLoader")
     @patch("kubernetes.config.kube_config.KubeConfigMerger")
     @patch(f"{HOOK_MODULE}.KubernetesHook.get_job_status")
diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py 
b/tests/providers/cncf/kubernetes/operators/test_job.py
index 803523429c..a21dbeebd6 100644
--- a/tests/providers/cncf/kubernetes/operators/test_job.py
+++ b/tests/providers/cncf/kubernetes/operators/test_job.py
@@ -16,7 +16,9 @@
 # under the License.
 from __future__ import annotations
 
+import random
 import re
+import string
 from unittest import mock
 from unittest.mock import patch
 
@@ -41,6 +43,7 @@ HOOK_CLASS = JOB_OPERATORS_PATH.format("KubernetesHook")
 POLL_INTERVAL = 100
 JOB_NAME = "test-job"
 JOB_NAMESPACE = "test-namespace"
+JOB_POLL_INTERVAL = 20.0
 KUBERNETES_CONN_ID = "test-conn_id"
 
 
@@ -694,19 +697,120 @@ class TestKubernetesDeleteJobOperator:
 
         patch.stopall()
 
+    @patch(f"{HOOK_CLASS}.get_job_status")
+    @patch(f"{HOOK_CLASS}.wait_until_job_complete")
     @patch("kubernetes.config.load_kube_config")
     @patch("kubernetes.client.api.BatchV1Api.delete_namespaced_job")
-    def test_delete_execute(self, mock_delete_namespaced_job, 
mock_load_kube_config):
+    def test_execute(
+        self,
+        mock_delete_namespaced_job,
+        mock_load_kube_config,
+        mock_wait_until_job_complete,
+        mock_get_job_status,
+    ):
         op = KubernetesDeleteJobOperator(
             kubernetes_conn_id="kubernetes_default",
             task_id="test_delete_job",
-            name="test_job_name",
-            namespace="test_job_namespace",
+            name=JOB_NAME,
+            namespace=JOB_NAMESPACE,
         )
 
         op.execute(None)
 
-        mock_delete_namespaced_job.assert_called()
+        assert not mock_wait_until_job_complete.called
+        mock_get_job_status.assert_called_once_with(job_name=JOB_NAME, 
namespace=JOB_NAMESPACE)
+        mock_delete_namespaced_job.assert_called_once_with(name=JOB_NAME, 
namespace=JOB_NAMESPACE)
+
+    @patch(f"{HOOK_CLASS}.get_job_status")
+    @patch(f"{HOOK_CLASS}.wait_until_job_complete")
+    @patch("kubernetes.config.load_kube_config")
+    @patch("kubernetes.client.api.BatchV1Api.delete_namespaced_job")
+    def test_execute_wait_for_completion_true(
+        self,
+        mock_delete_namespaced_job,
+        mock_load_kube_config,
+        mock_wait_until_job_complete,
+        mock_get_job_status,
+    ):
+        op = KubernetesDeleteJobOperator(
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_delete_job",
+            name=JOB_NAME,
+            namespace=JOB_NAMESPACE,
+            wait_for_completion=True,
+            poll_interval=JOB_POLL_INTERVAL,
+        )
+
+        op.execute({})
+
+        mock_wait_until_job_complete.assert_called_once_with(
+            job_name=JOB_NAME, namespace=JOB_NAMESPACE, 
job_poll_interval=JOB_POLL_INTERVAL
+        )
+        assert not mock_get_job_status.called
+        mock_delete_namespaced_job.assert_called_once_with(name=JOB_NAME, 
namespace=JOB_NAMESPACE)
+
+    @pytest.mark.parametrize(
+        "on_status, success, fail, deleted",
+        [
+            (None, True, True, True),
+            (None, True, False, True),
+            (None, False, True, True),
+            (None, False, False, True),
+            ("Complete", True, True, True),
+            ("Complete", True, False, True),
+            ("Complete", False, True, False),
+            ("Complete", False, False, False),
+            ("Failed", True, True, True),
+            ("Failed", True, False, False),
+            ("Failed", False, True, True),
+            ("Failed", False, False, False),
+        ],
+    )
+    @patch(f"{HOOK_CLASS}.is_job_failed")
+    @patch(f"{HOOK_CLASS}.is_job_successful")
+    @patch("kubernetes.config.load_kube_config")
+    @patch("kubernetes.client.api.BatchV1Api.delete_namespaced_job")
+    def test_execute_delete_on_status(
+        self,
+        mock_delete_namespaced_job,
+        mock_load_kube_config,
+        mock_is_job_successful,
+        mock_is_job_failed,
+        on_status,
+        success,
+        fail,
+        deleted,
+    ):
+        mock_is_job_successful.return_value = success
+        mock_is_job_failed.return_value = fail
+
+        op = KubernetesDeleteJobOperator(
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_delete_job",
+            name=JOB_NAME,
+            namespace=JOB_NAMESPACE,
+            delete_on_status=on_status,
+        )
+
+        op.execute({})
+
+        assert mock_delete_namespaced_job.called == deleted
+
+    def test_execute_delete_on_status_exception(self):
+        invalid_delete_on_status = "".join(
+            random.choices(string.ascii_letters + string.digits, 
k=random.randint(1, 16))
+        )
+
+        op = KubernetesDeleteJobOperator(
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_delete_job",
+            name=JOB_NAME,
+            namespace=JOB_NAMESPACE,
+            delete_on_status=invalid_delete_on_status,
+        )
+
+        with pytest.raises(AirflowException):
+            op.execute({})
 
 
 @pytest.mark.execution_timeout(300)
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py 
b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
index 0f17f57a15..bf01712067 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
@@ -57,7 +57,7 @@ with DAG(
     update_job = KubernetesPatchJobOperator(
         task_id="update-job-task",
         namespace="default",
-        name=JOB_NAME,
+        name=k8s_job.output["job_name"],
         body={"spec": {"suspend": False}},
     )
     # [END howto_operator_update_job]
@@ -77,14 +77,17 @@ with DAG(
     # [START howto_operator_delete_k8s_job]
     delete_job_task = KubernetesDeleteJobOperator(
         task_id="delete_job_task",
-        name=JOB_NAME,
+        name=k8s_job.output["job_name"],
         namespace=JOB_NAMESPACE,
+        wait_for_completion=True,
+        delete_on_status="Complete",
+        poll_interval=1.0,
     )
     # [END howto_operator_delete_k8s_job]
 
     delete_job_task_def = KubernetesDeleteJobOperator(
         task_id="delete_job_task_def",
-        name=JOB_NAME + "-def",
+        name=k8s_job_def.output["job_name"],
         namespace=JOB_NAMESPACE,
     )
 

Reply via email to