Repository: incubator-airflow Updated Branches: refs/heads/master 97383f76d -> aa737a582
[AIRFLOW-966] Make celery broker_transport_options configurable Required for changing visibility timeout and other options required for Redis/SQS. Closes #2842 from bolkedebruin/AIRFLOW-966 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/aa737a58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/aa737a58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/aa737a58 Branch: refs/heads/master Commit: aa737a582c687e7105ef934ffc4da3dc78438235 Parents: 97383f7 Author: Bolke de Bruin <[email protected]> Authored: Tue Dec 5 10:13:05 2017 +0100 Committer: Fokko Driesprong <[email protected]> Committed: Tue Dec 5 10:13:05 2017 +0100 ---------------------------------------------------------------------- airflow/config_templates/default_airflow.cfg | 7 +++++++ airflow/config_templates/default_celery.py | 6 +++++- airflow/configuration.py | 10 ++++++++++ docs/configuration.rst | 5 +++++ 4 files changed, 27 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 32af0a3..1dfb079 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -310,6 +310,13 @@ default_queue = default # Import path for celery configuration options celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG +[celery_broker_transport_options] +# The visibility timeout defines the number of seconds to wait for the worker +# to acknowledge the task before the message is redelivered to another worker. 
+# Make sure to increase the visibility timeout to match the time of the longest +# ETA you're planning to use. Especially important in case of using Redis or SQS. +visibility_timeout = 21600 + [dask] # This section only applies if you are using the DaskExecutor in # [core] section above http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/airflow/config_templates/default_celery.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 48611cb..390e3ef 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -19,6 +19,10 @@ from airflow import configuration from airflow.utils.log.logging_mixin import LoggingMixin +broker_transport_options = configuration.getsection('celery_broker_transport_options') +if broker_transport_options is None: + broker_transport_options = {'visibility_timeout': 21600} + DEFAULT_CELERY_CONFIG = { 'accept_content': ['json', 'pickle'], 'event_serializer': 'json', @@ -28,7 +32,7 @@ DEFAULT_CELERY_CONFIG = { 'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'), 'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'), 'broker_url': configuration.get('celery', 'BROKER_URL'), - 'broker_transport_options': {'visibility_timeout': 21600}, + 'broker_transport_options': {'visibility_timeout': broker_transport_options}, 'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'), 'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'), } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index d61afb7..84913ff 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -236,6 +236,12 @@ class 
AirflowConfigParser(ConfigParser): ConfigParser.read(self, filenames) self._validate() + def getsection(self, section): + if section in self._sections: + return self._sections[section] + + return None + def as_dict(self, display_source=False, display_sensitive=False): """ Returns the current configuration as an OrderedDict of OrderedDicts. @@ -423,6 +429,10 @@ def getint(section, key): return conf.getint(section, key) +def getsection(section): + return conf.getsection(section) + + def has_option(section, key): return conf.has_option(section, key) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/docs/configuration.rst ---------------------------------------------------------------------- diff --git a/docs/configuration.rst b/docs/configuration.rst index e68a341..35616f2 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -155,6 +155,11 @@ Note that you can also run "Celery Flower", a web UI built on top of Celery, to monitor your workers. You can use the shortcut command ``airflow flower`` to start a Flower web server. +Some caveats: + +- Make sure to use a database-backed result backend +- Make sure to set a visibility timeout in [celery_broker_transport_options] that exceeds the ETA of your longest running task +- Tasks can consume resources; make sure your worker has enough resources to run ``celeryd_concurrency`` tasks Scaling Out with Dask '''''''''''''''''''''
