Repository: incubator-airflow Updated Branches: refs/heads/master 5d90d132a -> 348f25f08
[AIRFLOW-341][operators] Add resource requirement attributes to operators This PR adds optional resource requirements for tasks for use with resource managers such as Yarn and Mesos. Considerations: - I chose to force users to encapsulate resources in a resources object e.g. Resources(cpus=1) instead of just cpus=1 in their dag attributes. This creates the pain of having to import Resources for almost every DAG. I think this is kind of important for scoping/namespacing which we should start doing. - Once resources are used by executors we need to add documentation for these new resources (and examples) Testing Done: - New/existing unit tests plypaul artwr mistercrunch jlowin bolkedebruin criccomini Closes #1669 from aoen/ddavydov/ddavydov/augment_tasks_with_resources Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/348f25f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/348f25f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/348f25f0 Branch: refs/heads/master Commit: 348f25f08af2c02627cec04453564edd2fb69fa3 Parents: 5d90d13 Author: Dan Davydov <[email protected]> Authored: Tue Jul 19 16:39:30 2016 -0700 Committer: Dan Davydov <[email protected]> Committed: Tue Jul 19 16:39:34 2016 -0700 ---------------------------------------------------------------------- airflow/configuration.py | 10 ++- airflow/models.py | 6 ++ airflow/utils/operator_resources.py | 121 +++++++++++++++++++++++++++++++ tests/utils.py | 35 +++++++++ 4 files changed, 171 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index ff9c361..e03b713 100644 --- a/airflow/configuration.py +++ 
b/airflow/configuration.py @@ -110,7 +110,11 @@ defaults = { 'non_pooled_task_slot_count': 128, }, 'operators': { - 'default_owner': 'airflow' + 'default_owner': 'airflow', + 'default_cpus': 1, + 'default_ram': 512, + 'default_disk': 512, + 'default_gpus': 0, }, 'webserver': { 'base_url': 'http://localhost:8080', @@ -255,6 +259,10 @@ dagbag_import_timeout = 30 # The default owner assigned to each new operator, unless # provided explicitly or passed via `default_args` default_owner = Airflow +default_cpus = 1 +default_ram = 512 +default_disk = 512 +default_gpus = 0 [webserver] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 6e6d6fc..1b61318 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -66,6 +66,7 @@ from airflow.utils.email import send_email from airflow.utils.helpers import ( as_tuple, is_container, is_in, validate_key, pprinttable) from airflow.utils.logging import LoggingMixin +from airflow.utils.operator_resources import Resources from airflow.utils.state import State from airflow.utils.timeout import timeout from airflow.utils.trigger_rule import TriggerRule @@ -1796,6 +1797,9 @@ class BaseOperator(object): using the constants defined in the static class ``airflow.utils.TriggerRule`` :type trigger_rule: str + :param resources: A map of resource parameter names (the argument names of the + Resources constructor) to their values. + :type resources: dict """ # For derived classes to define which fields will get jinjaified @@ -1836,6 +1840,7 @@ class BaseOperator(object): on_success_callback=None, on_retry_callback=None, trigger_rule=TriggerRule.ALL_SUCCESS, + resources=None, *args, **kwargs): @@ -1898,6 +1903,7 @@ class BaseOperator(object): self.params = params or {} # Available in templates!
self.adhoc = adhoc self.priority_weight = priority_weight + self.resources = Resources(**(resources or {})) # Private attributes self._upstream_task_ids = [] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/airflow/utils/operator_resources.py ---------------------------------------------------------------------- diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py new file mode 100644 index 0000000..d304170 --- /dev/null +++ b/airflow/utils/operator_resources.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow import configuration +from airflow.exceptions import AirflowException + +# Constants for resources (megabytes are the base unit) +MB = 1 +GB = 1024 * MB +TB = 1024 * GB +PB = 1024 * TB +EB = 1024 * PB + + +class Resource(object): + """ + Represents a resource requirement in an execution environment for an operator. + + :param name: Name of the resource + :type name: string + :param units_str: The string representing the units of a resource (e.g. MB for a RAM + resource) to be used for display purposes + :type units_str: string + :param qty: The number of units of the specified resource that are required for + execution of the operator.
+ :type qty: long + """ + def __init__(self, name, units_str, qty): + if qty < 0: + raise AirflowException( + 'Received resource quantity {} for resource {} but resource quantity ' + 'must be non-negative.'.format(qty, name)) + + self._name = name + self._units_str = units_str + self._qty = qty + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __repr__(self): + return str(self.__dict__) + + @property + def name(self): + return self._name + + @property + def units_str(self): + return self._units_str + + @property + def qty(self): + return self._qty + + +class CpuResource(Resource): + def __init__(self, qty): + super(CpuResource, self).__init__('CPU', 'core(s)', qty) + + +class RamResource(Resource): + def __init__(self, qty): + super(RamResource, self).__init__('RAM', 'MB', qty) + + +class DiskResource(Resource): + def __init__(self, qty): + super(DiskResource, self).__init__('Disk', 'MB', qty) + + +class GpuResource(Resource): + def __init__(self, qty): + super(GpuResource, self).__init__('GPU', 'gpu(s)', qty) + + +class Resources(object): + """ + The resources required by an operator. Resources that are not specified will use the + default values from the airflow config. 
+ + :param cpus: The number of cpu cores that are required + :type cpus: long + :param ram: The amount of RAM required (in MB) + :type ram: long + :param disk: The amount of disk space required (in MB) + :type disk: long + :param gpus: The number of gpu units that are required + :type gpus: long + """ + def __init__(self, cpus=None, ram=None, disk=None, gpus=None): + if cpus is None: + cpus = configuration.getint('operators', 'default_cpus') + if ram is None: + ram = configuration.getint('operators', 'default_ram') + if disk is None: + disk = configuration.getint('operators', 'default_disk') + if gpus is None: + gpus = configuration.getint('operators', 'default_gpus') + + self.cpus = CpuResource(cpus) + self.ram = RamResource(ram) + self.disk = DiskResource(disk) + self.gpus = GpuResource(gpus) + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + def __repr__(self): + return str(self.__dict__) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/tests/utils.py ---------------------------------------------------------------------- diff --git a/tests/utils.py b/tests/utils.py index 89a39d8..ebed15b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -21,7 +21,9 @@ import logging import unittest import airflow.utils.logging +from airflow import configuration from airflow.exceptions import AirflowException +from airflow.utils.operator_resources import Resources class LogUtilsTest(unittest.TestCase): @@ -54,3 +56,36 @@ class LogUtilsTest(unittest.TestCase): self.assertEqual( glog.parse_gcs_url('gs://bucket/'), ('bucket', '')) + + +class OperatorResourcesTest(unittest.TestCase): + def test_all_resources_specified(self): + resources = Resources(cpus=1, ram=2, disk=3, gpus=4) + self.assertEqual(resources.cpus.qty, 1) + self.assertEqual(resources.ram.qty, 2) + self.assertEqual(resources.disk.qty, 3) + self.assertEqual(resources.gpus.qty, 4) + + def test_some_resources_specified(self): + resources = Resources(cpus=0, disk=1) +
self.assertEqual(resources.cpus.qty, 0) + self.assertEqual(resources.ram.qty, + configuration.defaults['operators']['default_ram']) + self.assertEqual(resources.disk.qty, 1) + self.assertEqual(resources.gpus.qty, + configuration.defaults['operators']['default_gpus']) + + def test_no_resources_specified(self): + resources = Resources() + self.assertEqual(resources.cpus.qty, + configuration.defaults['operators']['default_cpus']) + self.assertEqual(resources.ram.qty, + configuration.defaults['operators']['default_ram']) + self.assertEqual(resources.disk.qty, + configuration.defaults['operators']['default_disk']) + self.assertEqual(resources.gpus.qty, + configuration.defaults['operators']['default_gpus']) + + def test_negative_resource_qty(self): + with self.assertRaises(AirflowException): + Resources(cpus=-1)
