This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 29ce614b21d070262d0614aa9c4d2cf2ae3c2d37 Author: Leonardo Alves Miguel <[email protected]> AuthorDate: Thu Feb 27 07:25:24 2020 -0300 [AIRFLOW-5659] Add support for ephemeral storage on KubernetesPodOperator (#6337) (cherry picked from commit dfb18adaf53fa12f1b10b1283321b1dd71211059) --- airflow/kubernetes/pod.py | 31 ++++++++++++++++++++---- airflow/kubernetes/pod_generator.py | 6 +++-- kubernetes_tests/test_kubernetes_pod_operator.py | 4 +++ tests/kubernetes/test_pod_generator.py | 9 ++++--- 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 6a0e788..b1df462 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -33,25 +33,33 @@ class Resources(K8SModel): :type request_memory: str :param request_cpu: requested CPU number :type request_cpu: float | str + :param request_ephemeral_storage: requested ephermeral storage + :type request_ephemeral_storage: str :param limit_memory: limit for memory usage :type limit_memory: str :param limit_cpu: Limit for CPU used :type limit_cpu: float | str :param limit_gpu: Limits for GPU used :type limit_gpu: int + :param limit_ephemeral_storage: Limit for ephermeral storage + :type limit_ephemeral_storage: float | str """ def __init__( self, request_memory=None, request_cpu=None, + request_ephemeral_storage=None, limit_memory=None, limit_cpu=None, - limit_gpu=None): + limit_gpu=None, + limit_ephemeral_storage=None): self.request_memory = request_memory self.request_cpu = request_cpu + self.request_ephemeral_storage = request_ephemeral_storage self.limit_memory = limit_memory self.limit_cpu = limit_cpu self.limit_gpu = limit_gpu + self.limit_ephemeral_storage = limit_ephemeral_storage def is_empty_resource_request(self): """Whether resource is empty""" @@ -59,16 +67,29 @@ class Resources(K8SModel): def has_limits(self): """Whether resource has limits""" - return self.limit_cpu is not None or self.limit_memory is not None or self.limit_gpu is not None + return self.limit_cpu is not None or \ + self.limit_memory is not None or \ + self.limit_gpu is not None or \ + self.limit_ephemeral_storage is not None def has_requests(self): """Whether resource has requests""" - return self.request_cpu is not None or self.request_memory is not None + return self.request_cpu is not None or \ + self.request_memory is not None or \ + self.request_ephemeral_storage is not None def to_k8s_client_obj(self): return k8s.V1ResourceRequirements( - limits={'cpu': self.limit_cpu, 'memory': self.limit_memory, 'nvidia.com/gpu': self.limit_gpu}, - requests={'cpu': self.request_cpu, 'memory': self.request_memory} + limits={ + 'cpu': self.limit_cpu, + 'memory': self.limit_memory, + 'nvidia.com/gpu': self.limit_gpu, + 'ephemeral-storage': self.limit_ephemeral_storage + }, + requests={ + 'cpu': self.request_cpu, + 'memory': self.request_memory, + 'ephemeral-storage': self.request_ephemeral_storage} ) def attach_to_pod(self, pod): diff --git a/airflow/kubernetes/pod_generator.py b/airflow/kubernetes/pod_generator.py index 6956fc3..711b1a9 100644 --- a/airflow/kubernetes/pod_generator.py +++ b/airflow/kubernetes/pod_generator.py @@ -323,11 +323,13 @@ class PodGenerator: if resources is None: requests = { 'cpu': namespaced.get('request_cpu'), - 'memory': namespaced.get('request_memory') + 'memory': namespaced.get('request_memory'), + 'ephemeral-storage': namespaced.get('ephemeral-storage') } limits = { 'cpu': namespaced.get('limit_cpu'), - 'memory': namespaced.get('limit_memory') + 'memory': namespaced.get('limit_memory'), + 'ephemeral-storage': namespaced.get('ephemeral-storage') } all_resources = list(requests.values()) + list(limits.values()) if all(r is None for r in all_resources): diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 0a129b6..e20324b 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -375,8 +375,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): resources = { 'limit_cpu': 0.25, 'limit_memory': '64Mi', + 'limit_ephemeral_storage': '2Gi', 'request_cpu': '250m', 'request_memory': '64Mi', + 'request_ephemeral_storage': '1Gi', } k = KubernetesPodOperator( namespace='default', @@ -397,11 +399,13 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): 'requests': { 'memory': '64Mi', 'cpu': '250m', + 'ephemeral-storage': '1Gi' }, 'limits': { 'memory': '64Mi', 'cpu': 0.25, 'nvidia.com/gpu': None, + 'ephemeral-storage': '2Gi' } } self.assertEqual(self.expected_pod, actual_pod) diff --git a/tests/kubernetes/test_pod_generator.py b/tests/kubernetes/test_pod_generator.py index a9a3aa5..ce15a8b 100644 --- a/tests/kubernetes/test_pod_generator.py +++ b/tests/kubernetes/test_pod_generator.py @@ -43,6 +43,7 @@ class TestPodGenerator(unittest.TestCase): # This should produce a single secret mounted in env Secret('env', 'TARGET', 'secret_b', 'source_b'), ] + self.labels = { 'airflow-worker': 'uuid', 'dag_id': 'dag_id', @@ -58,7 +59,7 @@ class TestPodGenerator(unittest.TestCase): 'namespace': 'namespace' } - self.resources = Resources('1Gi', 1, '2Gi', 2, 1) + self.resources = Resources('1Gi', 1, '2Gi', '2Gi', 2, 1, '4Gi') self.k8s_client = ApiClient() self.expected = { 'apiVersion': 'v1', @@ -108,12 +109,14 @@ class TestPodGenerator(unittest.TestCase): 'resources': { 'requests': { 'memory': '1Gi', - 'cpu': 1 + 'cpu': 1, + 'ephemeral-storage': '2Gi' }, 'limits': { 'memory': '2Gi', 'cpu': 2, - 'nvidia.com/gpu': 1 + 'nvidia.com/gpu': 1, + 'ephemeral-storage': '4Gi' }, }, 'ports': [{'name': 'foo', 'containerPort': 1234}],
