This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 05ec21a22f84cdbe2aaed38b712c30f2cbb38b59 Author: Daniel Imberman <[email protected]> AuthorDate: Thu Jul 23 19:52:20 2020 -0700 Fix bug in executor_config when defining resources (#9935) * Fix PodGenerator to handle Kubernetes resources In Airflow 1.10.11, `namespaced['resources'] = resources` is missing. This PR improves the definition of pod resources, `requests` and `limits` are optional. * Make it working in 2.7 * Add limit_gpu and fix ephemeral-storage keys * Fix flake8 Co-authored-by: Riccardo Bini <[email protected]> --- airflow/kubernetes/pod.py | 4 +- airflow/kubernetes/pod_generator.py | 33 ++++++--- tests/kubernetes/test_pod_generator.py | 132 +++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 14 deletions(-) diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index b1df462..0b332c2 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -33,7 +33,7 @@ class Resources(K8SModel): :type request_memory: str :param request_cpu: requested CPU number :type request_cpu: float | str - :param request_ephemeral_storage: requested ephermeral storage + :param request_ephemeral_storage: requested ephemeral storage :type request_ephemeral_storage: str :param limit_memory: limit for memory usage :type limit_memory: str @@ -41,7 +41,7 @@ class Resources(K8SModel): :type limit_cpu: float | str :param limit_gpu: Limits for GPU used :type limit_gpu: int - :param limit_ephemeral_storage: Limit for ephermeral storage + :param limit_ephemeral_storage: Limit for ephemeral storage :type limit_ephemeral_storage: float | str """ def __init__( diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index e46407b..d11c175 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -344,18 +344,26 @@ class PodGenerator(object): resources = namespaced.get('resources') if resources is None: - requests = { - 'cpu': namespaced.get('request_cpu'), - 'memory': namespaced.get('request_memory'), - 'ephemeral-storage': namespaced.get('ephemeral-storage') - } - limits = { - 'cpu': namespaced.get('limit_cpu'), - 'memory': namespaced.get('limit_memory'), - 'ephemeral-storage': namespaced.get('ephemeral-storage') - } - all_resources = list(requests.values()) + list(limits.values()) - if all(r is None for r in all_resources): + def extract(cpu, memory, ephemeral_storage, limit_gpu=None): + resources_obj = { + 'cpu': namespaced.pop(cpu, None), + 'memory': namespaced.pop(memory, None), + 'ephemeral-storage': namespaced.pop(ephemeral_storage, None), + } + if limit_gpu is not None: + resources_obj['nvidia.com/gpu'] = namespaced.pop(limit_gpu, None) + + resources_obj = {k: v for k, v in resources_obj.items() if v is not None} + + if all(r is None for r in resources_obj): + resources_obj = None + return namespaced, resources_obj + + namespaced, requests = extract('request_cpu', 'request_memory', 'request_ephemeral_storage') + namespaced, limits = extract('limit_cpu', 'limit_memory', 'limit_ephemeral_storage', + limit_gpu='limit_gpu') + + if requests is None and limits is None: resources = None else: resources = k8s.V1ResourceRequirements( @@ -371,6 +379,7 @@ class PodGenerator(object): 'iam.cloud.google.com/service-account': gcp_service_account_key }) + namespaced['resources'] = resources return PodGenerator(**namespaced).gen_pod() @staticmethod diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py index 7d39cdc..d0faf4c 100644 --- a/tests/kubernetes/test_pod_generator.py +++ b/tests/kubernetes/test_pod_generator.py @@ -288,6 +288,138 @@ class TestPodGenerator(unittest.TestCase): }, result) @mock.patch('uuid.uuid4') + def test_from_obj_with_resources(self, mock_uuid): + self.maxDiff = None + + mock_uuid.return_value = self.static_uuid + result = PodGenerator.from_obj({ + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], + 'request_cpu': "200m", + 'limit_cpu': "400m", + 'request_memory': "500Mi", + 'limit_memory': "1000Mi", + 'limit_gpu': "2", + 'request_ephemeral_storage': '2Gi', + 'limit_ephemeral_storage': '4Gi', + } + }) + result = self.k8s_client.sanitize_for_serialization(result) + + self.assertEqual({ + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'annotations': {'test': 'annotation'}, + }, + 'spec': { + 'containers': [{ + 'args': [], + 'command': [], + 'env': [], + 'envFrom': [], + 'name': 'base', + 'ports': [], + 'resources': { + 'limits': { + 'cpu': '400m', + 'ephemeral-storage': '4Gi', + 'memory': '1000Mi', + 'nvidia.com/gpu': "2", + }, + 'requests': { + 'cpu': '200m', + 'ephemeral-storage': '2Gi', + 'memory': '500Mi', + }, + }, + 'volumeMounts': [{ + 'mountPath': '/foo/', + 'name': 'example-kubernetes-test-volume' + }], + }], + 'hostNetwork': False, + 'imagePullSecrets': [], + 'volumes': [{ + 'hostPath': {'path': '/tmp/'}, + 'name': 'example-kubernetes-test-volume' + }], + } + }, result) + + @mock.patch('uuid.uuid4') + def test_from_obj_with_only_request_resources(self, mock_uuid): + self.maxDiff = None + + mock_uuid.return_value = self.static_uuid + result = PodGenerator.from_obj({ + "KubernetesExecutor": { + "annotations": {"test": "annotation"}, + "volumes": [ + { + "name": "example-kubernetes-test-volume", + "hostPath": {"path": "/tmp/"}, + }, + ], + "volume_mounts": [ + { + "mountPath": "/foo/", + "name": "example-kubernetes-test-volume", + }, + ], + 'request_cpu': "200m", + 'request_memory': "500Mi", + } + }) + result = self.k8s_client.sanitize_for_serialization(result) + + self.assertEqual({ + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'annotations': {'test': 'annotation'}, + }, + 'spec': { + 'containers': [{ + 'args': [], + 'command': [], + 'env': [], + 'envFrom': [], + 'name': 'base', + 'ports': [], + 'resources': { + 'requests': { + 'cpu': '200m', + 'memory': '500Mi', + }, + }, + 'volumeMounts': [{ + 'mountPath': '/foo/', + 'name': 'example-kubernetes-test-volume' + }], + }], + 'hostNetwork': False, + 'imagePullSecrets': [], + 'volumes': [{ + 'hostPath': {'path': '/tmp/'}, + 'name': 'example-kubernetes-test-volume' + }], + } + }, result) + + @mock.patch('uuid.uuid4') def test_reconcile_pods_empty_mutator_pod(self, mock_uuid): mock_uuid.return_value = self.static_uuid base_pod = PodGenerator(
