Perhaps I should have known, but didn't: Celery 4.x does not support 
asyncio (celery#3883 <https://github.com/celery/celery/issues/3883>, 
celery#3884 <https://github.com/celery/celery/issues/3884>). Luckily, this 
was explained over at channels_rabbitmq#25 
<https://github.com/CJWorkbench/channels_rabbitmq/issues/25>. From there, a 
subprocess-based fix was easy... once I became aware of channels_rabbitmq#13 
<https://github.com/CJWorkbench/channels_rabbitmq/issues/13>. TL;DR:


   - You cannot naively call into Channels (or any other) async code from 
   inside a Celery task (until Celery 5).
   - You cannot naively call async_to_sync() et al. Some awareness of the 
   scope of the event loop(s) that underpin the system is required, or a 
   belt-and-braces workaround.
   
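To illustrate the second point with plain asyncio (a hedged sketch, not 
Channels code): asyncio.run() is a sync-to-async bridge much like 
async_to_sync(), and it refuses to run when the calling thread already has a 
running event loop, which is exactly the loop-scope awareness referred to 
above:

```python
import asyncio

async def inner():
    return 42

def sync_bridge():
    # Naive sync->async bridge: only valid when the calling thread
    # has no event loop already running.
    return asyncio.run(inner())

print(sync_bridge())  # no loop running yet, so this prints 42

async def outer():
    # Now a loop IS running, so the same bridge blows up.
    try:
        sync_bridge()
        return "no error"
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"

print(asyncio.run(outer()))
```

The same call works or fails depending purely on where in the loop's 
lifetime it happens, which is why "it works on my machine" results are so 
common here.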
Thanks, Shaheed

P.S. I went down this rabbit hole partly because, at the time of my 
original posting, I had not noticed that the exception noted below was 
still happening, and that when I moved off my development system onto an 
AWS deployment, everything stopped working.
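For the record, the subprocess-based fix amounts to pushing the async call 
into a short-lived child process, which gets a pristine event loop of its 
own, untouched by any loop state in the prefork worker. A minimal stdlib 
sketch (the notify coroutine is a hypothetical stand-in for the real 
group_send call):

```python
import asyncio
import multiprocessing

async def notify(message):
    # Hypothetical stand-in for channel_layer.group_send(...).
    await asyncio.sleep(0)
    return message

def _child(message, queue):
    # Fresh process: asyncio.run() builds and tears down its own loop.
    queue.put(asyncio.run(notify(message)))

def send_from_sync(message):
    # Callable from a plain sync (prefork) Celery task body.
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=_child, args=(message, queue))
    proc.start()
    result = queue.get()
    proc.join()
    return result

if __name__ == "__main__":
    print(send_from_sync("50% imported"))
```

Spawning a process per progress update is heavy-handed, but it sidesteps 
the loop-scope problem entirely, which is why I call it belt-and-braces.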


On Saturday, 22 August 2020 at 14:52:33 UTC+1 [email protected] wrote:

> I have a question about running Channels and Django code under Celery. 
> Originally, I had some code that used neither Channels nor Celery, but 
> simply ran as part of the view code:
>
> def import_all(...):
>     ...database access...
>
> This is an expensive operation and to avoid timing out the browser, I 
> needed to (a) run it under Celery but also (b) provide progress updates via 
> a WebSocket to the client. First, I implemented (b) using a Channels 
> Consumer and group_send, and that worked fine. I then tried to 
> implement (a) using a Celery Task:
>
> @task
> def import_all(...):
>     ...database access...
>     async_to_sync(group_send)(...)
>
>
> However, this caused Celery to be unhappy and the worker to die:
>
> ...
> 2020-08-22 13:33:08,303 [ERROR] MainProcess: Task handler raised error: 
> WorkerLostError('Worker exited prematurely: exitcode 0.') 
> Traceback (most recent call last): 
>   File "/usr/local/lib/python3.8/dist-packages/billiard-3.6.3.0-py3.8.egg/billiard/pool.py", line 1265, in mark_as_worker_lost 
>     raise WorkerLostError( 
> billiard.exceptions.WorkerLostError: Worker exited prematurely: exitcode 0.
>
> After quite some Googling, and various detours including one where the 
> database access caused issues (it seems it cannot be run from an async 
> context), I ended up with this double-wrapped monster:
>
> @task
> def import_all(...):
>     async_to_sync(_import_all)(...)
>
> async def _import_all(...):
>     await database_sync_to_async(__import_all)(...)
>
> def __import_all(...):   # <<<<<<<<<< original code
>     ...database access...
>     async_to_sync(group_send)(...)
>
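> As a sanity check on the layering, here is a stdlib-only analogue of the 
> sandwich (a sketch under stated assumptions: asyncio.run stands in for 
> async_to_sync, and asyncio.to_thread, Python 3.9+, stands in for 
> database_sync_to_async):

```python
import asyncio

def import_all():
    # Outermost layer: sync entry point (the Celery task body);
    # asyncio.run() plays the role of async_to_sync() here.
    return asyncio.run(_import_all())

async def _import_all():
    # Middle layer: async context. Blocking (database-style) work must
    # not run on the event loop itself, so hand it to a worker thread,
    # as database_sync_to_async does.
    return await asyncio.to_thread(__import_all)

def __import_all():
    # Innermost layer: the original blocking code.
    return "database work done"

print(import_all())  # -> database work done
```

> Each layer exists for one reason: the outer one bridges sync to async, 
> and the inner one keeps blocking work off the event loop.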
>
> This seems to work (even though the Celery task eventually dies, it seems 
> to have done its work first, and is replaced). Is this double-wrapping 
> really the correct approach? Even if it is clumsy, is there any reason to 
> think it won't be reliable?
>
> TIA, Shaheed
>
> P.S. I should mention that I am using Celery's prefork pool, as 
> opposed to, say, gevent or greenlets. 
>

-- 
You received this message because you are subscribed to the Google Groups 
"Django users" group.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/django-users/ef21c366-66b2-4760-a74f-415466695a55n%40googlegroups.com.