Hi All:
 I'm migrating a small application from Django 1.x/Channels 1.x to Django 
2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to 
determine. It could be due to a failure on my part to correctly implement 
the channel_layer or it could be due to an
incompatibility with Celery 4.1. The basics are:
- Run a periodic Celery task
- Use the channel_layer to perform a group_send
- Have the consumer receive the group_send event and push a json  message 
over the socket

Show below is my simple consumer.py module:
class mstatusMessage(AsyncJsonWebsocketConsumer):

    ##### WebSocket event handlers

    async def connect(self):
        """
        Called when the websocket is handshaking as part of initial 
connection.
        """
        logging.info("### Connected ###")
        # Accept the connection
        await self.accept()

        # Add to the group so they get messages
        await self.channel_layer.group_add(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    async def disconnect(self, code):
        """
        Called when the WebSocket closes for any reason.
        """
        # Remove them from the group
        await self.channel_layer.group_discard(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    ##### Handlers for messages sent over the channel layer

    # These helper methods are named by the types we send - so epics.join 
becomes epics_join
    async def epics_message(self, event):
        """
        Called when the Celery task queries Epics.
        """
        logging.error("### Received Msg ###")
        # Send a message down to the client
        await self.send_json(
            {
                "text": event["message"],
            },
        )

The routing is simple:
application = ProtocolTypeRouter({
    "websocket":  mstatusMessage
})

The Celery task is as follows:
@shared_task
def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

The results are promising as the websocket connect opens successfully and 
the Celery task run as show by the debugging output given below:
127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
2018-03-02 09:32:12,280 INFO     ### Connected ###
127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
[2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] 
mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
[2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] 
mstatus.tasks.updateData[786f51a6-]: ##### updateData #####

BUT ............... although the Celery task runs the consumer never 
receives a message via the channel layer. This could be due to an
implementation error or, maybe, a compatibility issue. The application 
doesn't crash but the following warning is issued:

[2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] 
/mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 
'RedisChannelLayer.group_send' was never awaited
  {"type": "epics.message", "text": "Hello World"},

This warning appears related to the Python asyncio functionality. Under the 
Celery task module, the channel_layer.group_send
doesn't use the await directive as it's inclusion hangs the Celery task. 
Changing the Celery task to:
async def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    await channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

This results in the following runtime warning and the Celery task fails to 
run (the debug message is never printed) :

[2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] 
/home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358:
 
RuntimeWarning: coroutine 'updateData' was never awaited
  result = (True, prepare_result(fun(*args, **kwargs)))

I'm sure these warnings are understood by someone who can provide guidance 
with respect to a solution.

Thanks,

G Broten

Reference:

The application has be tried under two OS versions:
CentOS 7.4
Alpine Linux 3.7

A partial pip list of the significant packages:
asgi-redis (1.4.3)
asgiref (2.1.6)
async-timeout (2.0.0)
billiard (3.5.0.3)
cached-property (1.4.0)
celery (4.1.0)
channels (2.0.2)
channels-redis (2.1.0)
daphne (2.0.4)
Django (2.0.2)
redis (2.10.6)





-- 
You received this message because you are subscribed to the Google Groups 
"Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/django-users/8bc30e1e-3a68-4eea-983a-937b85c05b59%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to