Taking a stab at this - I believe the original problem may be here:
    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )
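Your updateData task is a synchronous function. However,
channel_layer.group_send is a coroutine. Calling it without awaiting only
creates a coroutine object that never runs, which is why you get the
"was never awaited" warning and no message reaches the consumer.
What you might try is wrapping group_send in the async_to_sync function.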
See the documentation at
http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions
    from asgiref.sync import async_to_sync

    async_to_sync(channel_layer.group_send)(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )
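To illustrate why the un-awaited call does nothing, here is a minimal
sketch using only the standard library. FakeChannelLayer, run_sync and the
group name "epics" are made up for the example; run_sync is just a stand-in
for what async_to_sync does for the caller, not how asgiref implements it.

```python
import asyncio


class FakeChannelLayer:
    """Hypothetical stand-in for a channel layer; it only records sends."""

    def __init__(self):
        self.sent = []

    async def group_send(self, group, message):
        # A real layer would talk to Redis here.
        self.sent.append((group, message))


def run_sync(coro):
    """Drive a coroutine to completion from synchronous code."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


layer = FakeChannelLayer()
message = {"type": "epics.message", "text": "Hello World"}

# Calling without awaiting only creates a coroutine object; the body never runs.
pending = layer.group_send("epics", message)
sent_without_await = list(layer.sent)
pending.close()  # discard it explicitly so no "never awaited" warning is raised

# Driving the coroutine to completion actually performs the send.
run_sync(layer.group_send("epics", message))
sent_after_run = list(layer.sent)
```

One more thing I noticed while reading the code: the task sends the payload
under "text", while epics_message reads event["message"]. Once delivery
works, that lookup would raise a KeyError, so the two keys need to agree.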
Your other approach, making updateData an asynchronous function, might work
with some additional work to run that coroutine on an event loop, but
that answer is beyond me at the moment.
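For what it's worth, a rough sketch of that idea, assuming the task itself
has to stay a plain function (your log shows the worker calling it directly
and never awaiting the result). update_data_async, update_data and fake_send
are hypothetical names, and fake_send stands in for the real group_send. I
have not tried this under Celery.

```python
import asyncio


async def update_data_async(send, param):
    """Async body: awaits the (hypothetical) send coroutine."""
    await send({"type": "epics.message", "text": param})
    return "sent"


def update_data(send, param):
    """Synchronous entry point, the shape the worker expects to call."""
    loop = asyncio.new_event_loop()
    try:
        # Run the async body to completion before returning to the caller.
        return loop.run_until_complete(update_data_async(send, param))
    finally:
        loop.close()


received = []


async def fake_send(message):
    # Stand-in for channel_layer.group_send; it only records the message.
    received.append(message)


result = update_data(fake_send, "Hello World")
```

The explicit new_event_loop / run_until_complete pair is used because
asyncio.run only arrived in Python 3.7 and you are on 3.6.3.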
Hope this helps,
Ken
On Friday, March 2, 2018 at 1:36:08 PM UTC-5, G Broten wrote:
>
> Hi All:
> I'm migrating a small application from Django 1.x/Channels 1.x to Django
> 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to
> determine. It could be due to a failure on my part to correctly implement
> the channel_layer or it could be due to an
> incompatibility with Celery 4.1. The basics are:
> - Run a periodic Celery task
> - Use the channel_layer to perform a group_send
> - Have the consumer receive the group_send event and push a json message
> over the socket
>
> Shown below is my simple consumer.py module:
> class mstatusMessage(AsyncJsonWebsocketConsumer):
>
>     ##### WebSocket event handlers
>
>     async def connect(self):
>         """
>         Called when the websocket is handshaking as part of initial
>         connection.
>         """
>         logging.info("### Connected ###")
>         # Accept the connection
>         await self.accept()
>
>         # Add to the group so they get messages
>         await self.channel_layer.group_add(
>             settings.CHANNEL_GROUP,
>             self.channel_name,
>         )
>
>     async def disconnect(self, code):
>         """
>         Called when the WebSocket closes for any reason.
>         """
>         # Remove them from the group
>         await self.channel_layer.group_discard(
>             settings.CHANNEL_GROUP,
>             self.channel_name,
>         )
>
>     ##### Handlers for messages sent over the channel layer
>
>     # These helper methods are named by the types we send - so epics.join
>     # becomes epics_join
>     async def epics_message(self, event):
>         """
>         Called when the Celery task queries Epics.
>         """
>         logging.error("### Received Msg ###")
>         # Send a message down to the client
>         await self.send_json(
>             {
>                 "text": event["message"],
>             },
>         )
>
> The routing is simple:
> application = ProtocolTypeRouter({
>     "websocket": mstatusMessage
> })
>
> The Celery task is as follows:
> @shared_task
> def updateData(param):
>
>     logger.error('##### updateData #####')
>
>     # # Get an instance of the channel layer for
>     # # inter task communications
>     channel_layer = get_channel_layer()
>
>     channel_layer.group_send(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
> The results are promising as the websocket connection opens successfully
> and the Celery task runs, as shown by the debugging output given below:
> 127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
> 2018-03-02 09:32:12,280 INFO ### Connected ###
> 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
> [2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2]
> mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
> [2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2]
> mstatus.tasks.updateData[786f51a6-]: ##### updateData #####
>
> BUT ............... although the Celery task runs, the consumer never
> receives a message via the channel layer. This could be due to an
> implementation error or, maybe, a compatibility issue. The application
> doesn't crash but the following warning is issued:
>
> [2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2]
> /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine
> 'RedisChannelLayer.group_send' was never awaited
> {"type": "epics.message", "text": "Hello World"},
>
> This warning appears related to the Python asyncio functionality. Under
> the Celery task module, the channel_layer.group_send call
> doesn't use the await directive, as its inclusion hangs the Celery task.
> Changing the Celery task to:
> async def updateData(param):
>
>     logger.error('##### updateData #####')
>
>     # # Get an instance of the channel layer for
>     # # inter task communications
>     channel_layer = get_channel_layer()
>
>     await channel_layer.group_send(
>         settings.CHANNEL_GROUP,
>         {"type": "epics.message", "text": "Hello World"},
>     )
>
> This results in the following runtime warning, and the Celery task fails
> to run (the debug message is never printed):
>
> [2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2]
> /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358:
>
> RuntimeWarning: coroutine 'updateData' was never awaited
> result = (True, prepare_result(fun(*args, **kwargs)))
>
> I'm sure these warnings are understood by someone who can provide guidance
> with respect to a solution.
>
> Thanks,
>
> G Broten
>
> Reference:
>
> The application has been tried under two OS versions:
> CentOS 7.4
> Alpine Linux 3.7
>
> A partial pip list of the significant packages:
> asgi-redis (1.4.3)
> asgiref (2.1.6)
> async-timeout (2.0.0)
> billiard (3.5.0.3)
> cached-property (1.4.0)
> celery (4.1.0)
> channels (2.0.2)
> channels-redis (2.1.0)
> daphne (2.0.4)
> Django (2.0.2)
> redis (2.10.6)
>