Two big thumbs up for Ken! 
His keen eyes spotted the problem, which was attempting an asynchronous 
send from the Celery task. I changed the code to use the synchronous send 
and, bingo, the consumer now receives events via the channel layer.

A big thank-you to Ken! And, I'm sure anyone else using Channels 2.0 with 
Celery will find this thread of use.

G Broten

On Friday, March 2, 2018 at 12:57:01 PM UTC-6, Ken Whitesell wrote:
>
> Taking a stab at this - I believe the original problem may be here:
>     channel_layer.group_send(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
> Your updateData method is a synchronous method. However, 
> channel_layer.group_send is an asynchronous method.
>
> What you might try is wrapping the group_send method in the async_to_sync 
> function.
> See the documentation at 
> http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions
>
>     async_to_sync(channel_layer.group_send)(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
>
> Your first solution to make updateData an asynchronous method might work 
> with some other work involved in adding that task to the event loop - but 
> that answer is beyond me at the moment.
>
> Hope this helps,
>      Ken
>
>
> On Friday, March 2, 2018 at 1:36:08 PM UTC-5, G Broten wrote:
>>
>> Hi All:
>>  I'm migrating a small application from Django 1.x/Channels 1.x to Django 
>> 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to 
>> determine. It could be due to a failure on my part to correctly implement 
>> the channel_layer or it could be due to an
>> incompatibility with Celery 4.1. The basics are:
>> - Run a periodic Celery task
>> - Use the channel_layer to perform a group_send
>> - Have the consumer receive the group_send event and push a json  message 
>> over the socket
>>
>> Show below is my simple consumer.py module:
>> class mstatusMessage(AsyncJsonWebsocketConsumer):
>>
>>     ##### WebSocket event handlers
>>
>>     async def connect(self):
>>         """
>>         Called when the websocket is handshaking as part of initial 
>> connection.
>>         """
>>         logging.info("### Connected ###")
>>         # Accept the connection
>>         await self.accept()
>>
>>         # Add to the group so they get messages
>>         await self.channel_layer.group_add(
>>             settings.CHANNEL_GROUP,
>>             self.channel_name,
>>         )
>>
>>     async def disconnect(self, code):
>>         """
>>         Called when the WebSocket closes for any reason.
>>         """
>>         # Remove them from the group
>>         await self.channel_layer.group_discard(
>>             settings.CHANNEL_GROUP,
>>             self.channel_name,
>>         )
>>
>>     ##### Handlers for messages sent over the channel layer
>>
>>     # These helper methods are named by the types we send - so epics.join 
>> becomes epics_join
>>     async def epics_message(self, event):
>>         """
>>         Called when the Celery task queries Epics.
>>         """
>>         logging.error("### Received Msg ###")
>>         # Send a message down to the client
>>         await self.send_json(
>>             {
>>                 "text": event["message"],
>>             },
>>         )
>>
>> The routing is simple:
>> application = ProtocolTypeRouter({
>>     "websocket":  mstatusMessage
>> })
>>
>> The Celery task is as follows:
>> @shared_task
>> def updateData(param):
>>
>>     logger.error('##### updateData #####')
>>
>>     # # Get an instance of the channel layer for
>>     # # inter task communications
>>     channel_layer = get_channel_layer()
>>
>>     channel_layer.group_send(
>>         settings.CHANNEL_GROUP,
>>         {"type": "epics.message", "text": "Hello World"},
>>     )
>>
>> The results are promising as the websocket connect opens successfully and 
>> the Celery task run as show by the debugging output given below:
>> 127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
>> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
>> 2018-03-02 09:32:12,280 INFO     ### Connected ###
>> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
>> [2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] 
>> mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
>> [2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] 
>> mstatus.tasks.updateData[786f51a6-]: ##### updateData #####
>>
>> BUT ............... although the Celery task runs the consumer never 
>> receives a message via the channel layer. This could be due to an
>> implementation error or, maybe, a compatibility issue. The application 
>> doesn't crash but the following warning is issued:
>>
>> [2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] 
>> /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 
>> 'RedisChannelLayer.group_send' was never awaited
>>   {"type": "epics.message", "text": "Hello World"},
>>
>> This warning appears related to the Python asyncio functionality. Under 
>> the Celery task module, the channel_layer.group_send
>> doesn't use the await directive as it's inclusion hangs the Celery task. 
>> Changing the Celery task to:
>> async def updateData(param):
>>
>>     logger.error('##### updateData #####')
>>
>>     # # Get an instance of the channel layer for
>>     # # inter task communications
>>     channel_layer = get_channel_layer()
>>
>>     await channel_layer.group_send(
>>         settings.CHANNEL_GROUP,
>>         {"type": "epics.message", "text": "Hello World"},
>>     )
>>
>> This results in the following runtime warning and the Celery task fails 
>> to run (the debug message is never printed) :
>>
>> [2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] 
>> /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358:
>>  
>> RuntimeWarning: coroutine 'updateData' was never awaited
>>   result = (True, prepare_result(fun(*args, **kwargs)))
>>
>> I'm sure these warnings are understood by someone who can provide 
>> guidance with respect to a solution.
>>
>> Thanks,
>>
>> G Broten
>>
>> Reference:
>>
>> The application has be tried under two OS versions:
>> CentOS 7.4
>> Alpine Linux 3.7
>>
>> A partial pip list of the significant packages:
>> asgi-redis (1.4.3)
>> asgiref (2.1.6)
>> async-timeout (2.0.0)
>> billiard (3.5.0.3)
>> cached-property (1.4.0)
>> celery (4.1.0)
>> channels (2.0.2)
>> channels-redis (2.1.0)
>> daphne (2.0.4)
>> Django (2.0.2)
>> redis (2.10.6)
>>
>>
>>
>>
>>
>>

-- 
You received this message because you are subscribed to the Google Groups 
"Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/django-users/09c2b5c2-1926-45be-9139-927db807c703%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to