To use with the SQLA backend to celery you need to override the options Airflow
passes to Celery. Those come from
https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py
Since you don't want most/all of those options (and there is no way in the
config file to _remove_ a setting) you will have to point airflow to a
different file for the celery config:
This line in the config is what you will need to change:
# Import path for celery configuration options
celery_config_options =
airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
If you create something like config/celery_config.py containing:
CELERY_CONFIG = {
# Just the options you want to set
}
(config/ should exist along side your dags/ folder, and I think it should be
added to the python path already). You can then set this in the config:
celery_config_options = celery_config.CELERY_CONFIG
That should give you complete control
> On 21 May 2018, at 09:50, Craig Rodrigues <[email protected]> wrote:
>
> Hi,
>
> I used this requirements.txt file to install airflow from the v1-10-test
> branch:
>
> git+https://github.com/celery/celery@master#egg=celery
> git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> kombu>=4.1.0
>
>
> In my airflow.cfg, I have:
>
> [celery]
> executor = CeleryExecutor
>
> executor = CeleryExec
> broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb
>
> [celery_broker_transport_options]
> #
> #
>
> However, if I manually run this code inside the webserver, I see:
>
> python -c "from airflow import configuration; c =
> configuration.conf.getsection('celery_broker_transport_options'); print(c)"
> OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False),
> (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
>
> My worker crashes with this error:
>
>
> [2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key
> [celery/ssl_active] not found in config
> [2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor
> will run without SSL
> [2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor
> CeleryExecutor
> [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error:
> TypeError(u"Invalid argument(s)
> 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to
> create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.
> Please check that the keyword arguments are appropriate for this combination
> of components.",)
> Traceback (most recent call last):
> File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205,
> in start
> self.blueprint.start(self)
> File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in
> start
> step.start(parent)
> File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in
> start
> return self.obj.start()
> File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py",
> line 322, in start
> blueprint.start(self)
> File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in
> start
> step.start(parent)
> File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py",
> line 41, in start
> c.connection, on_decode_error=c.on_decode_error,
> File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in
> TaskConsumer
> **kw
> File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in
> __init__
> self.revive(self.channel)
> File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in
> revive
> self.declare()
> File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in
> declare
> queue.declare()
> File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
> self._create_queue(nowait=nowait, channel=channel)
> File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in
> _create_queue
> self.queue_declare(nowait=nowait, passive=False, channel=channel)
> File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in
> queue_declare
> nowait=nowait,
> File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py",
> line 531, in queue_declare
> self._new_queue(queue, **kwargs)
> File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 82, in _new_queue
> self._get_or_create(queue)
> File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 70, in _get_or_create
> obj = self.session.query(self.queue_cls) \
> File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 65, in session
> _, Session = self._open()
> File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 56, in _open
> engine = self._engine_from_config()
> File
> "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py",
> line 51, in _engine_from_config
> return create_engine(conninfo.hostname, **transport_options)
> File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py",
> line 391, in create_engine
> return strategy.create(*args, **kwargs)
> File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py",
> line 160, in create
> engineclass.__name__))
> TypeError: Invalid argument(s)
> 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to
> create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.
> Please check that the keyword arguments are appropriate for this combination
> of components.
>
> -------------- celery@qa1 v4.2.0rc3 (windowlicker)
> ---- **** -----
> --- * *** * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core
> 2018-05-21 07:46:12
> -- * - **** ---
> - ** ---------- [config]
> - ** ---------- .> app: airflow.executors.celery_executor:0x4766d50
> - ** ---------- .> transport: sqla+mysql://airflow:blah@localhost:3306/mydb
> - ** ---------- .> results: mysql://airflow:**@localhost:3306/airflow
> - *** --- * --- .> concurrency: 16 (prefork)
> -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this
> worker)
> --- ***** -----
> -------------- [queues]
> .> airflow_celery exchange=airflow_celery(direct)
> key=airflow_celery
>
>
>
> What is the correct way to override the celery_broker_transport_options?
> I thought that having an empty section in airflow.cfg would be enough?
>
> I thought that this was fixed with:
> https://github.com/apache/incubator-airflow/pull/2842
>
>
> I cannot pass visibilty_timeout or ssl_key to a mysql backend.
> --
> Craig
>
>
>
>
>
>
>