Repository: incubator-airflow
Updated Branches:
  refs/heads/master 97383f76d -> aa737a582


[AIRFLOW-966] Make celery broker_transport_options configurable

Required for changing the visibility timeout and other
transport options needed by the Redis and SQS brokers.

Closes #2842 from bolkedebruin/AIRFLOW-966


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/aa737a58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/aa737a58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/aa737a58

Branch: refs/heads/master
Commit: aa737a582c687e7105ef934ffc4da3dc78438235
Parents: 97383f7
Author: Bolke de Bruin <[email protected]>
Authored: Tue Dec 5 10:13:05 2017 +0100
Committer: Fokko Driesprong <[email protected]>
Committed: Tue Dec 5 10:13:05 2017 +0100

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  7 +++++++
 airflow/config_templates/default_celery.py   |  6 +++++-
 airflow/configuration.py                     | 10 ++++++++++
 docs/configuration.rst                       |  5 +++++
 4 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 32af0a3..1dfb079 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -310,6 +310,13 @@ default_queue = default
 # Import path for celery configuration options
 celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
 
+[celery_broker_transport_options]
+# The visibility timeout defines the number of seconds to wait for the worker
+# to acknowledge the task before the message is redelivered to another worker.
+# Make sure to increase the visibility timeout to match the time of the longest
+# ETA you're planning to use. This is especially important for Redis and SQS.
+visibility_timeout = 21600
+
 [dask]
 # This section only applies if you are using the DaskExecutor in
 # [core] section above

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/airflow/config_templates/default_celery.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 48611cb..390e3ef 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -19,6 +19,12 @@ from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
+# Broker transport options come from [celery_broker_transport_options];
+# default to a 6 hour (21600 s) visibility timeout if that section is absent.
+broker_transport_options = configuration.getsection('celery_broker_transport_options')
+if broker_transport_options is None:
+    broker_transport_options = {'visibility_timeout': 21600}
+
 DEFAULT_CELERY_CONFIG = {
     'accept_content': ['json', 'pickle'],
     'event_serializer': 'json',
@@ -28,7 +34,7 @@ DEFAULT_CELERY_CONFIG = {
     'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
     'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
     'broker_url': configuration.get('celery', 'BROKER_URL'),
-    'broker_transport_options': {'visibility_timeout': 21600},
+    'broker_transport_options': broker_transport_options,
     'result_backend': configuration.get('celery', 'CELERY_RESULT_BACKEND'),
     'worker_concurrency': configuration.getint('celery', 'CELERYD_CONCURRENCY'),
 }

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index d61afb7..84913ff 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -236,6 +236,48 @@ class AirflowConfigParser(ConfigParser):
         ConfigParser.read(self, filenames)
         self._validate()
 
+    def getsection(self, section):
+        """
+        Return a config section as a new dict, or None if it does not exist.
+
+        Values that parse as int or float are converted, 'true'/'false'
+        (any case) become booleans and everything else stays a string, so
+        the result can be passed straight to libraries such as Celery that
+        expect typed options (e.g. an integer visibility_timeout).
+
+        :param section: name of the section in the config
+        :return: dict of option name -> converted value, or None
+        """
+        if section not in self._sections:
+            return None
+
+        # Build a fresh dict so callers cannot mutate the parser's state, and
+        # skip the '__name__' entry Python 2's stdlib ConfigParser keeps in
+        # every section dict.
+        return {
+            key: self._coerce_value(val)
+            for key, val in self._sections[section].items()
+            if key != '__name__'
+        }
+
+    @staticmethod
+    def _coerce_value(val):
+        """Convert a raw config value to int, float or bool when possible."""
+        for cast in (int, float):
+            try:
+                return cast(val)
+            except (TypeError, ValueError):
+                pass
+        try:
+            lowered = val.lower()
+        except AttributeError:
+            return val
+        if lowered == 'true':
+            return True
+        if lowered == 'false':
+            return False
+        return val
+
     def as_dict(self, display_source=False, display_sensitive=False):
         """
         Returns the current configuration as an OrderedDict of OrderedDicts.
@@ -423,6 +465,10 @@ def getint(section, key):
     return conf.getint(section, key)
 
 
+def getsection(section):
+    return conf.getsection(section)
+
+
 def has_option(section, key):
     return conf.has_option(section, key)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aa737a58/docs/configuration.rst
----------------------------------------------------------------------
diff --git a/docs/configuration.rst b/docs/configuration.rst
index e68a341..35616f2 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -155,6 +155,11 @@ Note that you can also run "Celery Flower", a web UI built on top of Celery,
 to monitor your workers. You can use the shortcut command ``airflow flower``
 to start a Flower web server.
 
+Some caveats:
+
+- Make sure to use a database-backed result backend
+- Make sure to set a visibility timeout in ``[celery_broker_transport_options]`` that exceeds the ETA of your longest running task
+- Tasks can consume resources; make sure your worker has enough resources to run ``celeryd_concurrency`` tasks
 
 Scaling Out with Dask
 '''''''''''''''''''''

Reply via email to