This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8dc1b23116 feat: K8S resource operator - CRD (#35600)
8dc1b23116 is described below
commit 8dc1b2311626146894b09e51f3693de376e5ad87
Author: raphaelauv <[email protected]>
AuthorDate: Wed Nov 22 11:23:46 2023 +0100
feat: K8S resource operator - CRD (#35600)
* feat: K8S resource operator - CRD
* clean
* tests
* remove sensor ( for another PR )
* clean
* test on k8s_resource_iterator
---
.../cncf/kubernetes/operators/resource.py | 62 +++++++++++++++++----
.../cncf/kubernetes/utils/k8s_resource_iterator.py | 46 +++++++++++++++
.../cncf/kubernetes/operators/test_resource.py | 63 +++++++++++++++++++++
.../kubernetes/utils/test_k8s_resource_iterator.py | 65 ++++++++++++++++++++++
4 files changed, 225 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/resource.py b/airflow/providers/cncf/kubernetes/operators/resource.py
index 598731b639..569b5861a6 100644
--- a/airflow/providers/cncf/kubernetes/operators/resource.py
+++ b/airflow/providers/cncf/kubernetes/operators/resource.py
@@ -27,9 +27,10 @@ from kubernetes.utils import create_from_yaml
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.utils.delete_from import
delete_from_yaml
+from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import
k8s_resource_iterator
if TYPE_CHECKING:
- from kubernetes.client import ApiClient
+ from kubernetes.client import ApiClient, CustomObjectsApi
__all__ = ["KubernetesCreateResourceOperator",
"KubernetesDeleteResourceOperator"]
@@ -56,17 +57,23 @@ class KubernetesResourceBaseOperator(BaseOperator):
yaml_conf: str,
namespace: str | None = None,
kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
+ custom_resource_definition: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
self._namespace = namespace
self.kubernetes_conn_id = kubernetes_conn_id
self.yaml_conf = yaml_conf
+ self.custom_resource_definition = custom_resource_definition
@cached_property
def client(self) -> ApiClient:
return self.hook.api_client
+ @cached_property
+ def custom_object_client(self) -> CustomObjectsApi:
+ return self.hook.custom_object_client
+
@cached_property
def hook(self) -> KubernetesHook:
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
@@ -78,24 +85,57 @@ class KubernetesResourceBaseOperator(BaseOperator):
else:
return self.hook.get_namespace() or "default"
+ def get_crd_fields(self, body: dict) -> tuple[str, str, str, str]:
+ api_version = body["apiVersion"]
+ group = api_version[0 : api_version.find("/")]
+ version = api_version[api_version.find("/") + 1 :]
+
+ namespace = None
+ if body.get("metadata"):
+ metadata: dict = body.get("metadata", None)
+ namespace = metadata.get("namespace", None)
+ if namespace is None:
+ namespace = self.get_namespace()
+
+ plural = body["kind"].lower() + "s"
+
+ return group, version, namespace, plural
+
class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
"""Create a resource in a kubernetes."""
+ def create_custom_from_yaml_object(self, body: dict):
+ group, version, namespace, plural = self.get_crd_fields(body)
+ self.custom_object_client.create_namespaced_custom_object(group,
version, namespace, plural, body)
+
def execute(self, context) -> None:
- create_from_yaml(
- k8s_client=self.client,
- yaml_objects=yaml.safe_load_all(self.yaml_conf),
- namespace=self.get_namespace(),
- )
+ resources = yaml.safe_load_all(self.yaml_conf)
+ if not self.custom_resource_definition:
+ create_from_yaml(
+ k8s_client=self.client,
+ yaml_objects=resources,
+ namespace=self.get_namespace(),
+ )
+ else:
+ k8s_resource_iterator(self.create_custom_from_yaml_object,
resources)
class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
"""Delete a resource in a kubernetes."""
+ def delete_custom_from_yaml_object(self, body: dict):
+ name = body["metadata"]["name"]
+ group, version, namespace, plural = self.get_crd_fields(body)
+ self.custom_object_client.delete_namespaced_custom_object(group,
version, namespace, plural, name)
+
def execute(self, context) -> None:
- delete_from_yaml(
- k8s_client=self.client,
- yaml_objects=yaml.safe_load_all(self.yaml_conf),
- namespace=self.get_namespace(),
- )
+ resources = yaml.safe_load_all(self.yaml_conf)
+ if not self.custom_resource_definition:
+ delete_from_yaml(
+ k8s_client=self.client,
+ yaml_objects=resources,
+ namespace=self.get_namespace(),
+ )
+ else:
+ k8s_resource_iterator(self.delete_custom_from_yaml_object,
resources)
diff --git a/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py b/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py
new file mode 100644
index 0000000000..bfa1d05272
--- /dev/null
+++ b/airflow/providers/cncf/kubernetes/utils/k8s_resource_iterator.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Callable, Iterator
+
+from kubernetes.utils import FailToCreateError
+
+from airflow.providers.cncf.kubernetes.utils.delete_from import
FailToDeleteError
+
+
+def k8s_resource_iterator(callback: Callable[[dict], None], resources:
Iterator) -> None:
+ failures: list = []
+ for data in resources:
+ if data is not None:
+ if "List" in data["kind"]:
+ kind = data["kind"].replace("List", "")
+ for yml_doc in data["items"]:
+ if kind != "":
+ yml_doc["apiVersion"] = data["apiVersion"]
+ yml_doc["kind"] = kind
+ try:
+ callback(yml_doc)
+ except (FailToCreateError, FailToDeleteError) as failure:
+ failures.extend(failure.api_exceptions)
+ else:
+ try:
+ callback(data)
+ except (FailToCreateError, FailToDeleteError) as failure:
+ failures.extend(failure.api_exceptions)
+ if failures:
+ raise FailToCreateError(failures)
diff --git a/tests/providers/cncf/kubernetes/operators/test_resource.py b/tests/providers/cncf/kubernetes/operators/test_resource.py
index a565e84fc1..9673c6e082 100644
--- a/tests/providers/cncf/kubernetes/operators/test_resource.py
+++ b/tests/providers/cncf/kubernetes/operators/test_resource.py
@@ -56,6 +56,16 @@ items:
name: test_pvc_2
"""
+TEST_VALID_CRD_YAML = """
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+ name: rayjob-sample
+spec:
+ entrypoint: python /home/ray/program/job.py
+ shutdownAfterJobFinishes: true
+"""
+
HOOK_CLASS =
"airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook"
@@ -89,6 +99,19 @@ class TestKubernetesXResourceOperator:
body=yaml.safe_load(TEST_VALID_RESOURCE_YAML), namespace="default"
)
+ @patch("kubernetes.client.api.CoreV1Api.create_namespaced_persistent_volume_claim")
+ def test_create_application_from_yaml_list(self,
mock_create_namespaced_persistent_volume_claim, context):
+ op = KubernetesCreateResourceOperator(
+ yaml_conf=TEST_VALID_LIST_RESOURCE_YAML,
+ dag=self.dag,
+ kubernetes_conn_id="kubernetes_default",
+ task_id="test_task_id",
+ )
+
+ op.execute(context)
+
+ assert mock_create_namespaced_persistent_volume_claim.call_count == 2
+
@patch("kubernetes.client.api.CoreV1Api.delete_namespaced_persistent_volume_claim")
def test_single_delete_application_from_yaml(
self, mock_delete_namespaced_persistent_volume_claim, context
@@ -118,3 +141,43 @@ class TestKubernetesXResourceOperator:
op.execute(context)
mock_delete_namespaced_persistent_volume_claim.assert_called()
+
+ @patch("kubernetes.client.api.CustomObjectsApi.create_namespaced_custom_object")
+ def test_create_custom_application_from_yaml(self,
mock_create_namespaced_custom_object, context):
+ op = KubernetesCreateResourceOperator(
+ yaml_conf=TEST_VALID_CRD_YAML,
+ dag=self.dag,
+ kubernetes_conn_id="kubernetes_default",
+ task_id="test_task_id",
+ custom_resource_definition=True,
+ )
+
+ op.execute(context)
+
+ mock_create_namespaced_custom_object.assert_called_once_with(
+ "ray.io",
+ "v1",
+ "default",
+ "rayjobs",
+ yaml.safe_load(TEST_VALID_CRD_YAML),
+ )
+
+ @patch("kubernetes.client.api.CustomObjectsApi.delete_namespaced_custom_object")
+ def test_delete_custom_application_from_yaml(self,
mock_delete_namespaced_custom_object, context):
+ op = KubernetesDeleteResourceOperator(
+ yaml_conf=TEST_VALID_CRD_YAML,
+ dag=self.dag,
+ kubernetes_conn_id="kubernetes_default",
+ task_id="test_task_id",
+ custom_resource_definition=True,
+ )
+
+ op.execute(context)
+
+ mock_delete_namespaced_custom_object.assert_called_once_with(
+ "ray.io",
+ "v1",
+ "default",
+ "rayjobs",
+ "rayjob-sample",
+ )
diff --git a/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py b/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py
new file mode 100644
index 0000000000..8a2e4f0539
--- /dev/null
+++ b/tests/providers/cncf/kubernetes/utils/test_k8s_resource_iterator.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from collections import namedtuple
+
+import pytest
+import yaml
+from kubernetes.utils import FailToCreateError
+
+from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import
k8s_resource_iterator
+
+TEST_VALID_LIST_RESOURCE_YAML = """
+apiVersion: v1
+kind: List
+items:
+- apiVersion: v1
+ kind: PersistentVolumeClaim
+ metadata:
+ name: test_pvc_1
+- apiVersion: v1
+ kind: PersistentVolumeClaim
+ metadata:
+ name: test_pvc_2
+"""
+
+
+def test_k8s_resource_iterator():
+ exception_k8s = namedtuple("Exception_k8s", "reason body")
+
+ def test_callback_failing(yml_doc: dict) -> None:
+ raise FailToCreateError(exception_k8s("the_reason", "the_body "))
+
+ with pytest.raises(FailToCreateError) as exc_info:
+ k8s_resource_iterator(
+ test_callback_failing,
resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML)
+ )
+
+ assert (
+ str(exc_info.value)
+ == "Error from server (the_reason): the_body Error from server
(the_reason): the_body "
+ )
+
+ def callback_success(yml_doc: dict) -> None:
+ return
+
+ try:
+ k8s_resource_iterator(callback_success,
resources=yaml.safe_load_all(TEST_VALID_LIST_RESOURCE_YAML))
+
+ except FailToCreateError:
+ assert False