Two big thumbs up for Ken!
His keen eyes spotted the problem: I was calling the asynchronous
group_send directly from the synchronous Celery task. I changed the code to
use the synchronous send and, bingo, the consumer now receives events via
the channel layer. Thanks again, Ken! I'm sure anyone else using Channels 2.0
with Celery will find this thread useful.
G Broten
On Friday, March 2, 2018 at 12:57:01 PM UTC-6, Ken Whitesell wrote:
>
> Taking a stab at this - I believe the original problem may be here:
>     channel_layer.group_send(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
> Your updateData method is a synchronous method. However,
> channel_layer.group_send is an asynchronous method.
>
> What you might try is wrapping the group_send method in the async_to_sync
> function.
> See the documentation at
> http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions
>
>     from asgiref.sync import async_to_sync
>
>     async_to_sync(channel_layer.group_send)(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
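For anyone who wants to try the idea without a Channels project at hand, here is a minimal sketch using only the standard library. FakeChannelLayer and call_sync are hypothetical stand-ins made up for illustration; they are not the Channels or asgiref implementation, and asyncio.run needs Python 3.7 or newer. In a real project the wrapper is async_to_sync, as in Ken's snippet above.

```python
import asyncio


class FakeChannelLayer:
    """Hypothetical stand-in for a channel layer; records messages in memory."""

    def __init__(self):
        self.sent = []

    async def group_send(self, group, message):
        # A real layer would talk to Redis here; this one only yields once.
        await asyncio.sleep(0)
        self.sent.append((group, message))


def call_sync(async_func):
    """Hypothetical, much simplified stand-in for asgiref's async_to_sync."""

    def wrapper(*args, **kwargs):
        # Run the coroutine to completion on a fresh event loop and hand
        # the result back to the synchronous caller.
        return asyncio.run(async_func(*args, **kwargs))

    return wrapper


def update_data(layer):
    # Synchronous function, like the body of a plain Celery task.
    call_sync(layer.group_send)(
        "epics",
        {"type": "epics.message", "text": "Hello World"},
    )


layer = FakeChannelLayer()
update_data(layer)
print(layer.sent)
```

The point is only that the synchronous caller blocks until the coroutine has finished, so the message is really sent before the task returns.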
>
> Your first solution to make updateData an asynchronous method might work
> with some other work involved in adding that task to the event loop - but
> that answer is beyond me at the moment.
>
> Hope this helps,
> Ken
>
>
> On Friday, March 2, 2018 at 1:36:08 PM UTC-5, G Broten wrote:
>>
>> Hi All:
>> I'm migrating a small application from Django 1.x/Channels 1.x to Django
>> 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to
>> determine. It could be due to a failure on my part to correctly implement
>> the channel_layer or it could be due to an
>> incompatibility with Celery 4.1. The basics are:
>> - Run a periodic Celery task
>> - Use the channel_layer to perform a group_send
>> - Have the consumer receive the group_send event and push a json message
>> over the socket
>>
>> Shown below is my simple consumer.py module:
>> import logging
>>
>> from channels.generic.websocket import AsyncJsonWebsocketConsumer
>> from django.conf import settings
>>
>>
>> class mstatusMessage(AsyncJsonWebsocketConsumer):
>>
>>     ##### WebSocket event handlers
>>
>>     async def connect(self):
>>         """
>>         Called when the websocket is handshaking as part of initial
>>         connection.
>>         """
>>         logging.info("### Connected ###")
>>         # Accept the connection
>>         await self.accept()
>>
>>         # Add to the group so they get messages
>>         await self.channel_layer.group_add(
>>             settings.CHANNEL_GROUP,
>>             self.channel_name,
>>         )
>>
>>     async def disconnect(self, code):
>>         """
>>         Called when the WebSocket closes for any reason.
>>         """
>>         # Remove them from the group
>>         await self.channel_layer.group_discard(
>>             settings.CHANNEL_GROUP,
>>             self.channel_name,
>>         )
>>
>>     ##### Handlers for messages sent over the channel layer
>>
>>     # These helper methods are named by the types we send - so epics.join
>>     # becomes epics_join
>>     async def epics_message(self, event):
>>         """
>>         Called when the Celery task queries Epics.
>>         """
>>         logging.error("### Received Msg ###")
>>         # Send a message down to the client. The key read here must match
>>         # the key the task sends ("text" in the group_send call).
>>         await self.send_json(
>>             {
>>                 "text": event["text"],
>>             },
>>         )
>>
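The naming rule mentioned in the consumer's comment (a message of type epics.message is handled by the epics_message method) can be sketched in plain Python. This only illustrates the convention and is not the Channels source; MiniConsumer and dispatch are made-up names. It also shows that the handler has to read the same key the sender put into the message.

```python
class MiniConsumer:
    """Hypothetical consumer that routes events by their "type" key."""

    def __init__(self):
        self.outbox = []

    def dispatch(self, event):
        # "epics.message" -> "epics_message"
        handler_name = event["type"].replace(".", "_")
        handler = getattr(self, handler_name, None)
        if handler is None:
            raise ValueError("No handler for message type %s" % event["type"])
        handler(event)

    def epics_message(self, event):
        # Read the same key the sender used ("text" here).
        self.outbox.append({"text": event["text"]})


consumer = MiniConsumer()
consumer.dispatch({"type": "epics.message", "text": "Hello World"})
print(consumer.outbox)
```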
>> The routing is simple:
>> application = ProtocolTypeRouter({
>>     "websocket": mstatusMessage
>> })
>>
>> The Celery task is as follows:
>> @shared_task
>> def updateData(param):
>>
>>     logger.error('##### updateData #####')
>>
>>     # Get an instance of the channel layer for
>>     # inter task communications
>>     channel_layer = get_channel_layer()
>>
>>     channel_layer.group_send(
>>         settings.CHANNEL_GROUP,
>>         {"type": "epics.message", "text": "Hello World"},
>>     )
>>
>> The results are promising: the websocket connection opens successfully and
>> the Celery task runs, as shown by the debugging output given below:
>> 127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
>> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
>> 2018-03-02 09:32:12,280 INFO ### Connected ###
>> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
>> [2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2]
>> mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
>> [2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2]
>> mstatus.tasks.updateData[786f51a6-]: ##### updateData #####
>>
>> BUT ... although the Celery task runs, the consumer never
>> receives a message via the channel layer. This could be due to an
>> implementation error or, maybe, a compatibility issue. The application
>> doesn't crash but the following warning is issued:
>>
>> [2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2]
>> /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine
>> 'RedisChannelLayer.group_send' was never awaited
>> {"type": "epics.message", "text": "Hello World"},
>>
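The warning can be reproduced without Channels or Celery. A minimal sketch, with a made-up group_send coroutine standing in for the real one: calling a coroutine function only creates a coroutine object, and the body does not run until something awaits it or an event loop drives it.

```python
import asyncio
import inspect

delivered = []


async def group_send(group, message):
    # Hypothetical in-memory stand-in for RedisChannelLayer.group_send.
    delivered.append((group, message))


# Calling without await: nothing is sent, we only get a coroutine object.
pending = group_send("epics", {"type": "epics.message", "text": "Hello World"})
is_coroutine = inspect.iscoroutine(pending)
delivered_before = list(delivered)

# Driving the coroutine on an event loop actually runs the body.
# (asyncio.run needs Python 3.7 or newer.)
asyncio.run(pending)
delivered_after = list(delivered)

print(is_coroutine, delivered_before, delivered_after)
```

If the coroutine object were simply dropped instead of being run, Python would report it with the "was never awaited" RuntimeWarning once the object is garbage collected.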
>> This warning appears related to the Python asyncio functionality. Under
>> the Celery task module, the channel_layer.group_send call
>> doesn't use the await keyword, as its inclusion hangs the Celery task.
>> Changing the Celery task to:
>> async def updateData(param):
>>
>>     logger.error('##### updateData #####')
>>
>>     # Get an instance of the channel layer for
>>     # inter task communications
>>     channel_layer = get_channel_layer()
>>
>>     await channel_layer.group_send(
>>         settings.CHANNEL_GROUP,
>>         {"type": "epics.message", "text": "Hello World"},
>>     )
>>
>> This results in the following runtime warning, and the Celery task fails
>> to run (the debug message is never printed):
>>
>> [2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2]
>> /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358:
>>
>> RuntimeWarning: coroutine 'updateData' was never awaited
>> result = (True, prepare_result(fun(*args, **kwargs)))
>>
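The same thing happens one level up: the worker calls the task like any ordinary function, so an async def task just hands back a coroutine object and its body, including the logging call, never runs. One possible shape is to keep the async code in a helper and give the worker a synchronous entry point. The sketch below uses plain functions rather than Celery; update_data and _update_data_async are illustrative names, and it has not been tested under a Celery worker.

```python
import asyncio

log = []


async def _update_data_async(param):
    # Async body; in a real task the awaited send would go here.
    log.append("##### updateData #####")
    await asyncio.sleep(0)
    return param


def update_data(param):
    # Synchronous entry point: something a caller doing fun(*args) can run.
    # asyncio.run needs Python 3.7+ and must not be called from inside
    # an event loop that is already running.
    return asyncio.run(_update_data_async(param))


result = update_data("demo")
print(result, log)
```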
>> I'm sure these warnings are understood by someone who can provide
>> guidance with respect to a solution.
>>
>> Thanks,
>>
>> G Broten
>>
>> Reference:
>>
>> The application has been tried under two OS versions:
>> CentOS 7.4
>> Alpine Linux 3.7
>>
>> A partial pip list of the significant packages:
>> asgi-redis (1.4.3)
>> asgiref (2.1.6)
>> async-timeout (2.0.0)
>> billiard (3.5.0.3)
>> cached-property (1.4.0)
>> celery (4.1.0)
>> channels (2.0.2)
>> channels-redis (2.1.0)
>> daphne (2.0.4)
>> Django (2.0.2)
>> redis (2.10.6)
>>
--
You received this message because you are subscribed to the Google Groups
"Django users" group.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit
https://groups.google.com/d/msgid/django-users/09c2b5c2-1926-45be-9139-927db807c703%40googlegroups.com.