This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a9b71a129 Create KubernetesPatchJobOperator operator (#38146)
1a9b71a129 is described below

commit 1a9b71a1298da76fc254f670e1032fa12131901a
Author: Maksim <[email protected]>
AuthorDate: Thu Mar 21 11:05:46 2024 +0100

    Create KubernetesPatchJobOperator operator (#38146)
---
 .../providers/cncf/kubernetes/hooks/kubernetes.py  | 14 +++++
 airflow/providers/cncf/kubernetes/operators/job.py | 69 ++++++++++++++++++++++
 .../operators.rst                                  | 15 +++++
 .../cncf/kubernetes/operators/test_job.py          | 33 ++++++++++-
 .../cncf/kubernetes/example_kubernetes_job.py      | 17 +++++-
 5 files changed, 145 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py 
b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 349f886071..ffa484a9db 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -590,6 +590,20 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
         conditions = job.status.conditions or []
         return bool(next((c for c in conditions if c.type == "Failed" and 
c.status), None))
 
+    def patch_namespaced_job(self, job_name: str, namespace: str, body: 
object) -> V1Job:
+        """
+        Update the specified Job.
+
+        :param job_name: name of the Job
+        :param namespace: the namespace to run within kubernetes
+        :param body: json object with parameters for update
+        """
+        return self.batch_v1_client.patch_namespaced_job(
+            name=job_name,
+            namespace=namespace,
+            body=body,
+        )
+
 
 def _get_bool(val) -> bool | None:
     """Convert val to bool if can be done with certainty; if we cannot infer 
intention we return None."""
diff --git a/airflow/providers/cncf/kubernetes/operators/job.py 
b/airflow/providers/cncf/kubernetes/operators/job.py
index a3f36c7166..d487a789e2 100644
--- a/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/airflow/providers/cncf/kubernetes/operators/job.py
@@ -38,6 +38,7 @@ from 
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
 from airflow.providers.cncf.kubernetes.operators.pod import 
KubernetesPodOperator
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, 
merge_objects
 from airflow.utils import yaml
+from airflow.utils.context import Context
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -377,3 +378,71 @@ class KubernetesDeleteJobOperator(BaseOperator):
                 self.log.info("The Kubernetes job %s does not exist.", 
self.name)
             else:
                 raise e
+
+
+class KubernetesPatchJobOperator(BaseOperator):
+    """
+    Update a Kubernetes Job.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:KubernetesPatchJobOperator`
+
+    :param name: name of the Job
+    :param namespace: the namespace to run within kubernetes
+    :param body: Job json object with parameters for update
+        
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#job-v1-batch
+        e.g. ``{"spec": {"suspend": True}}``
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id 
<howto/connection:kubernetes>`
+        for the Kubernetes cluster.
+    :param config_file: The path to the Kubernetes config file. (templated)
+        If not specified, default value is ``~/.kube/config``
+    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param cluster_context: context that points to kubernetes cluster.
+        Ignored when in_cluster is True. If None, current-context is used. 
(templated)
+    """
+
+    template_fields: Sequence[str] = (
+        "config_file",
+        "namespace",
+        "body",
+        "cluster_context",
+    )
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        namespace: str,
+        body: object,
+        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
+        config_file: str | None = None,
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.name = name
+        self.namespace = namespace
+        self.body = body
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.config_file = config_file
+        self.in_cluster = in_cluster
+        self.cluster_context = cluster_context
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        return KubernetesHook(
+            conn_id=self.kubernetes_conn_id,
+            in_cluster=self.in_cluster,
+            config_file=self.config_file,
+            cluster_context=self.cluster_context,
+        )
+
+    def execute(self, context: Context) -> dict:
+        self.log.info("Updating existing Job: %s", self.name)
+        job_object = self.hook.patch_namespaced_job(
+            job_name=self.name, namespace=self.namespace, body=self.body
+        )
+        self.log.info("Job was updated.")
+        return k8s.V1Job.to_dict(job_object)
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst 
b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 4307aad95d..3fc7008549 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -643,3 +643,18 @@ you to delete Jobs on a Kubernetes cluster.
     :dedent: 4
     :start-after: [START howto_operator_delete_k8s_job]
     :end-before: [END howto_operator_delete_k8s_job]
+
+
+.. _howto/operator:KubernetesPatchJobOperator:
+
+KubernetesPatchJobOperator
+==========================
+
+The 
:class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesPatchJobOperator`
 allows
+you to update Jobs on a Kubernetes cluster.
+
+.. exampleinclude:: 
/../../tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_update_job]
+    :end-before: [END howto_operator_update_job]
diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py 
b/tests/providers/cncf/kubernetes/operators/test_job.py
index cd9f3835af..a5742abbbb 100644
--- a/tests/providers/cncf/kubernetes/operators/test_job.py
+++ b/tests/providers/cncf/kubernetes/operators/test_job.py
@@ -26,7 +26,11 @@ from kubernetes.client import ApiClient, models as k8s
 
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagModel, DagRun, TaskInstance
-from airflow.providers.cncf.kubernetes.operators.job import 
KubernetesDeleteJobOperator, KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.operators.job import (
+    KubernetesDeleteJobOperator,
+    KubernetesJobOperator,
+    KubernetesPatchJobOperator,
+)
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
@@ -545,3 +549,30 @@ class TestKubernetesDeleteJobOperator:
         op.execute(None)
 
         mock_delete_namespaced_job.assert_called()
+
+
[email protected]_timeout(300)
+class TestKubernetesPatchJobOperator:
+    @pytest.fixture(autouse=True)
+    def setup_tests(self):
+        self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
+        self._default_client_mock = self._default_client_patch.start()
+
+        yield
+
+        patch.stopall()
+
+    @patch("kubernetes.config.load_kube_config")
+    @patch("kubernetes.client.api.BatchV1Api.patch_namespaced_job")
+    def test_update_execute(self, mock_patch_namespaced_job, 
mock_load_kube_config):
+        op = KubernetesPatchJobOperator(
+            kubernetes_conn_id="kubernetes_default",
+            task_id="test_update_job",
+            name="test_job_name",
+            namespace="test_job_namespace",
+            body={"spec": {"suspend": False}},
+        )
+
+        op.execute(None)
+
+        mock_patch_namespaced_job.assert_called()
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py 
b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
index ab8fcb393d..a5c97d9c95 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
@@ -25,7 +25,11 @@ import os
 from datetime import datetime
 
 from airflow import DAG
-from airflow.providers.cncf.kubernetes.operators.job import 
KubernetesDeleteJobOperator, KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.operators.job import (
+    KubernetesDeleteJobOperator,
+    KubernetesJobOperator,
+    KubernetesPatchJobOperator,
+)
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "example_kubernetes_job_operator"
@@ -49,6 +53,15 @@ with DAG(
     )
     # [END howto_operator_k8s_job]
 
+    # [START howto_operator_update_job]
+    update_job = KubernetesPatchJobOperator(
+        task_id="update-job-task",
+        namespace="default",
+        name="test-pi",
+        body={"spec": {"suspend": False}},
+    )
+    # [END howto_operator_update_job]
+
     # [START howto_operator_delete_k8s_job]
     delete_job_task = KubernetesDeleteJobOperator(
         task_id="delete_job_task",
@@ -57,7 +70,7 @@ with DAG(
     )
     # [END howto_operator_delete_k8s_job]
 
-    k8s_job >> delete_job_task
+    k8s_job >> update_job >> delete_job_task
 
     from tests.system.utils.watcher import watcher
 

Reply via email to