I don't run celery. However, I would suggest following up whatever solution you settle on with a PR that updates the version constraints in setup.py.
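For illustration only, here is a rough sketch of what such a version constraint might look like and how it behaves. This is not Airflow's actual setup.py: `CELERY_EXTRA`, `spec_of`, and `satisfies` are hypothetical names, the lower bound is simply the 3.1.15 mentioned in the thread below, and the helper only understands plain dotted numeric versions with `>=` and `<` bounds.

```python
# Hypothetical sketch: none of these names come from Airflow's setup.py.
# A requirement string that an extras_require entry might carry in order
# to stay on the celery 3.x series.
CELERY_EXTRA = ["celery>=3.1.15,<4.0"]


def parse_version(text):
    """Turn a plain dotted version such as '3.1.15' into (3, 1, 15)."""
    return tuple(int(part) for part in text.split("."))


def spec_of(requirement):
    """Split 'celery>=3.1.15,<4.0' into ('celery', '>=3.1.15,<4.0')."""
    for index, char in enumerate(requirement):
        if char in "<>=":
            return requirement[:index], requirement[index:]
    return requirement, ""


def satisfies(version, spec):
    """Check a version against comma-separated '>=' and '<' bounds only."""
    current = parse_version(version)
    for clause in spec.split(","):
        clause = clause.strip()
        if clause.startswith(">="):
            if current < parse_version(clause[2:]):
                return False
        elif clause.startswith("<") and not clause.startswith("<="):
            if current >= parse_version(clause[1:]):
                return False
        else:
            raise ValueError("unsupported clause: %r" % clause)
    return True


name, spec = spec_of(CELERY_EXTRA[0])
print(name, satisfies("3.1.15", spec), satisfies("4.0.0", spec))
```

Under this constraint 3.1.15 is accepted and 4.0.0 is rejected, which is the behavior a fresh `pip install` would need in order to avoid pulling in the 4.x series.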
On Mon, Nov 14, 2016 at 11:56 AM, Miller, Robin <[email protected]> wrote:
> Hi Nadeem,
>
> We are using Celery with RabbitMQ. The upgrade to Celery 4.0 last week
> did cause RabbitMQ trouble. The web interface (for RabbitMQ) started giving
> us errors pointing to the use of non-utf8 characters in message queues (or
> queue names, it wasn't clear), which RabbitMQ does not support.
>
> We've also solved this by moving back to version 3.1.15 of Celery. We
> didn't investigate further, since this solved our problem, but I suspect
> that there are a fair few people who've been tripped up by this, or will be.
>
> Regards,
>
> Robin Miller
> OLIVER WYMAN
> [email protected]
> www.oliverwyman.com
>
> ________________________________
> From: Nadeem Ahmed Nazeer <[email protected]>
> Sent: 12 November 2016 08:21:02
> To: [email protected]
> Cc: Nadeem Ahmed
> Subject: Issue with latest versions of Celery & Kombu
>
> Hi Airflowers,
>
> We install airflow from our chef scripts and are currently using Airflow
> 1.7.1.3. We re-base airflow once in a while to reduce the amount of
> backfills, where we burn down everything and bring it up. It worked fine
> every time until today.
>
> While doing the re-base now, we faced an issue with celery which was not
> able to talk to the backend database using sqlalchemy. There were no
> changes made to our setup scripts in chef.
>
> 2016-11-12 07:34:14,386 ERROR:airflow.jobs.SchedulerJob[MainThread] u'No such transport: sqla'
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 99, in heartbeat
>     self.execute_async(key, command=command, queue=queue)
>   File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 66, in execute_async
>     args=[command], queue=queue)
>   File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
>     **options
>   File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 714, in send_task
>     with self.producer_or_acquire(producer) as P:
>   File "/usr/local/lib/python2.7/dist-packages/celery/utils/objects.py", line 85, in __enter__
>     *self.fb_args, **self.fb_kwargs
>   File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
>     R = self.prepare(R)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 62, in prepare
>     p = p()
>   File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
>     return self.evaluate()
>   File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
>     return self._fun(*self._args, **self._kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 42, in create_producer
>     conn = self._acquire_connection()
>   File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 39, in _acquire_connection
>     return self.connections.acquire(block=True)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/resource.py", line 83, in acquire
>     R = self.prepare(R)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 936, in prepare
>     resource = resource()
>   File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 203, in __call__
>     return self.evaluate()
>   File "/usr/local/lib/python2.7/dist-packages/kombu/utils/functional.py", line 206, in evaluate
>     return self._fun(*self._args, **self._kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 908, in new
>     return self.connection.clone()
>   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 587, in clone
>     return self.__class__(**dict(self._info(resolve=False), **kwargs))
>   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 597, in _info
>     D = self.transport.default_connection_params
>   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 832, in transport
>     self._transport = self.create_transport()
>   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 576, in create_transport
>     return self.get_transport_cls()(client=self)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 582, in get_transport_cls
>     transport_cls = get_transport_cls(transport_cls)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 81, in get_transport_cls
>     _transport_cache[transport] = resolve_transport(transport)
>   File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 62, in resolve_transport
>     raise KeyError('No such transport: {0}'.format(transport))
> KeyError: u'No such transport: sqla'
> 2016-11-12 07:34:14,399 ERROR:airflow.jobs.SchedulerJob[MainThread] Tachycardia!
>
> The pip install for airflow[celery], installs celery version 4.0.0 and
> kombu 4.0.0. Upon checking further, sqlalchemy has been removed from being
> supported as a broker in latest version of kombu.
> https://github.com/celery/kombu/commit/1cd4e07f9ebb2fdbde0f86054e963f6bbd17e698#diff-7b8e685a0148804c7b353dc3f138d189
>
> We had to manually apply a hotfix to downgrade the celery version to 3.1.15
> which uses kombu version 3.0.37 that has sqla defined in TRANSPORT_ALIASES
> dict of __init__.py
> (http://docs.celeryproject.org/projects/kombu/en/latest/_modules/kombu/transport.html)
> whereas the __init__.py for kombu 4.0.0 is missing it.
>
> Is this a known issue now or has something changed w.r.t celery that we
> have to include?
>
> Please help. I would like your advice before I make any changes to the chef
> scripts for airflow setup.
>
> Thanks,
> Nadeem
>
> ________________________________
> This e-mail and any attachments may be confidential or legally privileged.
> If you received this message in error or are not the intended recipient,
> you should destroy the e-mail message and any attachments or copies, and
> you are prohibited from retaining, distributing, disclosing or using any
> information contained herein. Please inform us of the erroneous delivery by
> return e-mail. Thank you for your cooperation.
>
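The `KeyError` in the traceback above comes down to a lookup of the broker URL's transport prefix in a table of known transports. A minimal sketch of that check follows, so the condition can be tested before the scheduler starts. It assumes the broker URL uses the `sqla+...` prefix form; `transport_of`, `check_broker`, the example URL, and the two sets of transport names are hypothetical stand-ins, not kombu's own code or tables. On a real install the names would come from `kombu.transport.TRANSPORT_ALIASES`, the dict named in the message above.

```python
def transport_of(broker_url):
    """Return the transport prefix of a broker URL.

    'sqla+mysql://user@host/db' gives 'sqla'; 'amqp://host//' gives 'amqp'.
    """
    scheme, separator, _ = broker_url.partition("://")
    if not separator:
        raise ValueError("not a URL: %r" % broker_url)
    return scheme.split("+", 1)[0]


def check_broker(broker_url, known_transports):
    """Raise KeyError, mirroring the error in this thread, if the transport is unknown."""
    transport = transport_of(broker_url)
    if transport not in known_transports:
        raise KeyError("No such transport: {0}".format(transport))
    return transport


# Illustrative stand-ins only, not the real contents of any kombu release.
WITH_SQLA = {"amqp", "redis", "sqla"}
WITHOUT_SQLA = {"amqp", "redis"}

# Hypothetical broker URL for demonstration.
url = "sqla+mysql://user@host/db"
print(check_broker(url, WITH_SQLA))
try:
    check_broker(url, WITHOUT_SQLA)
except KeyError as error:
    print("would fail at scheduler heartbeat:", error)
```

Passing the table in as an argument keeps the check independent of which kombu version is installed, so the same function can be pointed at whatever `TRANSPORT_ALIASES` the environment actually provides.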
