Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5d90d132a -> 348f25f08


[AIRFLOW-341][operators] Add resource requirement attributes to operators

This PR adds optional resource requirements for tasks for use with
resource managers such as Yarn and Mesos.

Considerations:
- I chose to require users to encapsulate resources in a resources object,
e.g. Resources(cpus=1), instead of just cpus=1 in their DAG attributes.
This has the downside of having to import Resources in almost every
DAG, but I think it is important for scoping/namespacing, which we
should start doing. (As implemented, BaseOperator's `resources` argument
takes a dict of Resources constructor arguments, e.g. resources={'cpus': 1}.)
- Once resources are used by executors, we need to add documentation for
these new resources (and examples).

Testing Done:
- New/existing unit tests

plypaul artwr mistercrunch jlowin bolkedebruin criccomini

Closes #1669 from aoen/ddavydov/ddavydov/augment_tasks_with_resources


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/348f25f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/348f25f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/348f25f0

Branch: refs/heads/master
Commit: 348f25f08af2c02627cec04453564edd2fb69fa3
Parents: 5d90d13
Author: Dan Davydov <[email protected]>
Authored: Tue Jul 19 16:39:30 2016 -0700
Committer: Dan Davydov <[email protected]>
Committed: Tue Jul 19 16:39:34 2016 -0700

----------------------------------------------------------------------
 airflow/configuration.py            |  10 ++-
 airflow/models.py                   |   6 ++
 airflow/utils/operator_resources.py | 121 +++++++++++++++++++++++++++++++
 tests/utils.py                      |  35 +++++++++
 4 files changed, 171 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ff9c361..e03b713 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -110,7 +110,11 @@ defaults = {
         'non_pooled_task_slot_count': 128,
     },
     'operators': {
-        'default_owner': 'airflow'
+        'default_owner': 'airflow',
+        'default_cpus': 1,
+        'default_ram': 512,
+        'default_disk': 512,
+        'default_gpus': 0,
     },
     'webserver': {
         'base_url': 'http://localhost:8080',
@@ -255,6 +259,10 @@ dagbag_import_timeout = 30
 # The default owner assigned to each new operator, unless
 # provided explicitly or passed via `default_args`
 default_owner = Airflow
+default_cpus = 1
+default_ram = 512
+default_disk = 512
+default_gpus = 0
 
 
 [webserver]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 6e6d6fc..1b61318 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -66,6 +66,7 @@ from airflow.utils.email import send_email
 from airflow.utils.helpers import (
     as_tuple, is_container, is_in, validate_key, pprinttable)
 from airflow.utils.logging import LoggingMixin
+from airflow.utils.operator_resources import Resources
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.trigger_rule import TriggerRule
@@ -1796,6 +1797,9 @@ class BaseOperator(object):
         using the constants defined in the static class
         ``airflow.utils.TriggerRule``
     :type trigger_rule: str
+    :param resources: A map of resource parameter names (the argument names
+        of the Resources constructor) to their values.
+    :type resources: dict
     """
 
     # For derived classes to define which fields will get jinjaified
@@ -1836,6 +1840,7 @@ class BaseOperator(object):
             on_success_callback=None,
             on_retry_callback=None,
             trigger_rule=TriggerRule.ALL_SUCCESS,
+            resources=None,
             *args,
             **kwargs):
 
@@ -1898,6 +1903,7 @@ class BaseOperator(object):
         self.params = params or {}  # Available in templates!
         self.adhoc = adhoc
         self.priority_weight = priority_weight
+        self.resources = Resources(**(resources or {}))
 
         # Private attributes
         self._upstream_task_ids = []

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/airflow/utils/operator_resources.py
----------------------------------------------------------------------
diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py
new file mode 100644
index 0000000..d304170
--- /dev/null
+++ b/airflow/utils/operator_resources.py
@@ -0,0 +1,121 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import configuration
+from airflow.exceptions import AirflowException
+
+# Constants for resources (megabytes are the base unit)
+MB = 1
+GB = 1024 * MB
+TB = 1024 * GB
+PB = 1024 * TB
+EB = 1024 * PB
+
+
+class Resource(object):
+    """
+    Represents a resource requirement in an execution environment for an operator.
+
+    :param name: Name of the resource
+    :type name: string
+    :param units_str: The string representing the units of a resource (e.g. MB for a RAM
+        resource) to be used for display purposes
+    :type units_str: string
+    :param qty: The number of units of the specified resource that are required for
+        execution of the operator. Raises AirflowException if negative.
+    :type qty: long
+    """
+    def __init__(self, name, units_str, qty):
+        if qty < 0:
+            raise AirflowException(
+                'Received resource quantity {} for resource {} but resource quantity '
+                'must be non-negative.'.format(qty, name))
+
+        self._name = name
+        self._units_str = units_str
+        self._qty = qty
+
+    def __eq__(self, other):
+        return isinstance(other, Resource) and self.__dict__ == other.__dict__
+
+    def __repr__(self):
+        return str(self.__dict__)
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def units_str(self):
+        return self._units_str
+
+    @property
+    def qty(self):
+        return self._qty
+
+
+class CpuResource(Resource):
+    def __init__(self, qty):
+        super(CpuResource, self).__init__('CPU', 'core(s)', qty)
+
+
+class RamResource(Resource):
+    def __init__(self, qty):
+        super(RamResource, self).__init__('RAM', 'MB', qty)
+
+
+class DiskResource(Resource):
+    def __init__(self, qty):
+        super(DiskResource, self).__init__('Disk', 'MB', qty)
+
+
+class GpuResource(Resource):
+    def __init__(self, qty):
+        super(GpuResource, self).__init__('GPU', 'gpu(s)', qty)
+
+
+class Resources(object):
+    """
+    The resources required by an operator. Resources that are not specified will use the
+    default values from the airflow config.
+
+    :param cpus: The number of cpu cores that are required
+    :type cpus: long
+    :param ram: The amount of RAM required, in MB
+    :type ram: long
+    :param disk: The amount of disk space required, in MB
+    :type disk: long
+    :param gpus: The number of gpu units that are required
+    :type gpus: long
+    """
+    def __init__(self, cpus=None, ram=None, disk=None, gpus=None):
+        if cpus is None:
+            cpus = configuration.getint('operators', 'default_cpus')
+        if ram is None:
+            ram = configuration.getint('operators', 'default_ram')
+        if disk is None:
+            disk = configuration.getint('operators', 'default_disk')
+        if gpus is None:
+            gpus = configuration.getint('operators', 'default_gpus')
+
+        self.cpus = CpuResource(cpus)
+        self.ram = RamResource(ram)
+        self.disk = DiskResource(disk)
+        self.gpus = GpuResource(gpus)
+
+    def __eq__(self, other):
+        return isinstance(other, Resources) and self.__dict__ == other.__dict__
+
+    def __repr__(self):
+        return str(self.__dict__)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/348f25f0/tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/utils.py b/tests/utils.py
index 89a39d8..ebed15b 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -21,7 +21,9 @@ import logging
 import unittest
 
 import airflow.utils.logging
+from airflow import configuration
 from airflow.exceptions import AirflowException
+from airflow.utils.operator_resources import Resources
 
 
 class LogUtilsTest(unittest.TestCase):
@@ -54,3 +56,36 @@ class LogUtilsTest(unittest.TestCase):
         self.assertEqual(
             glog.parse_gcs_url('gs://bucket/'),
             ('bucket', ''))
+
+
+class OperatorResourcesTest(unittest.TestCase):
+    def test_all_resources_specified(self):
+        resources = Resources(cpus=1, ram=2, disk=3, gpus=4)
+        self.assertEqual(resources.cpus.qty, 1)
+        self.assertEqual(resources.ram.qty, 2)
+        self.assertEqual(resources.disk.qty, 3)
+        self.assertEqual(resources.gpus.qty, 4)
+
+    def test_some_resources_specified(self):
+        resources = Resources(cpus=0, disk=1)
+        self.assertEqual(resources.cpus.qty, 0)
+        self.assertEqual(resources.ram.qty,
+                         configuration.getint('operators', 'default_ram'))
+        self.assertEqual(resources.disk.qty, 1)
+        self.assertEqual(resources.gpus.qty,
+                         configuration.getint('operators', 'default_gpus'))
+
+    def test_no_resources_specified(self):
+        resources = Resources()
+        self.assertEqual(resources.cpus.qty,
+                         configuration.getint('operators', 'default_cpus'))
+        self.assertEqual(resources.ram.qty,
+                         configuration.getint('operators', 'default_ram'))
+        self.assertEqual(resources.disk.qty,
+                         configuration.getint('operators', 'default_disk'))
+        self.assertEqual(resources.gpus.qty,
+                         configuration.getint('operators', 'default_gpus'))
+
+    def test_negative_resource_qty(self):
+        with self.assertRaises(AirflowException):
+            Resources(cpus=-1)

Reply via email to