Hi Nadeem,

We are using Celery with RabbitMQ. The upgrade to Celery 4.0 last week did 
cause trouble with RabbitMQ. The RabbitMQ web interface started reporting 
errors about the use of non-UTF-8 characters in message queues (or queue 
names; it wasn't clear which), which RabbitMQ does not support.


We've also solved this by moving back to version 3.1.15 of Celery. We didn't 
investigate further, since this solved our problem, but I suspect that there 
are a fair few people who've been tripped up by this, or will be.


Regards,

Robin Miller
OLIVER WYMAN
[email protected]<mailto:[email protected]>
www.oliverwyman.com<http://www.oliverwyman.com/>

________________________________
From: Nadeem Ahmed Nazeer <[email protected]>
Sent: 12 November 2016 08:21:02
To: [email protected]
Cc: Nadeem Ahmed
Subject: Issue with latest versions of Celery & Kombu

Hi Airflowers,

We install Airflow from our Chef scripts and are currently using Airflow
1.7.1.3. Once in a while we re-base Airflow to reduce the number of
backfills, tearing everything down and bringing it back up. This worked
fine every time until today.

While doing the re-base this time, we hit an issue where Celery could not
talk to the backend database via SQLAlchemy. No changes had been made to
our setup scripts in Chef.

2016-11-12 07:34:14,386 ERROR:airflow.jobs.SchedulerJob[MainThread] u'No such transport: sqla'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 755, in _execute
    executor.heartbeat()
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 99, in heartbeat
    self.execute_async(key, command=command, queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 66, in execute_async
    args=[command], queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
    **options
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 714, in send_task
    with self.producer_or_acquire(producer) as P:
  File "/usr/local/lib/python2.7/dist-packages/celery/utils/objects.py", line 85, in __enter__
    *self.fb_args, **self.fb_kwargs
  File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
    R = self.prepare(R)
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 62, in prepare
    p = p()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
    return self.evaluate()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
    return self._fun(*self._args, **self._kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 42, in create_producer
    conn = self._acquire_connection()
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 39, in _acquire_connection
    return self.connections.acquire(block=True)
  File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
    R = self.prepare(R)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 936, in prepare
    resource = resource()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
    return self.evaluate()
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
    return self._fun(*self._args, **self._kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 908, in new
    return self.connection.clone()
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 587, in clone
    return self.__class__(**dict(self._info(resolve=False), **kwargs))
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 597, in _info
    D = self.transport.default_connection_params
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 832, in transport
    self._transport = self.create_transport()
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 576, in create_transport
    return self.get_transport_cls()(client=self)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 582, in get_transport_cls
    transport_cls = get_transport_cls(transport_cls)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 81, in get_transport_cls
    _transport_cache[transport] = resolve_transport(transport)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 62, in resolve_transport
    raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: sqla'
2016-11-12 07:34:14,399 ERROR:airflow.jobs.SchedulerJob[MainThread] Tachycardia!

The pip install for airflow[celery] installs Celery 4.0.0 and kombu 4.0.0.
Upon checking further, SQLAlchemy has been removed as a supported broker in
the latest version of kombu:
https://github.com/celery/kombu/commit/1cd4e07f9ebb2fdbde0f86054e963f6bbd17e698#diff-7b8e685a0148804c7b353dc3f138d189
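Until the versions are pinned properly, a deploy-time guard along these lines
could fail fast with a clear message instead of crashing later inside the
scheduler. This is only a sketch; the helper name and the version strings in
the comments are ours, not from Airflow or kombu:

```python
# Hypothetical deploy-time check (helper name is ours): kombu 4.0.0 dropped
# the SQLAlchemy broker transport, so refuse to start when the broker URL
# uses the 'sqla' scheme against kombu >= 4.0.
def broker_supported(broker_url, kombu_version):
    """Return True if the installed kombu can still serve this broker URL."""
    scheme = broker_url.split('://', 1)[0].split('+')[0]
    major = int(kombu_version.split('.')[0])
    if scheme in ('sqla', 'sqlalchemy'):
        return major < 4  # 'sqla' alias removed in kombu 4.0.0
    return True
```

With kombu pinned at 3.0.37 the check passes for an sqla+mysql broker URL;
against 4.0.0 it fails, matching the KeyError in the scheduler log.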

We had to manually apply a hotfix downgrading Celery to 3.1.15, which uses
kombu 3.0.37. That release still defines sqla in the TRANSPORT_ALIASES
dict of kombu/transport/__init__.py
(http://docs.celeryproject.org/projects/kombu/en/latest/_modules/kombu/transport.html),
whereas the __init__.py in kombu 4.0.0 is missing it.
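The failure mode in the traceback can be reproduced with a minimal sketch of
that alias lookup (simplified from kombu's resolve_transport; the dict
contents here are abbreviated, not the full kombu table):

```python
# Simplified sketch of kombu's transport alias resolution: the broker URL
# scheme is looked up in TRANSPORT_ALIASES, and a missing entry surfaces as
# the KeyError seen in the scheduler log.
TRANSPORT_ALIASES = {
    'amqp': 'kombu.transport.pyamqp:Transport',
    # kombu 3.0.37 also maps 'sqla' to the SQLAlchemy transport here;
    # kombu 4.0.0 no longer does.
}

def resolve_transport(transport):
    try:
        return TRANSPORT_ALIASES[transport]
    except KeyError:
        raise KeyError('No such transport: {0}'.format(transport))
```

Looking up 'sqla' against the 4.0.0-style table raises exactly the
"No such transport: sqla" error above.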

Is this a known issue, or has something changed with respect to Celery that
we need to account for?

Please help. I would like your advice before I make any changes to the chef
scripts for airflow setup.

Thanks,
Nadeem

________________________________
This e-mail and any attachments may be confidential or legally privileged. If 
you received this message in error or are not the intended recipient, you 
should destroy the e-mail message and any attachments or copies, and you are 
prohibited from retaining, distributing, disclosing or using any information 
contained herein. Please inform us of the erroneous delivery by return e-mail. 
Thank you for your cooperation.
