Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test 64900e2b0 -> d96534c26


[AIRFLOW-2068] MesosExecutor allows optional Docker image

In its current form, MesosExecutor schedules tasks on mesos slaves
as plain airflow commands, assuming that the mesos slaves already
have airflow installed and configured on them. This assumption goes
against the Mesos philosophy of having a heterogeneous cluster.

Since Mesos provides an option to pull a Docker image before running
the actual task/command, this improvement changes mesos_executor.py
to accept an optional docker image containing airflow, which can be
pulled on the slaves before the actual airflow command runs. This
also opens the door for an optimization of resources in a future PR,
by allowing the specification of CPU and memory needed for each
airflow task.

Closes #3008 from agrajm/AIRFLOW-2068

(cherry picked from commit 1f86299cf998729ea93c666481390451c9724ebc)
Signed-off-by: Bolke de Bruin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d96534c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d96534c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d96534c2

Branch: refs/heads/v1-10-test
Commit: d96534c268a13f8e6a0e1e288821a436322926c4
Parents: 64900e2
Author: Agraj Mangal <[email protected]>
Authored: Mon Apr 23 22:22:29 2018 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Mon Apr 23 22:23:06 2018 +0200

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg   |   4 +
 airflow/config_templates/default_test.cfg      |   9 ++
 airflow/contrib/executors/__init__.py          |   6 +-
 airflow/contrib/executors/mesos_executor.py    |  23 ++++
 docs/configuration.rst                         |  39 +++++++-
 tests/contrib/executors/__init__.py            |  23 +++--
 tests/contrib/executors/test_mesos_executor.py | 105 ++++++++++++++++++++
 7 files changed, 192 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d96534c2/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 845342c..fa5eea0 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -479,6 +479,10 @@ authenticate = False
 # default_principal = admin
 # default_secret = admin
 
+# Optional Docker image to run on the slave before running the command.
+# This image should be accessible from the mesos slaves, i.e. the slaves
+# should be able to pull this docker image before executing the command.
+# docker_image_slave = puckel/docker-airflow
 
 [kerberos]
 ccache = /tmp/airflow_krb5_ccache
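
For reference, opting in amounts to turning the commented template line
above into a live setting in a deployment's airflow.cfg; a minimal
sketch, assuming an image that every mesos slave can pull
(puckel/docker-airflow is just the example name used in the template):

    [mesos]
    # ...existing mesos settings...
    docker_image_slave = puckel/docker-airflow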

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d96534c2/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 4145dfd..7c569cd 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -91,6 +91,15 @@ flower_host = 0.0.0.0
 flower_port = 5555
 default_queue = default
 
+[mesos]
+master = localhost:5050
+framework_name = Airflow
+task_cpu = 1
+task_memory = 256
+checkpoint = False
+authenticate = False
+docker_image_slave = test/docker-airflow
+
 [scheduler]
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5
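
As a cross-check against the executor code, a minimal sketch of how
these [mesos] options are read, using airflow's module-level
configuration helpers (the pattern mesos_executor.py itself follows);
docker_image_slave is optional, so that read is guarded:

    from airflow import configuration

    master = configuration.get('mesos', 'MASTER')          # e.g. localhost:5050
    task_cpu = configuration.getint('mesos', 'TASK_CPU')         # CPUs per task
    task_memory = configuration.getint('mesos', 'TASK_MEMORY')    # MB per task

    if configuration.has_option('mesos', 'DOCKER_IMAGE_SLAVE'):
        docker_image = configuration.get('mesos', 'DOCKER_IMAGE_SLAVE')
    else:
        docker_image = None  # fall back to plain commands on the slave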

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d96534c2/airflow/contrib/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/__init__.py b/airflow/contrib/executors/__init__.py
index f0f8b68..b7f8352 100644
--- a/airflow/contrib/executors/__init__.py
+++ b/airflow/contrib/executors/__init__.py
@@ -7,13 +7,13 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+#

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d96534c2/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index e1919fa..cf71939 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -68,6 +68,14 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
         self.task_mem = task_mem
         self.task_counter = 0
         self.task_key_map = {}
+        # Optional docker image containing airflow, pulled on the slave
+        # before the task command runs.  Default to None so the check in
+        # resourceOffers() cannot raise AttributeError when unset.
+        self.mesos_slave_docker_image = None
+        if configuration.has_option('mesos', 'DOCKER_IMAGE_SLAVE'):
+            self.mesos_slave_docker_image = configuration.get(
+                'mesos', 'DOCKER_IMAGE_SLAVE'
+            )
 
     def registered(self, driver, frameworkId, masterInfo):
         self.log.info("AirflowScheduler registered to Mesos with framework ID 
%s", frameworkId.value)
@@ -159,6 +167,21 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
                 command.value = cmd
                 task.command.MergeFrom(command)
 
+                # If a docker image for airflow is specified in the config,
+                # run the above airflow command inside that image.
+                if self.mesos_slave_docker_image:
+                    network = mesos_pb2.ContainerInfo.DockerInfo.Network.Value('BRIDGE')
+                    docker = mesos_pb2.ContainerInfo.DockerInfo(
+                        image=self.mesos_slave_docker_image,
+                        force_pull_image=False,
+                        network=network
+                    )
+                    container = mesos_pb2.ContainerInfo(
+                        type=mesos_pb2.ContainerInfo.DOCKER,
+                        docker=docker
+                    )
+                    task.container.MergeFrom(container)
+
                 tasks.append(task)
 
                 remainingCpus -= self.task_cpu
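
To make the protobuf plumbing above easier to follow, here is a
self-contained sketch of how DockerInfo and ContainerInfo compose onto
a TaskInfo, assuming the mesos python eggs are installed; the image
name is illustrative, not mandated by the commit:

    from mesos.interface import mesos_pb2

    # Describe the docker side: image, pull policy, and network mode.
    docker = mesos_pb2.ContainerInfo.DockerInfo(
        image='puckel/docker-airflow',
        force_pull_image=False,  # reuse a locally cached image when present
        network=mesos_pb2.ContainerInfo.DockerInfo.Network.Value('BRIDGE'),
    )

    # Wrap it in a ContainerInfo of type DOCKER and attach it to the task.
    container = mesos_pb2.ContainerInfo(
        type=mesos_pb2.ContainerInfo.DOCKER,
        docker=docker,
    )
    task = mesos_pb2.TaskInfo()
    task.container.MergeFrom(container)

    assert task.container.docker.image == 'puckel/docker-airflow'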

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d96534c2/docs/configuration.rst
----------------------------------------------------------------------
diff --git a/docs/configuration.rst b/docs/configuration.rst
index 46ac615..ea561b9 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -51,8 +51,8 @@ As Airflow was built to interact with its metadata using the great SqlAlchemy
 library, you should be able to use any database backend supported as a
 SqlAlchemy backend. We recommend using **MySQL** or **Postgres**.
 
-.. note:: We rely on more strict ANSI SQL settings for MySQL in order to have 
-   sane defaults. Make sure to have specified `explicit_defaults_for_timestamp=1` 
+.. note:: We rely on more strict ANSI SQL settings for MySQL in order to have
+   sane defaults. Make sure to have specified `explicit_defaults_for_timestamp=1`
    in your my.cnf under `[mysqld]`
 
 .. note:: If you decide to use **Postgres**, we recommend using the ``psycopg2``
@@ -233,15 +233,22 @@ folder as ``{dag_id}/{task_id}/{execution_date}/{try_number}.log``.
 
 Scaling Out on Mesos (community contributed)
 ''''''''''''''''''''''''''''''''''''''''''''
+There are two ways to run airflow as a mesos framework:
+
+1. Running airflow tasks directly on mesos slaves, requiring each mesos slave to have airflow installed and configured.
+2. Running airflow tasks inside a docker container that has airflow installed, which is run on a mesos slave.
+
+Tasks executed directly on mesos slaves
+=======================================
 ``MesosExecutor`` allows you to schedule airflow tasks on a Mesos cluster.
 For this to work, you need a running mesos cluster and you must perform the following
 steps -
 
-1. Install airflow on a machine where web server and scheduler will run,
+1. Install airflow on a mesos slave where web server and scheduler will run,
    let's refer to this as the "Airflow server".
 2. On the Airflow server, install mesos python eggs from `mesos downloads <http://open.mesosphere.com/downloads/mesos/>`_.
-3. On the Airflow server, use a database (such as mysql) which can be accessed from mesos
-   slave machines and add configuration in ``airflow.cfg``.
+3. On the Airflow server, use a database (such as mysql) which can be accessed from all mesos
+   slaves and add configuration in ``airflow.cfg``.
 4. Change your ``airflow.cfg`` to point executor parameter to
    `MesosExecutor` and provide related Mesos settings.
 5. On all mesos slaves, install airflow. Copy the ``airflow.cfg`` from
@@ -266,6 +273,28 @@ The logs for airflow tasks can be seen in airflow UI as usual.
 For more information about mesos, refer to `mesos documentation <http://mesos.apache.org/documentation/latest/>`_.
 For any queries/bugs on `MesosExecutor`, please contact `@kapil-malik <https://github.com/kapil-malik>`_.
 
+Tasks executed in containers on mesos slaves
+============================================
+
+`This gist <https://gist.github.com/sebradloff/f158874e615bda0005c6f4577b20036e>`_ contains all files and configuration changes necessary to achieve the following:
+
+1. Create a dockerized version of airflow with mesos python eggs installed.
+
+  We recommend taking advantage of docker's multi-stage builds to achieve this. We have one Dockerfile that builds a specific version of mesos from source (Dockerfile-mesos) in order to create the python eggs, and an airflow Dockerfile (Dockerfile-airflow) that copies the python eggs from the mesos image.
+
+2. Create a mesos configuration block within the ``airflow.cfg``.
+
+  The configuration block remains the same as the default airflow configuration (default_airflow.cfg), but has the addition of an option ``docker_image_slave``. This should be set to the name of the image you would like mesos to use when running airflow tasks. Make sure the DNS record for your mesos master is configured properly, along with any authorization that may be required.
+
+3. Change your ``airflow.cfg`` to point the executor parameter to
+   `MesosExecutor` (`executor = MesosExecutor`).
+
+4. Make sure your mesos slave has access to the docker repository you are using for your ``docker_image_slave``.
+
+  `Instructions are available in the mesos docs <https://mesos.readthedocs.io/en/latest/docker-containerizer/#private-docker-repository>`_.
+
+The rest is up to you and how you want to work with a dockerized airflow configuration.
+
 Integration with systemd
 ''''''''''''''''''''''''
 Airflow can integrate with systemd based systems. This makes watching your
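
Putting steps 2-4 together, the relevant airflow.cfg pieces for the
dockerized setup might look like the following sketch; the master host
and the image name are placeholders, not values from this commit:

    [core]
    executor = MesosExecutor

    [mesos]
    master = mesos-master.example.com:5050
    framework_name = Airflow
    task_cpu = 1
    task_memory = 256
    checkpoint = False
    authenticate = False
    docker_image_slave = your-registry/docker-airflow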

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d96534c2/tests/contrib/executors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/__init__.py b/tests/contrib/executors/__init__.py
index 9d7677a..114d189 100644
--- a/tests/contrib/executors/__init__.py
+++ b/tests/contrib/executors/__init__.py
@@ -1,13 +1,18 @@
 # -*- coding: utf-8 -*-
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d96534c2/tests/contrib/executors/test_mesos_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/test_mesos_executor.py b/tests/contrib/executors/test_mesos_executor.py
new file mode 100644
index 0000000..ea3faa2
--- /dev/null
+++ b/tests/contrib/executors/test_mesos_executor.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+try:
+    from unittest import mock  # Python 3
+except ImportError:
+    import mock  # Python 2: requires the ``mock`` package
+
+try:
+    from mesos.interface import mesos_pb2
+    from airflow.contrib.executors.mesos_executor import AirflowMesosScheduler
+    mock_mesos = True
+except ImportError:
+    mock_mesos = None
+
+from airflow import configuration
+try:
+    from queue import Queue  # Python 3
+except ImportError:
+    from Queue import Queue  # Python 2
+
+
+class MesosExecutorTest(unittest.TestCase):
+    FRAMEWORK_ID = 'fake_framework_id'
+
+    @unittest.skipIf(mock_mesos is None, "mesos python eggs are not present")
+    def setUp(self):
+        configuration.load_test_config()
+        self.framework_id = mesos_pb2.FrameworkID(value=self.FRAMEWORK_ID)
+        self.framework_info = mesos_pb2.FrameworkInfo(
+            user='fake_user',
+            name='fake_framework_name',
+        )
+        self.command_info = mesos_pb2.CommandInfo(value='fake-command')
+        self.executor_id = mesos_pb2.ExecutorID(value='fake-executor-id')
+        self.executor_info = mesos_pb2.ExecutorInfo(
+            executor_id=self.executor_id,
+            framework_id=self.framework_id,
+            command=self.command_info,
+        )
+        self.slave_id = mesos_pb2.SlaveID(value='fake-slave-id')
+        self.offer_id = mesos_pb2.OfferID(value='1')
+
+    @unittest.skipIf(mock_mesos is None, "mesos python eggs are not present")
+    @mock.patch('mesos.native.MesosSchedulerDriver')
+    def test_mesos_executor(self, driver):
+        # create task queue, empty result queue, task_cpu and task_memory
+        tasks_queue = Queue()
+        fake_af_task1 = ("key1", "airflow run tutorial")
+        fake_af_task2 = ("key2", "airflow run tutorial2")
+        tasks_queue.put(fake_af_task1)
+        tasks_queue.put(fake_af_task2)
+        results_queue = Queue()
+        task_cpu = 2
+        task_memory = 4
+        scheduler = AirflowMesosScheduler(tasks_queue,
+                                          results_queue,
+                                          task_cpu,
+                                          task_memory)
+        # Create Offers
+        resources = []
+        fake_cpu_resource = mesos_pb2.Resource(
+            name='cpus',
+            type=mesos_pb2.Value.SCALAR,
+        )
+        fake_cpu_resource.scalar.value = task_cpu
+        fake_mem_resource = mesos_pb2.Resource(
+            name='mem',
+            type=mesos_pb2.Value.SCALAR,
+        )
+        fake_mem_resource.scalar.value = task_memory
+        resources.append(fake_cpu_resource)
+        resources.append(fake_mem_resource)
+        fake_offer = mesos_pb2.Offer(
+            id=self.offer_id,
+            framework_id=self.framework_id,
+            slave_id=self.slave_id,
+            hostname='fake-host',
+            resources=resources
+        )
+        scheduler.resourceOffers(driver, [fake_offer])
+
+        # assertions
+        self.assertTrue(driver.launchTasks.called)
+
+
+if __name__ == '__main__':
+    unittest.main()
