jedcunningham commented on code in PR #29930: URL: https://github.com/apache/airflow/pull/29930#discussion_r1163261287
########## airflow/providers/cncf/kubernetes/operators/resource.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Manage a Kubernetes Resource""" + +from __future__ import annotations + +from kubernetes.client import ApiClient +from kubernetes.utils import create_from_yaml + +from airflow.compat.functools import cached_property +from airflow.models import BaseOperator +from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook +from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict +from airflow.utils import yaml + + +class KubernetesResourceBaseOperator(BaseOperator): + """Abstract base class for all Kubernetes Resource operators.""" + + template_fields = ("yaml_conf",) + template_fields_renderers = {"yaml_conf": "yaml"} + + def __init__( + self, + *, + yaml_conf: str, + namespace: str | None = None, + kubernetes_conn_id: str | None = "kubernetes_default", + in_cluster: bool | None = None, + cluster_context: str | None = None, + config_file: str | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self._namespace = namespace + self.kubernetes_conn_id = kubernetes_conn_id + self.in_cluster = in_cluster + self.cluster_context = 
cluster_context + self.config_file = config_file + self.yaml_conf = yaml_conf + + @cached_property + def client(self) -> ApiClient: + return self.hook.api_client + + @cached_property + def hook(self) -> KubernetesHook: + hook = KubernetesHook( + conn_id=self.kubernetes_conn_id, + in_cluster=self.in_cluster, + config_file=self.config_file, + cluster_context=self.cluster_context, + ) + return hook + + def get_namespace(self) -> str: + if self._namespace: + return self._namespace + else: + tmp = self.hook.get_namespace() + if tmp: + return tmp + else: + return "default" Review Comment: ```suggestion return self.hook.get_namespace() or "default" ``` Simpler. ########## airflow/providers/cncf/kubernetes/operators/resource.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Manage a Kubernetes Resource""" + +from __future__ import annotations + +from kubernetes.client import ApiClient +from kubernetes.utils import create_from_yaml + +from airflow.compat.functools import cached_property +from airflow.models import BaseOperator +from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook +from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict +from airflow.utils import yaml + + +class KubernetesResourceBaseOperator(BaseOperator): + """Abstract base class for all Kubernetes Resource operators.""" + + template_fields = ("yaml_conf",) + template_fields_renderers = {"yaml_conf": "yaml"} + + def __init__( + self, + *, + yaml_conf: str, + namespace: str | None = None, + kubernetes_conn_id: str | None = "kubernetes_default", + in_cluster: bool | None = None, + cluster_context: str | None = None, + config_file: str | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self._namespace = namespace + self.kubernetes_conn_id = kubernetes_conn_id + self.in_cluster = in_cluster + self.cluster_context = cluster_context + self.config_file = config_file + self.yaml_conf = yaml_conf + + @cached_property + def client(self) -> ApiClient: + return self.hook.api_client + + @cached_property + def hook(self) -> KubernetesHook: + hook = KubernetesHook( + conn_id=self.kubernetes_conn_id, + in_cluster=self.in_cluster, + config_file=self.config_file, + cluster_context=self.cluster_context, + ) + return hook + + def get_namespace(self) -> str: + if self._namespace: + return self._namespace + else: + tmp = self.hook.get_namespace() + if tmp: + return tmp + else: + return "default" + + +class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator): + """Create a resource in a kubernetes.""" + + def execute(self, context) -> None: + print(self.yaml_conf) + create_from_yaml( + k8s_client=self.client, + yaml_objects=[yaml.safe_load(self.yaml_conf)], + namespace=self.get_namespace(), + ) + + +class 
KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator): + """Delete a resource in a kubernetes.""" + + def execute(self, context) -> None: + print(self.yaml_conf) Review Comment: ```suggestion ``` ########## airflow/providers/cncf/kubernetes/operators/resource.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Manage a Kubernetes Resource""" + +from __future__ import annotations + +from kubernetes.client import ApiClient +from kubernetes.utils import create_from_yaml + +from airflow.compat.functools import cached_property +from airflow.models import BaseOperator +from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook +from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict +from airflow.utils import yaml + + +class KubernetesResourceBaseOperator(BaseOperator): + """Abstract base class for all Kubernetes Resource operators.""" + + template_fields = ("yaml_conf",) + template_fields_renderers = {"yaml_conf": "yaml"} + + def __init__( + self, + *, + yaml_conf: str, + namespace: str | None = None, + kubernetes_conn_id: str | None = "kubernetes_default", + in_cluster: bool | None = None, + cluster_context: str | None = None, + config_file: str | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self._namespace = namespace + self.kubernetes_conn_id = kubernetes_conn_id + self.in_cluster = in_cluster + self.cluster_context = cluster_context + self.config_file = config_file + self.yaml_conf = yaml_conf + + @cached_property + def client(self) -> ApiClient: + return self.hook.api_client + + @cached_property + def hook(self) -> KubernetesHook: + hook = KubernetesHook( + conn_id=self.kubernetes_conn_id, + in_cluster=self.in_cluster, + config_file=self.config_file, + cluster_context=self.cluster_context, + ) + return hook + + def get_namespace(self) -> str: + if self._namespace: + return self._namespace + else: + tmp = self.hook.get_namespace() + if tmp: + return tmp + else: + return "default" + + +class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator): + """Create a resource in a kubernetes.""" + + def execute(self, context) -> None: + print(self.yaml_conf) Review Comment: ```suggestion ``` Left over debugging? 
########## tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py: ########## @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator. +In this example, we create two tasks which execute sequentially. +The first task is to create a PVC on Kubernetes cluster. +and the second task is to delete the PVC. +""" +from __future__ import annotations + +import os +from datetime import datetime, timedelta + +# [START import_module] +# The DAG object; we'll need this to instantiate a DAG +from airflow import DAG + +# Operators; we need this to operate! +from airflow.providers.cncf.kubernetes.operators.resource import ( + KubernetesCreateResourceOperator, + KubernetesDeleteResourceOperator, +) + +# [END import_module] + + +# [START instantiate_dag] + +pvc_name = "toto" + +pvc_conf = f""" +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {pvc_name} +spec: + accessModes: + - ReadWriteOnce + storageClassName: standard-rwo + resources: + requests: + storage: 500Gi Review Comment: I feel like we should use something way smaller? 
########## tests/system/providers/cncf/kubernetes/example_kubernetes_resource.py: ########## @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +This is an example DAG which uses KubernetesCreateResourceOperator and KubernetesDeleteResourceOperator. +In this example, we create two tasks which execute sequentially. +The first task is to create a PVC on Kubernetes cluster. +and the second task is to delete the PVC. +""" +from __future__ import annotations + +import os +from datetime import datetime, timedelta + +# [START import_module] +# The DAG object; we'll need this to instantiate a DAG +from airflow import DAG + +# Operators; we need this to operate! +from airflow.providers.cncf.kubernetes.operators.resource import ( + KubernetesCreateResourceOperator, + KubernetesDeleteResourceOperator, +) + +# [END import_module] + + +# [START instantiate_dag] + +pvc_name = "toto" + +pvc_conf = f""" +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {pvc_name} +spec: + accessModes: + - ReadWriteOnce + storageClassName: standard-rwo Review Comment: Might be more portable to just use the default storage class? 
########## airflow/providers/cncf/kubernetes/utils/delete_from.py: ########## @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files + +from __future__ import annotations + +import re + +from kubernetes import client + +DEFAULT_DELETION_BODY = client.V1DeleteOptions( + propagation_policy="Background", + grace_period_seconds=5, +) + + +def delete_from_dict( Review Comment: ```suggestion def delete_from_dict( *, ``` Let's do kwargs only from the start. Can you add some typing here too? ########## airflow/providers/cncf/kubernetes/utils/delete_from.py: ########## @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files + +from __future__ import annotations + +import re + +from kubernetes import client + +DEFAULT_DELETION_BODY = client.V1DeleteOptions( + propagation_policy="Background", + grace_period_seconds=5, +) + + +def delete_from_dict( + k8s_client, + yml_document, + verbose=False, + namespace="default", + body=None, + **kwargs, +): + + if body is None: + body = DEFAULT_DELETION_BODY + + api_exceptions = [] + if "List" in yml_document["kind"]: + kind = yml_document["kind"].replace("List", "") + for yml_doc in yml_document["items"]: + if kind != "": + yml_doc["apiVersion"] = yml_document["apiVersion"] + yml_doc["kind"] = kind + try: + _delete_from_yaml_single_item( + k8s_client, + yml_doc, + verbose, + namespace=namespace, + body=body, + **kwargs, + ) + except client.rest.ApiException as api_exception: + api_exceptions.append(api_exception) + else: + + try: + _delete_from_yaml_single_item( + k8s_client, + yml_document, + verbose, + namespace=namespace, + body=body, + **kwargs, + ) + except client.rest.ApiException as api_exception: + api_exceptions.append(api_exception) + + if api_exceptions: + raise FailToDeleteError(api_exceptions) + + +def _delete_from_yaml_single_item( + k8s_client, + yml_document, + verbose=False, + namespace="default", + body=None, + **kwargs, +): + if body is None: + body = DEFAULT_DELETION_BODY + + # get group and version from apiVersion + group, _, version = yml_document["apiVersion"].partition("/") + if version == "": + version = group + group = "core" + # Take care for 
the case e.g. api_type is "apiextensions.k8s.io" + # Only replace the last instance + group = "".join(group.rsplit(".k8s.io", 1)) + # convert group name from DNS subdomain format to + # python class name convention + group = "".join(word.capitalize() for word in group.split(".")) + fcn_to_call = f"{group}{version.capitalize()}Api" + k8s_api = getattr(client, fcn_to_call)(k8s_client) + # Replace CamelCased action_type into snake_case + kind = yml_document["kind"] + kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind) + kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower() + + # Decide which namespace we are going to use for deleting the object + # IMPORTANT: Its ignore namespace in args: Review Comment: ```suggestion # IMPORTANT: the docs namespace takes precedence over the namespace in args ``` ########## airflow/providers/cncf/kubernetes/utils/delete_from.py: ########## Review Comment: I feel like we are lacking some test coverage on this path. ########## airflow/providers/cncf/kubernetes/utils/delete_from.py: ########## @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files + +from __future__ import annotations + +import re + +from kubernetes import client + +DEFAULT_DELETION_BODY = client.V1DeleteOptions( + propagation_policy="Background", + grace_period_seconds=5, +) + + +def delete_from_dict( + k8s_client, + yml_document, + verbose=False, + namespace="default", + body=None, + **kwargs, +): + + if body is None: + body = DEFAULT_DELETION_BODY + + api_exceptions = [] + if "List" in yml_document["kind"]: + kind = yml_document["kind"].replace("List", "") + for yml_doc in yml_document["items"]: + if kind != "": + yml_doc["apiVersion"] = yml_document["apiVersion"] + yml_doc["kind"] = kind + try: + _delete_from_yaml_single_item( + k8s_client, + yml_doc, + verbose, + namespace=namespace, + body=body, + **kwargs, + ) + except client.rest.ApiException as api_exception: + api_exceptions.append(api_exception) + else: + + try: + _delete_from_yaml_single_item( + k8s_client, + yml_document, + verbose, + namespace=namespace, + body=body, + **kwargs, + ) + except client.rest.ApiException as api_exception: + api_exceptions.append(api_exception) + + if api_exceptions: + raise FailToDeleteError(api_exceptions) + + +def _delete_from_yaml_single_item( + k8s_client, + yml_document, + verbose=False, + namespace="default", + body=None, + **kwargs, +): + if body is None: + body = DEFAULT_DELETION_BODY + + # get group and version from apiVersion + group, _, version = yml_document["apiVersion"].partition("/") + if version == "": + version = group + group = "core" + # Take care for the case e.g. 
api_type is "apiextensions.k8s.io" + # Only replace the last instance + group = "".join(group.rsplit(".k8s.io", 1)) + # convert group name from DNS subdomain format to + # python class name convention + group = "".join(word.capitalize() for word in group.split(".")) + fcn_to_call = f"{group}{version.capitalize()}Api" + k8s_api = getattr(client, fcn_to_call)(k8s_client) + # Replace CamelCased action_type into snake_case + kind = yml_document["kind"] + kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind) + kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower() + + # Decide which namespace we are going to use for deleting the object + # IMPORTANT: Its ignore namespace in args: + # create_from_yaml_single_item have same behaviour + if "namespace" in yml_document["metadata"]: + namespace = yml_document["metadata"]["namespace"] + name = yml_document["metadata"]["name"] + + # Expect the user to delete namespaced objects more often + if hasattr(k8s_api, f"delete_namespaced_{kind}"): + resp: client.V1Status = getattr(k8s_api, f"delete_namespaced_{kind}")( + name=name, namespace=namespace, body=body, **kwargs + ) + else: + resp: client.V1Status = getattr(k8s_api, f"delete_{kind}")(name=name, body=body, **kwargs) + if verbose: + print(f"{kind} deleted. status='{str(resp.status)}'") + return resp + + +class FailToDeleteError(Exception): + """ + An exception class for handling error if an error occurred when + handling a yaml file during deletion of the resource. + """ + + def __init__(self, api_exceptions): + self.api_exceptions = api_exceptions + + def __str__(self): + msg = "" + for api_exception in self.api_exceptions: + msg += f"Error from server ({api_exception.reason}):{api_exception.body}" Review Comment: Won't these all run together? ########## airflow/providers/cncf/kubernetes/operators/resource.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Manage a Kubernetes Resource""" + +from __future__ import annotations + +from kubernetes.client import ApiClient +from kubernetes.utils import create_from_yaml + +from airflow.compat.functools import cached_property +from airflow.models import BaseOperator +from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook +from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_dict +from airflow.utils import yaml + + +class KubernetesResourceBaseOperator(BaseOperator): + """Abstract base class for all Kubernetes Resource operators.""" + + template_fields = ("yaml_conf",) + template_fields_renderers = {"yaml_conf": "yaml"} + + def __init__( + self, + *, + yaml_conf: str, + namespace: str | None = None, + kubernetes_conn_id: str | None = "kubernetes_default", + in_cluster: bool | None = None, + cluster_context: str | None = None, + config_file: str | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self._namespace = namespace Review Comment: Is there a reason this one is private, but none of the others are? ########## airflow/providers/cncf/kubernetes/utils/delete_from.py: ########## @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files + +from __future__ import annotations + +import re + +from kubernetes import client + +DEFAULT_DELETION_BODY = client.V1DeleteOptions( + propagation_policy="Background", + grace_period_seconds=5, +) + + +def delete_from_dict( + k8s_client, + yml_document, + verbose=False, + namespace="default", + body=None, + **kwargs, +): + + if body is None: + body = DEFAULT_DELETION_BODY + + api_exceptions = [] + if "List" in yml_document["kind"]: + kind = yml_document["kind"].replace("List", "") + for yml_doc in yml_document["items"]: + if kind != "": + yml_doc["apiVersion"] = yml_document["apiVersion"] + yml_doc["kind"] = kind + try: + _delete_from_yaml_single_item( + k8s_client, + yml_doc, + verbose, + namespace=namespace, + body=body, + **kwargs, + ) + except client.rest.ApiException as api_exception: + api_exceptions.append(api_exception) + else: + + try: + _delete_from_yaml_single_item( + k8s_client, + yml_document, + verbose, + namespace=namespace, + body=body, + **kwargs, + ) + except client.rest.ApiException as api_exception: + api_exceptions.append(api_exception) + + if api_exceptions: + raise FailToDeleteError(api_exceptions) + + +def _delete_from_yaml_single_item( Review Comment: Might as well do the same here re kwargs
only and typing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
