Hi Nadeem,
We are using Celery with RabbitMQ. The upgrade to Celery 4.0 last week did cause RabbitMQ trouble. The web interface (for RabbitMQ) started giving us errors pointing to the use of non-utf8 characters in message queues (or queue names, it wasn't clear), which RabbitMQ does not support. We've also solved this by moving back to version 3.1.15 of Celery. We didn't investigate further, since this solved our problem, but I suspect that there are a fair few people who've been tripped up by this, or will be. Regards, Robin Miller OLIVER WYMAN [email protected]<mailto:[email protected]> www.oliverwyman.com<http://www.oliverwyman.com/> ________________________________ From: Nadeem Ahmed Nazeer <[email protected]> Sent: 12 November 2016 08:21:02 To: [email protected] Cc: Nadeem Ahmed Subject: Issue with latest versions of Celery & Kombu Hi Airflowers, We install airflow from our chef scripts and are currently using Airflow 1.7.1.3. We re-base airflow once in a while to reduce the amount of backfills, where we burn down everything and bring it up. It worked fine every time until today. While doing the re-base now, we faced an issue with celery which was not able to talk to the backend database using sqlalchemy. There were no changes made to our setup scripts in chef. 2016-11-12 07:34:14,386 ERROR:airflow.jobs.SchedulerJob[MainThread] u'No such transport: sqla' Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 755, in _execute executor.heartbeat() File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 99, in heartbeat self.execute_async(key, command=command, queue=queue) File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 66, in execute_async args=[command], queue=queue) File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async **options File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 714, in send_task with self.producer_or_acquire(producer) as P: File "/usr/local/lib/python2.7/dist-packages/celery/utils/objects.py", line 85, in __enter__ *self.fb_args, **self.fb_kwargs File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire R = self.prepare(R) File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 62, in prepare p = p() File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__ return self.evaluate() File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate return self._fun(*self._args, **self._kwargs) File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 42, in create_producer conn = self._acquire_connection() File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 39, in _acquire_connection return self.connections.acquire(block=True) File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire R = self.prepare(R) File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 936, in prepare resource = resource() File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__ return self.evaluate() File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate return self._fun(*self._args, **self._kwargs) File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 908, in new return self.connection.clone() File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 587, in clone return self.__class__(**dict(self._info(resolve=False), **kwargs)) File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 597, in _info D = self.transport.default_connection_params File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 832, in transport self._transport = self.create_transport() File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 576, in create_transport return self.get_transport_cls()(client=self) File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 582, in get_transport_cls transport_cls = get_transport_cls(transport_cls) File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 81, in get_transport_cls _transport_cache[transport] = resolve_transport(transport) File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 62, in resolve_transport raise KeyError('No such transport: {0}'.format(transport)) KeyError: u'No such transport: sqla' 2016-11-12 07:34:14,399 ERROR:airflow.jobs.SchedulerJob[MainThread] Tachycardia! The pip install for airflow[celery], installs celery version 4.0.0 and kombu 4.0.0. Upon checking further, sqlalchemy has been removed from being supported as a broker in latest version of kombu. https://github.com/celery/kombu/commit/1cd4e07f9ebb2fdbde0f86054e963f 6bbd17e698#diff-7b8e685a0148804c7b353dc3f138d189 We had to manually apply a hotfix to downgrade the celery version to 3.1.15 which uses kombu version 3.0.37 that has sqla defined in TRANSPORT_ALIASES dict of __init__.py (http://docs.celeryproject. org/projects/kombu/en/latest/_modules/kombu/transport.html) whereas the __init__.py for kombu 4.0.0 is missing it. Is this a known issue now or has something changed w.r.t celery that we have to include? Please help. I would like your advice before I make any changes to the chef scripts for airflow setup. Thanks, Nadeem ________________________________ This e-mail and any attachments may be confidential or legally privileged. If you received this message in error or are not the intended recipient, you should destroy the e-mail message and any attachments or copies, and you are prohibited from retaining, distributing, disclosing or using any information contained herein. Please inform us of the erroneous delivery by return e-mail. Thank you for your cooperation.
