Repository: incubator-airflow Updated Branches: refs/heads/master 7c3435442 -> 7c1d7db3d
[AIRFLOW-2519] Fix CeleryExecutor with SQLAlchemy When using a CeleryExecutor with SQLAlchemy specified in broker_url, such as: broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow, do not pass invalid options to the sqlalchemy backend. - In default_airflow.cfg, comment out visibility_timeout from [celery_broker_transport_options]. The user can specify the correct values in this section for the celery broker transport that they choose. visibility_timeout is only valid for Redis and SQS celery brokers. - Move ssl options from [celery_broker_transport_options] where they were wrongly placed, into the [celery] section where they belong. Closes #3417 from rodrigc/AIRFLOW-2519 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c1d7db3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c1d7db3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c1d7db3 Branch: refs/heads/master Commit: 7c1d7db3dba9967f581244ddd721b4477c1577c6 Parents: 7c34354 Author: Craig Rodrigues <[email protected]> Authored: Wed May 30 10:34:41 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Wed May 30 10:34:41 2018 +0200 ---------------------------------------------------------------------- airflow/config_templates/default_airflow.cfg | 23 ++++++++++++++++------- airflow/config_templates/default_celery.py | 14 +++++++++++--- 2 files changed, 27 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c1d7db3/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 60d1156..ebeb2e6 100644 --- a/airflow/config_templates/default_airflow.cfg +++ 
b/airflow/config_templates/default_airflow.cfg @@ -365,19 +365,28 @@ default_queue = default # Import path for celery configuration options celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG -[celery_broker_transport_options] -# The visibility timeout defines the number of seconds to wait for the worker -# to acknowledge the task before the message is redelivered to another worker. -# Make sure to increase the visibility timeout to match the time of the longest -# ETA you're planning to use. Especially important in case of using Redis or SQS -visibility_timeout = 21600 - # In case of using SSL ssl_active = False ssl_key = ssl_cert = ssl_cacert = +[celery_broker_transport_options] +# This section is for specifying options which can be passed to the +# underlying celery broker transport. See: +# http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-broker_transport_options + +# The visibility timeout defines the number of seconds to wait for the worker +# to acknowledge the task before the message is redelivered to another worker. +# Make sure to increase the visibility timeout to match the time of the longest +# ETA you're planning to use. +# +# visibility_timeout is only supported for Redis and SQS celery brokers. 
+# See: +# http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-broker_transport_options +# +#visibility_timeout = 21600 + [dask] # This section only applies if you are using the DaskExecutor in # [core] section above http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c1d7db3/airflow/config_templates/default_celery.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 9a5bdf8..d44f2b3 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -23,13 +23,21 @@ from airflow import configuration from airflow.exceptions import AirflowConfigException, AirflowException from airflow.utils.log.logging_mixin import LoggingMixin + +def _broker_supports_visibility_timeout(url): + return url.startswith("redis://") or url.startswith("sqs://") + + log = LoggingMixin().log +broker_url = configuration.conf.get('celery', 'BROKER_URL') + broker_transport_options = configuration.conf.getsection( 'celery_broker_transport_options' ) -if broker_transport_options is None: - broker_transport_options = {'visibility_timeout': 21600} +if 'visibility_timeout' not in broker_transport_options: + if _broker_supports_visibility_timeout(broker_url): + broker_transport_options = {'visibility_timeout': 21600} DEFAULT_CELERY_CONFIG = { 'accept_content': ['json', 'pickle'], @@ -38,7 +46,7 @@ DEFAULT_CELERY_CONFIG = { 'task_acks_late': True, 'task_default_queue': configuration.conf.get('celery', 'DEFAULT_QUEUE'), 'task_default_exchange': configuration.conf.get('celery', 'DEFAULT_QUEUE'), - 'broker_url': configuration.conf.get('celery', 'BROKER_URL'), + 'broker_url': broker_url, 'broker_transport_options': broker_transport_options, 'result_backend': configuration.conf.get('celery', 'RESULT_BACKEND'), 'worker_concurrency': configuration.conf.getint('celery', 'WORKER_CONCURRENCY'),
