Perhaps I should have known, but didn't: Celery 4.x does not support asyncio (celery#3883 <https://github.com/celery/celery/issues/3883>, celery#3884 <https://github.com/celery/celery/issues/3884>). Luckily, this was explained over at channels_rabbitmq#25 <https://github.com/CJWorkbench/channels_rabbitmq/issues/25>. From there, a subprocess based fix was easy...once I became aware of channels_rabbitmq#13 <https://github.com/CJWorkbench/channels_rabbitmq/issues/13>. TL;DR:
- You cannot naively call into Channels (or any other) async code from inside a Celery task (until Celery 5). - You cannot naively call async_to_sync() et. al. Some awareness of the scope of the event loop(s) that underpin the system is required, or a belt and braces type workaround. Thanks, Shaheed P.S. I went down this rabbit hole partly because, at the time of my original posting, I had not noticed that the exception noted below was still happening AND when I moved off my development system onto an AWS deployment setup, everything stopped working. On Saturday, 22 August 2020 at 14:52:33 UTC+1 [email protected] wrote: > I have a question about running Channels and Django code under Celery. > Originally, I had some code that used neither Channels nor Celery, but > simply run as part of the View code: > > def import_all(...) > > ...database access... > > > This is an expensive operation and to avoid timing out the browser, I > needed to (a) run it under Celery but also (b) provide progress updates via > a Websocket to the client. First, I implemented (b) using a Channels > Consumer and the group_send, and that worked fine. I then tried to > implement (a) using a Celery Task: > > @task > > def import_all(...): > ...database access... > async_to_sync(group_send)(...) > > > However, this caused Celery to be unhappy and the worker to die: > > ... > 2020-08-22 13:33:08,303 [ERROR] MainProcess: Task handler raised error: > WorkerLostError('Worker exited prematurely: exitcode 0.') > Traceback (most recent call last): > File > "/usr/local/lib/python3.8/dist-packages/billiard-3.6.3.0-py3.8.egg/billiard/pool.py", > > line 1265, in mark_as_worker_lost > raise WorkerLostError( > billiard.exceptions.WorkerLostError: Worker exited prematurely: exitcode 0. > > After quite some Googling, and various detours including where the > DataBase access caused issues (it seems it cannot be run from an async > context), I ended up with this double-wrapped monster: > > @task > > def import_all(...): > async_to_sync(_import_all)(...) > > async def _import_all(...): > await database_sync_to_async(__import_all)(...) > > def __import_all(...): <<<<<<<<<< original code > ...database access... > async_to_sync(group_send)(...) > > > This seems to work (even though the Celery task eventually dies, it seems > to have done its work first, and is replaced). Is this double-wrapping > really the correct approach? Even if it is clumsy, is there any reason to > think it won't be reliable? > > TIA, Shaheed > > P.S. I should mention that I am using Celery's prefork library, as > opposed to say gevent or greenlets. > > > > > > > > > > > > > -- You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/ef21c366-66b2-4760-a74f-415466695a55n%40googlegroups.com.

