This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a9b71a129 Create KubernetesPatchJobOperator operator (#38146)
1a9b71a129 is described below
commit 1a9b71a1298da76fc254f670e1032fa12131901a
Author: Maksim <[email protected]>
AuthorDate: Thu Mar 21 11:05:46 2024 +0100
Create KubernetesPatchJobOperator operator (#38146)
---
.../providers/cncf/kubernetes/hooks/kubernetes.py | 14 +++++
airflow/providers/cncf/kubernetes/operators/job.py | 69 ++++++++++++++++++++++
.../operators.rst | 15 +++++
.../cncf/kubernetes/operators/test_job.py | 33 ++++++++++-
.../cncf/kubernetes/example_kubernetes_job.py | 17 +++++-
5 files changed, 145 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 349f886071..ffa484a9db 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -590,6 +590,20 @@ class KubernetesHook(BaseHook, PodOperatorHookProtocol):
conditions = job.status.conditions or []
return bool(next((c for c in conditions if c.type == "Failed" and
c.status), None))
+ def patch_namespaced_job(self, job_name: str, namespace: str, body:
object) -> V1Job:
+ """
+ Update the specified Job.
+
+ :param job_name: name of the Job
+ :param namespace: the namespace to run within kubernetes
+ :param body: json object with parameters for update
+ """
+ return self.batch_v1_client.patch_namespaced_job(
+ name=job_name,
+ namespace=namespace,
+ body=body,
+ )
+
def _get_bool(val) -> bool | None:
"""Convert val to bool if can be done with certainty; if we cannot infer
intention we return None."""
diff --git a/airflow/providers/cncf/kubernetes/operators/job.py
b/airflow/providers/cncf/kubernetes/operators/job.py
index a3f36c7166..d487a789e2 100644
--- a/airflow/providers/cncf/kubernetes/operators/job.py
+++ b/airflow/providers/cncf/kubernetes/operators/job.py
@@ -38,6 +38,7 @@ from
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
from airflow.providers.cncf.kubernetes.operators.pod import
KubernetesPodOperator
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator,
merge_objects
from airflow.utils import yaml
+from airflow.utils.context import Context
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -377,3 +378,71 @@ class KubernetesDeleteJobOperator(BaseOperator):
self.log.info("The Kubernetes job %s does not exist.",
self.name)
else:
raise e
+
+
+class KubernetesPatchJobOperator(BaseOperator):
+ """
+ Update a Kubernetes Job.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:KubernetesPatchJobOperator`
+
+ :param name: name of the Job
+ :param namespace: the namespace to run within kubernetes
+ :param body: Job json object with parameters for update
+
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#job-v1-batch
+ e.g. ``{"spec": {"suspend": True}}``
+ :param kubernetes_conn_id: The :ref:`kubernetes connection id
<howto/connection:kubernetes>`
+ for the Kubernetes cluster.
+ :param config_file: The path to the Kubernetes config file. (templated)
+ If not specified, default value is ``~/.kube/config``
+ :param in_cluster: run kubernetes client with in_cluster configuration.
+ :param cluster_context: context that points to kubernetes cluster.
+ Ignored when in_cluster is True. If None, current-context is used.
(templated)
+ """
+
+ template_fields: Sequence[str] = (
+ "config_file",
+ "namespace",
+ "body",
+ "cluster_context",
+ )
+
+ def __init__(
+ self,
+ *,
+ name: str,
+ namespace: str,
+ body: object,
+ kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
+ config_file: str | None = None,
+ in_cluster: bool | None = None,
+ cluster_context: str | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.name = name
+ self.namespace = namespace
+ self.body = body
+ self.kubernetes_conn_id = kubernetes_conn_id
+ self.config_file = config_file
+ self.in_cluster = in_cluster
+ self.cluster_context = cluster_context
+
+ @cached_property
+ def hook(self) -> KubernetesHook:
+ return KubernetesHook(
+ conn_id=self.kubernetes_conn_id,
+ in_cluster=self.in_cluster,
+ config_file=self.config_file,
+ cluster_context=self.cluster_context,
+ )
+
+ def execute(self, context: Context) -> dict:
+ self.log.info("Updating existing Job: %s", self.name)
+ job_object = self.hook.patch_namespaced_job(
+ job_name=self.name, namespace=self.namespace, body=self.body
+ )
+ self.log.info("Job was updated.")
+ return k8s.V1Job.to_dict(job_object)
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 4307aad95d..3fc7008549 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -643,3 +643,18 @@ you to delete Jobs on a Kubernetes cluster.
:dedent: 4
:start-after: [START howto_operator_delete_k8s_job]
:end-before: [END howto_operator_delete_k8s_job]
+
+
+.. _howto/operator:KubernetesPatchJobOperator:
+
+KubernetesPatchJobOperator
+==========================
+
+The
:class:`~airflow.providers.cncf.kubernetes.operators.job.KubernetesPatchJobOperator`
allows
+you to update Jobs on a Kubernetes cluster.
+
+.. exampleinclude::
/../../tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_update_job]
+ :end-before: [END howto_operator_update_job]
diff --git a/tests/providers/cncf/kubernetes/operators/test_job.py
b/tests/providers/cncf/kubernetes/operators/test_job.py
index cd9f3835af..a5742abbbb 100644
--- a/tests/providers/cncf/kubernetes/operators/test_job.py
+++ b/tests/providers/cncf/kubernetes/operators/test_job.py
@@ -26,7 +26,11 @@ from kubernetes.client import ApiClient, models as k8s
from airflow.exceptions import AirflowException
from airflow.models import DAG, DagModel, DagRun, TaskInstance
-from airflow.providers.cncf.kubernetes.operators.job import
KubernetesDeleteJobOperator, KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.operators.job import (
+ KubernetesDeleteJobOperator,
+ KubernetesJobOperator,
+ KubernetesPatchJobOperator,
+)
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType
@@ -545,3 +549,30 @@ class TestKubernetesDeleteJobOperator:
op.execute(None)
mock_delete_namespaced_job.assert_called()
+
+
[email protected]_timeout(300)
+class TestKubernetesPatchJobOperator:
+ @pytest.fixture(autouse=True)
+ def setup_tests(self):
+ self._default_client_patch = patch(f"{HOOK_CLASS}._get_default_client")
+ self._default_client_mock = self._default_client_patch.start()
+
+ yield
+
+ patch.stopall()
+
+ @patch("kubernetes.config.load_kube_config")
+ @patch("kubernetes.client.api.BatchV1Api.patch_namespaced_job")
+ def test_update_execute(self, mock_patch_namespaced_job,
mock_load_kube_config):
+ op = KubernetesPatchJobOperator(
+ kubernetes_conn_id="kubernetes_default",
+ task_id="test_update_job",
+ name="test_job_name",
+ namespace="test_job_namespace",
+ body={"spec": {"suspend": False}},
+ )
+
+ op.execute(None)
+
+ mock_patch_namespaced_job.assert_called()
diff --git a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
index ab8fcb393d..a5c97d9c95 100644
--- a/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
+++ b/tests/system/providers/cncf/kubernetes/example_kubernetes_job.py
@@ -25,7 +25,11 @@ import os
from datetime import datetime
from airflow import DAG
-from airflow.providers.cncf.kubernetes.operators.job import
KubernetesDeleteJobOperator, KubernetesJobOperator
+from airflow.providers.cncf.kubernetes.operators.job import (
+ KubernetesDeleteJobOperator,
+ KubernetesJobOperator,
+ KubernetesPatchJobOperator,
+)
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_kubernetes_job_operator"
@@ -49,6 +53,15 @@ with DAG(
)
# [END howto_operator_k8s_job]
+ # [START howto_operator_update_job]
+ update_job = KubernetesPatchJobOperator(
+ task_id="update-job-task",
+ namespace="default",
+ name="test-pi",
+ body={"spec": {"suspend": False}},
+ )
+ # [END howto_operator_update_job]
+
# [START howto_operator_delete_k8s_job]
delete_job_task = KubernetesDeleteJobOperator(
task_id="delete_job_task",
@@ -57,7 +70,7 @@ with DAG(
)
# [END howto_operator_delete_k8s_job]
- k8s_job >> delete_job_task
+ k8s_job >> update_job >> delete_job_task
from tests.system.utils.watcher import watcher