Repository: incubator-airflow Updated Branches: refs/heads/master 6a9dc8ad1 -> a87ced4c1
[AIRFLOW-1650] Fix custom celery config loading Celery config loading was broken as it was just passing a string. This fixes it by loading it as a module with an attribute. Inspired by Django's module loading. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3aa05cb2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3aa05cb2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3aa05cb2 Branch: refs/heads/master Commit: 3aa05cb227a12dd8ec3f375b8444adad67a2718d Parents: dca9ab3 Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Wed Sep 27 20:03:56 2017 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Wed Sep 27 20:03:56 2017 +0200 ---------------------------------------------------------------------- airflow/executors/celery_executor.py | 6 ++++-- airflow/utils/module_loading.py | 35 +++++++++++++++++++++++++++++++ tests/utils/test_module_loading.py | 30 ++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3aa05cb2/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 7e363db..d3809b3 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor from airflow import configuration from airflow.utils.log.logging_mixin import LoggingMixin - +from airflow.utils.module_loading import import_string PARALLELISM = configuration.get('core', 'PARALLELISM') @@ -33,7 +33,9 @@ airflow worker ''' if configuration.has_option('celery', 'celery_config_options'): - celery_configuration = configuration.get('celery', 'celery_config_options') + celery_configuration = import_string( + configuration.get('celery', 'celery_config_options') + ) else: celery_configuration = DEFAULT_CELERY_CONFIG http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3aa05cb2/airflow/utils/module_loading.py ---------------------------------------------------------------------- diff --git a/airflow/utils/module_loading.py b/airflow/utils/module_loading.py new file mode 100644 index 0000000..f2138ea --- /dev/null +++ b/airflow/utils/module_loading.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from importlib import import_module + + +def import_string(dotted_path): + """ + Import a dotted module path and return the attribute/class designated by the + last name in the path. Raise ImportError if the import failed. + """ + try: + module_path, class_name = dotted_path.rsplit('.', 1) + except ValueError: + raise ImportError("{} doesn't look like a module path".format(dotted_path)) + + module = import_module(module_path) + + try: + return getattr(module, class_name) + except AttributeError as err: + raise ImportError('Module "{}" does not define a "{}" attribute/class'.format( + module_path, class_name) + ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3aa05cb2/tests/utils/test_module_loading.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_module_loading.py b/tests/utils/test_module_loading.py new file mode 100644 index 0000000..2eafb1f --- /dev/null +++ b/tests/utils/test_module_loading.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from airflow.utils.module_loading import import_string + + +class ModuleImportTestCase(unittest.TestCase): + def test_import_string(self): + cls = import_string('airflow.utils.module_loading.import_string') + self.assertEqual(cls, import_string) + + # Test exceptions raised + with self.assertRaises(ImportError): + import_string('no_dots_in_path') + msg = 'Module "airflow.utils" does not define a "nonexistent" attribute' + with self.assertRaisesRegexp(ImportError, msg): + import_string('airflow.utils.nonexistent')