Taking a stab at this - I believe the original problem may be here:
    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

Your updateData method is a synchronous method. However, 
channel_layer.group_send is an asynchronous method.

What you might try is wrapping the group_send method in the async_to_sync 
function.
See the documentation at 
http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions

    async_to_sync(channel_layer.group_send)(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )


Your first solution to make updateData an asynchronous method might work 
with some other work involved in adding that task to the event loop - but 
that answer is beyond me at the moment.

Hope this helps,
     Ken


On Friday, March 2, 2018 at 1:36:08 PM UTC-5, G Broten wrote:
>
> Hi All:
>  I'm migrating a small application from Django 1.x/Channels 1.x to Django 
> 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to 
> determine. It could be due to a failure on my part to correctly implement 
> the channel_layer or it could be due to an
> incompatibility with Celery 4.1. The basics are:
> - Run a periodic Celery task
> - Use the channel_layer to perform a group_send
> - Have the consumer receive the group_send event and push a json  message 
> over the socket
>
> Show below is my simple consumer.py module:
> class mstatusMessage(AsyncJsonWebsocketConsumer):
>
>     ##### WebSocket event handlers
>
>     async def connect(self):
>         """
>         Called when the websocket is handshaking as part of initial 
> connection.
>         """
>         logging.info("### Connected ###")
>         # Accept the connection
>         await self.accept()
>
>         # Add to the group so they get messages
>         await self.channel_layer.group_add(
>             settings.CHANNEL_GROUP,
>             self.channel_name,
>         )
>
>     async def disconnect(self, code):
>         """
>         Called when the WebSocket closes for any reason.
>         """
>         # Remove them from the group
>         await self.channel_layer.group_discard(
>             settings.CHANNEL_GROUP,
>             self.channel_name,
>         )
>
>     ##### Handlers for messages sent over the channel layer
>
>     # These helper methods are named by the types we send - so epics.join 
> becomes epics_join
>     async def epics_message(self, event):
>         """
>         Called when the Celery task queries Epics.
>         """
>         logging.error("### Received Msg ###")
>         # Send a message down to the client
>         await self.send_json(
>             {
>                 "text": event["message"],
>             },
>         )
>
> The routing is simple:
> application = ProtocolTypeRouter({
>     "websocket":  mstatusMessage
> })
>
> The Celery task is as follows:
> @shared_task
> def updateData(param):
>
>     logger.error('##### updateData #####')
>
>     # # Get an instance of the channel layer for
>     # # inter task communications
>     channel_layer = get_channel_layer()
>
>     channel_layer.group_send(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
> The results are promising as the websocket connect opens successfully and 
> the Celery task run as show by the debugging output given below:
> 127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
> 2018-03-02 09:32:12,280 INFO     ### Connected ###
> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
> [2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] 
> mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
> [2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] 
> mstatus.tasks.updateData[786f51a6-]: ##### updateData #####
>
> BUT ............... although the Celery task runs the consumer never 
> receives a message via the channel layer. This could be due to an
> implementation error or, maybe, a compatibility issue. The application 
> doesn't crash but the following warning is issued:
>
> [2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] 
> /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 
> 'RedisChannelLayer.group_send' was never awaited
>   {"type": "epics.message", "text": "Hello World"},
>
> This warning appears related to the Python asyncio functionality. Under 
> the Celery task module, the channel_layer.group_send
> doesn't use the await directive as it's inclusion hangs the Celery task. 
> Changing the Celery task to:
> async def updateData(param):
>
>     logger.error('##### updateData #####')
>
>     # # Get an instance of the channel layer for
>     # # inter task communications
>     channel_layer = get_channel_layer()
>
>     await channel_layer.group_send(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
> This results in the following runtime warning and the Celery task fails to 
> run (the debug message is never printed) :
>
> [2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] 
> /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358:
>  
> RuntimeWarning: coroutine 'updateData' was never awaited
>   result = (True, prepare_result(fun(*args, **kwargs)))
>
> I'm sure these warnings are understood by someone who can provide guidance 
> with respect to a solution.
>
> Thanks,
>
> G Broten
>
> Reference:
>
> The application has be tried under two OS versions:
> CentOS 7.4
> Alpine Linux 3.7
>
> A partial pip list of the significant packages:
> asgi-redis (1.4.3)
> asgiref (2.1.6)
> async-timeout (2.0.0)
> billiard (3.5.0.3)
> cached-property (1.4.0)
> celery (4.1.0)
> channels (2.0.2)
> channels-redis (2.1.0)
> daphne (2.0.4)
> Django (2.0.2)
> redis (2.10.6)
>
>
>
>
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/django-users/1199cf0d-bd71-422f-b148-5148eaa59d29%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to