Repository: incubator-airflow Updated Branches: refs/heads/master 707ab6952 -> cdfced324
[AIRFLOW-1631] Fix local executor unbound parallelism Before, if unlimited parallelism was used passing `0` for the parallelism value, the local executor would stall execution since no worker was being created, violating the BaseExecutor contract on the parallelism option. Now, if unbound parallelism is used, processes will be created on demand for each task submitted for execution. Closes #2658 from edgarRd/erod-localexecutor-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cdfced32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cdfced32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cdfced32 Branch: refs/heads/master Commit: cdfced3248c7f14b639919c093f4f3042deb754b Parents: 707ab69 Author: Edgar Rodriguez <[email protected]> Authored: Tue Oct 17 11:39:06 2017 -0700 Committer: Dan Davydov <[email protected]> Committed: Tue Oct 17 11:39:22 2017 -0700 ---------------------------------------------------------------------- airflow/executors/local_executor.py | 205 +++++++++++++++++++++++----- tests/executors/test_local_executor.py | 74 ++++++++++ 2 files changed, 244 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdfced32/airflow/executors/local_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index f9eceb3..9b4f8e1 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -11,6 +11,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +""" +LocalExecutor runs tasks by spawning processes in a controlled fashion in different +modes. 
Given that BaseExecutor has the option to receive a `parallelism` parameter to +limit the number of processes spawned, when this parameter is `0` the number of processes +that LocalExecutor can spawn is unlimited. + +The following strategies are implemented: +1. Unlimited Parallelism (self.parallelism == 0): In this strategy, LocalExecutor will +spawn a process every time `execute_async` is called, that is, every task submitted to the +LocalExecutor will be executed in its own process. Once the task is executed and the +result stored in the `result_queue`, the process terminates. There is no need for a +`task_queue` in this approach, since as soon as a task is received a new process will be +allocated to the task. Processes used in this strategy are of class LocalWorker. + +2. Limited Parallelism (self.parallelism > 0): In this strategy, the LocalExecutor spawns +the number of processes equal to the value of `self.parallelism` at `start` time, +using a `task_queue` to coordinate the ingestion of tasks and the work distribution among +the workers, which will take a task as soon as they are ready. During the lifecycle of +the LocalExecutor, the worker processes are running, waiting for tasks; once the +LocalExecutor receives the call to shut down the executor, a poison token is sent to the +workers to terminate them. Processes used in this strategy are of class QueuedLocalWorker. + +Arguably, `SequentialExecutor` could be thought of as a LocalExecutor with limited +parallelism of just 1 worker, i.e. `self.parallelism = 1`. +This option could lead to the unification of the executor implementations, running +locally, into just one `LocalExecutor` with multiple modes. 
+""" import multiprocessing import subprocess @@ -18,20 +45,63 @@ import time from builtins import range -from airflow import configuration from airflow.executors.base_executor import BaseExecutor from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State -PARALLELISM = configuration.get('core', 'PARALLELISM') - class LocalWorker(multiprocessing.Process, LoggingMixin): + + """LocalWorker Process implementation to run airflow commands. Executes the given + command and puts the result into a result queue when done, terminating execution.""" + + def __init__(self, result_queue): + """ + :param result_queue: the queue to store result states tuples (key, State) + :type result_queue: multiprocessing.Queue + """ + super(LocalWorker, self).__init__() + self.daemon = True + self.result_queue = result_queue + self.key = None + self.command = None + + def execute_work(self, key, command): + """ + Executes command received and stores result state in queue. + :param key: the key to identify the TI + :type key: Tuple(dag_id, task_id, execution_date) + :param command: the command to execute + :type command: string + """ + if key is None: + return + self.log.info("%s running %s", self.__class__.__name__, command) + command = "exec bash -c '{0}'".format(command) + try: + subprocess.check_call(command, shell=True) + state = State.SUCCESS + except subprocess.CalledProcessError as e: + state = State.FAILED + self.log.error("Failed to execute task %s.", str(e)) + # TODO: Why is this commented out? + # raise e + self.result_queue.put((key, state)) + + def run(self): + self.execute_work(self.key, self.command) + time.sleep(1) + + +class QueuedLocalWorker(LocalWorker): + + """LocalWorker implementation that is waiting for tasks from a queue and will + continue executing commands as they become available in the queue. 
It will terminate + execution once the poison token is found.""" + def __init__(self, task_queue, result_queue): - multiprocessing.Process.__init__(self) + super(QueuedLocalWorker, self).__init__(result_queue=result_queue) self.task_queue = task_queue - self.result_queue = result_queue - self.daemon = True def run(self): while True: @@ -40,17 +110,7 @@ class LocalWorker(multiprocessing.Process, LoggingMixin): # Received poison pill, no more tasks to run self.task_queue.task_done() break - self.log.info("%s running %s", self.__class__.__name__, command) - command = "exec bash -c '{0}'".format(command) - try: - subprocess.check_call(command, shell=True) - state = State.SUCCESS - except subprocess.CalledProcessError as e: - state = State.FAILED - self.log.error("Failed to execute task %s.", str(e)) - # TODO: Why is this commented out? - # raise e - self.result_queue.put((key, state)) + self.execute_work(key, command) self.task_queue.task_done() time.sleep(1) @@ -62,30 +122,105 @@ class LocalExecutor(BaseExecutor): of tasks. """ + class _UnlimitedParallelism(object): + """Implements LocalExecutor with unlimited parallelism, starting one process + per each command to execute.""" + + def __init__(self, executor): + """ + :param executor: the executor instance to implement. 
+ :type executor: LocalExecutor + """ + self.executor = executor + + def start(self): + self.executor.workers_used = 0 + self.executor.workers_active = 0 + + def execute_async(self, key, command): + """ + :param key: the key to identify the TI + :type key: Tuple(dag_id, task_id, execution_date) + :param command: the command to execute + :type command: string + """ + local_worker = LocalWorker(self.executor.result_queue) + local_worker.key = key + local_worker.command = command + self.executor.workers_used += 1 + self.executor.workers_active += 1 + local_worker.start() + + def sync(self): + while not self.executor.result_queue.empty(): + results = self.executor.result_queue.get() + self.executor.change_state(*results) + self.executor.workers_active -= 1 + + def end(self): + while self.executor.workers_active > 0: + self.executor.sync() + time.sleep(1) + + class _LimitedParallelism(object): + """Implements LocalExecutor with limited parallelism using a task queue to + coordinate work distribution.""" + + def __init__(self, executor): + self.executor = executor + + def start(self): + self.executor.queue = multiprocessing.JoinableQueue() + + self.executor.workers = [ + QueuedLocalWorker(self.executor.queue, self.executor.result_queue) + for _ in range(self.executor.parallelism) + ] + + self.executor.workers_used = len(self.executor.workers) + + for w in self.executor.workers: + w.start() + + def execute_async(self, key, command): + """ + :param key: the key to identify the TI + :type key: Tuple(dag_id, task_id, execution_date) + :param command: the command to execute + :type command: string + """ + self.executor.queue.put((key, command)) + + def sync(self): + while not self.executor.result_queue.empty(): + results = self.executor.result_queue.get() + self.executor.change_state(*results) + + def end(self): + # Sending poison pill to all worker + for _ in self.executor.workers: + self.executor.queue.put((None, None)) + + # Wait for commands to finish + 
self.executor.queue.join() + self.executor.sync() + def start(self): - self.queue = multiprocessing.JoinableQueue() self.result_queue = multiprocessing.Queue() - self.workers = [ - LocalWorker(self.queue, self.result_queue) - for _ in range(self.parallelism) - ] + self.queue = None + self.workers = [] + self.workers_used = 0 + self.workers_active = 0 + self.impl = (LocalExecutor._UnlimitedParallelism(self) if self.parallelism == 0 + else LocalExecutor._LimitedParallelism(self)) - for w in self.workers: - w.start() + self.impl.start() def execute_async(self, key, command, queue=None): - self.queue.put((key, command)) + self.impl.execute_async(key=key, command=command) def sync(self): - while not self.result_queue.empty(): - results = self.result_queue.get() - self.change_state(*results) + self.impl.sync() def end(self): - # Sending poison pill to all worker - for _ in self.workers: - self.queue.put((None, None)) - - # Wait for commands to finish - self.queue.join() - self.sync() + self.impl.end() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cdfced32/tests/executors/test_local_executor.py ---------------------------------------------------------------------- diff --git a/tests/executors/test_local_executor.py b/tests/executors/test_local_executor.py new file mode 100644 index 0000000..bca6354 --- /dev/null +++ b/tests/executors/test_local_executor.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest + +from airflow.executors.local_executor import LocalExecutor +from airflow.utils.state import State +from airflow.utils.timeout import timeout + + +class LocalExecutorTest(unittest.TestCase): + + TEST_SUCCESS_COMMANDS = 5 + + def execution_parallelism(self, parallelism=0): + executor = LocalExecutor(parallelism=parallelism) + executor.start() + + success_key = 'success {}' + success_command = 'echo {}' + fail_command = 'exit 1' + + for i in range(self.TEST_SUCCESS_COMMANDS): + key, command = success_key.format(i), success_command.format(i) + executor.execute_async(key=key, command=command) + executor.running[key] = True + + # errors are propagated for some reason + try: + executor.execute_async(key='fail', command=fail_command) + except: + pass + + executor.running['fail'] = True + + if parallelism == 0: + with timeout(seconds=5): + executor.end() + else: + executor.end() + + for i in range(self.TEST_SUCCESS_COMMANDS): + key = success_key.format(i) + self.assertTrue(executor.event_buffer[key], State.SUCCESS) + self.assertTrue(executor.event_buffer['fail'], State.FAILED) + + for i in range(self.TEST_SUCCESS_COMMANDS): + self.assertNotIn(success_key.format(i), executor.running) + self.assertNotIn('fail', executor.running) + + expected = self.TEST_SUCCESS_COMMANDS + 1 if parallelism == 0 else parallelism + self.assertEqual(executor.workers_used, expected) + + def test_execution_unlimited_parallelism(self): + self.execution_parallelism(parallelism=0) + + def test_execution_limited_parallelism(self): + test_parallelism = 2 + self.execution_parallelism(parallelism=test_parallelism) + + +if __name__ == '__main__': + unittest.main()
