ephraimbuddy commented on code in PR #22253: URL: https://github.com/apache/airflow/pull/22253#discussion_r1020891590
########## airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py: ########## @@ -0,0 +1,333 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Launches Custom object""" +import sys +import time +from copy import deepcopy +from datetime import datetime as dt +from typing import Optional + +import tenacity +from kubernetes import client +from kubernetes.client import models as k8s +from kubernetes.client.rest import ApiException + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager +from airflow.utils.log.logging_mixin import LoggingMixin + +from airflow.providers.cncf.kubernetes.resource_convert.secret import convert_secret, convert_image_pull_secrets +from airflow.providers.cncf.kubernetes.resource_convert.configmap import convert_configmap, convert_configmap_to_volume +from airflow.providers.cncf.kubernetes.resource_convert.env_variable import convert_env_vars + +if sys.version_info >= (3, 8): + from functools import cached_property +else: + from cached_property import cached_property Review Comment: ```suggestion from airflow.compat.functools import cached_property ``` ########## 
airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py: ########## @@ -0,0 +1,333 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Launches Custom object""" +import sys +import time +from copy import deepcopy +from datetime import datetime as dt +from typing import Optional + +import tenacity +from kubernetes import client +from kubernetes.client import models as k8s +from kubernetes.client.rest import ApiException + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager +from airflow.utils.log.logging_mixin import LoggingMixin + +from airflow.providers.cncf.kubernetes.resource_convert.secret import convert_secret, convert_image_pull_secrets +from airflow.providers.cncf.kubernetes.resource_convert.configmap import convert_configmap, convert_configmap_to_volume +from airflow.providers.cncf.kubernetes.resource_convert.env_variable import convert_env_vars + +if sys.version_info >= (3, 8): + from functools import cached_property +else: + from cached_property import cached_property + + +def should_retry_start_spark_job(exception: BaseException) -> bool: + """Check if an Exception indicates a transient error and warrants retrying""" + if 
isinstance(exception, ApiException): + return exception.status == 409 + return False + + +class SparkJobSpec(): + def __init__(self, **entries): + self.__dict__.update(entries) + self.validate() + self.update_resources() + + def validate(self): + if self.spec['dynamicAllocation']['enabled']: + if not all( + [self.spec['dynamicAllocation']['initialExecutors'], self.spec['dynamicAllocation']['minExecutors'], self.spec['dynamicAllocation']['maxExecutors']] + ): + raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed") + + def update_resources(self): + spark_resources = SparkResources(self.spec['driver'].pop('container_resources'), self.spec['executor'].pop('container_resources')) + self.spec['driver'].update(spark_resources.resources['driver']) + self.spec['executor'].update(spark_resources.resources['executor']) + + +class KubernetesSpec(): + def __init__(self, **entries): + self.__dict__.update(entries) + self.set_attribute() + + def set_attribute(self): + self.env_vars = convert_env_vars(self.env_vars) if self.env_vars else [] + self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if self.image_pull_secrets else [] + if self.config_map_mounts: + vols, vols_mounts = convert_configmap_to_volume(self.config_map_mounts) + self.volumes.extend(vols) + self.volume_mounts.extend(vols_mounts) + if self.from_env_config_map: + self.env_from.extend([convert_configmap(c_name) for c_name in self.from_env_config_map]) + if self.from_env_secret: + self.env_from.extend([convert_secret(c) for c in self.from_env_secret]) + + +class SparkResources: + """spark resources""" + + def __init__( + self, + driver=None, + executor=None Review Comment: We need typing for this and probably more description of what driver/executor should contain in the Doc string of this class ########## airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py: ########## @@ -0,0 +1,333 @@ +# Licensed to the Apache Software Foundation (ASF) 
under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Launches Custom object""" +import sys +import time +from copy import deepcopy +from datetime import datetime as dt +from typing import Optional + +import tenacity +from kubernetes import client +from kubernetes.client import models as k8s +from kubernetes.client.rest import ApiException + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager +from airflow.utils.log.logging_mixin import LoggingMixin + +from airflow.providers.cncf.kubernetes.resource_convert.secret import convert_secret, convert_image_pull_secrets +from airflow.providers.cncf.kubernetes.resource_convert.configmap import convert_configmap, convert_configmap_to_volume +from airflow.providers.cncf.kubernetes.resource_convert.env_variable import convert_env_vars + +if sys.version_info >= (3, 8): + from functools import cached_property +else: + from cached_property import cached_property + + +def should_retry_start_spark_job(exception: BaseException) -> bool: + """Check if an Exception indicates a transient error and warrants retrying""" + if isinstance(exception, ApiException): + return exception.status == 409 + return False + + +class SparkJobSpec(): + def __init__(self, **entries): + 
self.__dict__.update(entries) + self.validate() + self.update_resources() + + def validate(self): + if self.spec['dynamicAllocation']['enabled']: + if not all( + [self.spec['dynamicAllocation']['initialExecutors'], self.spec['dynamicAllocation']['minExecutors'], self.spec['dynamicAllocation']['maxExecutors']] + ): + raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed") + + def update_resources(self): + spark_resources = SparkResources(self.spec['driver'].pop('container_resources'), self.spec['executor'].pop('container_resources')) + self.spec['driver'].update(spark_resources.resources['driver']) + self.spec['executor'].update(spark_resources.resources['executor']) + + +class KubernetesSpec(): + def __init__(self, **entries): + self.__dict__.update(entries) + self.set_attribute() + + def set_attribute(self): + self.env_vars = convert_env_vars(self.env_vars) if self.env_vars else [] + self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if self.image_pull_secrets else [] + if self.config_map_mounts: + vols, vols_mounts = convert_configmap_to_volume(self.config_map_mounts) + self.volumes.extend(vols) + self.volume_mounts.extend(vols_mounts) + if self.from_env_config_map: + self.env_from.extend([convert_configmap(c_name) for c_name in self.from_env_config_map]) + if self.from_env_secret: + self.env_from.extend([convert_secret(c) for c in self.from_env_secret]) + + +class SparkResources: + """spark resources""" + + def __init__( + self, + driver=None, + executor=None + ): + self.default = { + 'gpu': {'name': None, 'quantity': 0}, + 'cpu': {'request': None, 'limit': None}, + 'memory': {'request': None, 'limit': None} + } + self.driver = deepcopy(self.default) + self.executor = deepcopy(self.default) + if driver: + self.driver.update(driver) + if executor: + self.executor.update(executor) + self.convert_resources() + + @property + def resources(self): + """Return job resources""" + return {'driver': 
self.driver_resources, 'executor': self.executor_resources} + + @property + def driver_resources(self): + """Return resources to use""" + driver = {} + if self.driver['cpu'].get('request'): + driver['cores'] = self.driver['cpu']['request'] + if self.driver['cpu'].get('limit'): + driver['coreLimit'] = self.driver['cpu']['limit'] + if self.driver['memory'].get('limit'): + driver['memory'] = self.driver['memory']['limit'] + if self.driver['gpu'].get('name') and self.driver['gpu'].get('quantity'): + driver['gpu'] = {'name': self.driver['gpu']['name'], 'quantity': self.driver['gpu']['quantity']} + return driver + + @property + def executor_resources(self): + """Return resources to use""" + executor = {} + if self.executor['cpu'].get('request'): + executor['cores'] = self.executor['cpu']['request'] + if self.executor['cpu'].get('limit'): + executor['coreLimit'] = self.executor['cpu']['limit'] + if self.executor['memory'].get('limit'): + executor['memory'] = self.executor['memory']['limit'] + if self.executor['gpu'].get('name') and self.executor['gpu'].get('quantity'): + executor['gpu'] = {'name': self.executor['gpu']['name'], 'quantity': self.executor['gpu']['quantity']} + return executor + + def convert_resources(self): + if isinstance(self.driver['memory'].get('limit'), str): + if 'G' in self.driver['memory']['limit'] or 'Gi' in self.driver['memory']['limit']: + self.driver['memory']['limit'] = float(self.driver['memory']['limit'].rstrip('Gi G')) * 1024 + elif 'm' in self.driver['memory']['limit']: Review Comment: Any possibility that a user could use 'M' instead of 'm', or 'g' instead of 'G'? Should we provide a better experience in these cases? ########## airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py: ########## @@ -16,11 +16,31 @@ # specific language governing permissions and limitations # under the License. 
from __future__ import annotations +import json +import re +import sys +from typing import Any, Optional -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING +import jinja2 +from kubernetes.client import CoreV1Api, CustomObjectsApi, models as k8s + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import _suppress +from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, _load_body_to_dict +from airflow.utils.helpers import prune_dict + +if sys.version_info >= (3, 8): + from functools import cached_property +else: + from cached_property import cached_property Review Comment: ```suggestion from airflow.compat.functools import cached_property ``` ########## airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py: ########## @@ -0,0 +1,333 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Launches Custom object""" +import sys +import time +from copy import deepcopy +from datetime import datetime as dt +from typing import Optional + +import tenacity +from kubernetes import client +from kubernetes.client import models as k8s +from kubernetes.client.rest import ApiException + +from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager +from airflow.utils.log.logging_mixin import LoggingMixin + +from airflow.providers.cncf.kubernetes.resource_convert.secret import convert_secret, convert_image_pull_secrets +from airflow.providers.cncf.kubernetes.resource_convert.configmap import convert_configmap, convert_configmap_to_volume +from airflow.providers.cncf.kubernetes.resource_convert.env_variable import convert_env_vars + +if sys.version_info >= (3, 8): + from functools import cached_property +else: + from cached_property import cached_property + + +def should_retry_start_spark_job(exception: BaseException) -> bool: + """Check if an Exception indicates a transient error and warrants retrying""" + if isinstance(exception, ApiException): + return exception.status == 409 + return False + + +class SparkJobSpec(): + def __init__(self, **entries): + self.__dict__.update(entries) + self.validate() + self.update_resources() + + def validate(self): + if self.spec['dynamicAllocation']['enabled']: + if not all( + [self.spec['dynamicAllocation']['initialExecutors'], self.spec['dynamicAllocation']['minExecutors'], self.spec['dynamicAllocation']['maxExecutors']] + ): + raise AirflowException("Make sure initial/min/max value for dynamic allocation is passed") + + def update_resources(self): + spark_resources = SparkResources(self.spec['driver'].pop('container_resources'), self.spec['executor'].pop('container_resources')) + self.spec['driver'].update(spark_resources.resources['driver']) + self.spec['executor'].update(spark_resources.resources['executor']) + + +class KubernetesSpec(): + def __init__(self, 
**entries): + self.__dict__.update(entries) + self.set_attribute() + + def set_attribute(self): + self.env_vars = convert_env_vars(self.env_vars) if self.env_vars else [] + self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if self.image_pull_secrets else [] + if self.config_map_mounts: + vols, vols_mounts = convert_configmap_to_volume(self.config_map_mounts) + self.volumes.extend(vols) + self.volume_mounts.extend(vols_mounts) + if self.from_env_config_map: + self.env_from.extend([convert_configmap(c_name) for c_name in self.from_env_config_map]) + if self.from_env_secret: + self.env_from.extend([convert_secret(c) for c in self.from_env_secret]) + + +class SparkResources: + """spark resources""" + + def __init__( + self, + driver=None, + executor=None + ): + self.default = { + 'gpu': {'name': None, 'quantity': 0}, + 'cpu': {'request': None, 'limit': None}, + 'memory': {'request': None, 'limit': None} + } + self.driver = deepcopy(self.default) + self.executor = deepcopy(self.default) + if driver: + self.driver.update(driver) + if executor: + self.executor.update(executor) + self.convert_resources() + + @property + def resources(self): + """Return job resources""" + return {'driver': self.driver_resources, 'executor': self.executor_resources} + + @property + def driver_resources(self): + """Return resources to use""" + driver = {} + if self.driver['cpu'].get('request'): + driver['cores'] = self.driver['cpu']['request'] + if self.driver['cpu'].get('limit'): + driver['coreLimit'] = self.driver['cpu']['limit'] + if self.driver['memory'].get('limit'): + driver['memory'] = self.driver['memory']['limit'] + if self.driver['gpu'].get('name') and self.driver['gpu'].get('quantity'): + driver['gpu'] = {'name': self.driver['gpu']['name'], 'quantity': self.driver['gpu']['quantity']} + return driver + + @property + def executor_resources(self): + """Return resources to use""" + executor = {} + if self.executor['cpu'].get('request'): + executor['cores'] = 
self.executor['cpu']['request'] + if self.executor['cpu'].get('limit'): + executor['coreLimit'] = self.executor['cpu']['limit'] + if self.executor['memory'].get('limit'): + executor['memory'] = self.executor['memory']['limit'] + if self.executor['gpu'].get('name') and self.executor['gpu'].get('quantity'): + executor['gpu'] = {'name': self.executor['gpu']['name'], 'quantity': self.executor['gpu']['quantity']} + return executor + + def convert_resources(self): + if isinstance(self.driver['memory'].get('limit'), str): + if 'G' in self.driver['memory']['limit'] or 'Gi' in self.driver['memory']['limit']: + self.driver['memory']['limit'] = float(self.driver['memory']['limit'].rstrip('Gi G')) * 1024 + elif 'm' in self.driver['memory']['limit']: + self.driver['memory']['limit'] = float(self.driver['memory']['limit'].rstrip('m')) + # Adjusting the memory value as operator adds 40% to the given value + self.driver['memory']['limit'] = str(int(self.driver['memory']['limit'] / 1.4)) + 'm' + + if isinstance(self.executor['memory'].get('limit'), str): + if 'G' in self.executor['memory']['limit'] or 'Gi' in self.executor['memory']['limit']: + self.executor['memory']['limit'] = float(self.executor['memory']['limit'].rstrip('Gi G')) * 1024 + elif 'm' in self.executor['memory']['limit']: + self.executor['memory']['limit'] = float(self.executor['memory']['limit'].rstrip('m')) + # Adjusting the memory value as operator adds 40% to the given value + self.executor['memory']['limit'] = str(int(self.executor['memory']['limit'] / 1.4)) + 'm' + + if self.driver['cpu'].get('request'): + self.driver['cpu']['request'] = int(float(self.driver['cpu']['request'])) + if self.driver['cpu'].get('limit'): + self.driver['cpu']['limit'] = str(self.driver['cpu']['limit']) + if self.executor['cpu'].get('request'): + self.executor['cpu']['request'] = int(float(self.executor['cpu']['request'])) + if self.executor['cpu'].get('limit'): + self.executor['cpu']['limit'] = str(self.executor['cpu']['limit']) + + 
if self.driver['gpu'].get('quantity'): + self.driver['gpu']['quantity'] = int(float(driver['gpu']['quantity'])) + if self.executor['gpu'].get('quantity'): + self.executor['gpu']['quantity'] = int(float(self.executor['gpu']['quantity'])) + + +class CustomObjectStatus: + """Status of the PODs""" + + SUBMITTED = 'SUBMITTED' + RUNNING = 'RUNNING' + FAILED = 'FAILED' + SUCCEEDED = 'SUCCEEDED' + + +class CustomObjectLauncher(LoggingMixin): + """Launches PODS""" + + def __init__( + self, + name: str, + namespace: str, + kube_client: client.CoreV1Api, + custom_obj_api: client.CustomObjectsApi, + template_body: Optional[str] = None, + ): + """ + Creates the launcher. Review Comment: Which launcher? Maybe we should specify? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
