I'm trying to build a Python gRPC service using bi-directional streaming as 
a method of sending commands from server -> client, to bypass a firewall. 
The client will typically respond to a command from the server by sending 
back another request. The first message in the stream from the client 
simply serves to establish the connection and then any further message in 
either direction could come totally out of band. Here's roughly what the 
proto, client, and server look like:

Proto:

syntax = "proto3";

import "google/protobuf/any.proto";

service Command {
    rpc Send(stream CommandMessage) returns (stream CommandMessage) {}
}

message CommandMessage {
    google.protobuf.Any task = 1;
}

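Server:

from queue import Queue  # Python 3; the module is named Queue on Python 2
from threading import Thread

from command_pb2 import CommandMessage

def Send(self, request_iterator, context):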
    q = Queue()

    def response_worker():
        while True:
            # Send commands to client out-of-band
            q.put(CommandMessage(task=...))

    t = Thread(target=response_worker)
    t.daemon = True
    t.start()

    server_response_iterator = ServerResponseIterator(q)
    for request in request_iterator:
        # This is where I run into problems as described below
        # I want to process requests here, maybe put something on the queue
        # in order to respond
        return server_response_iterator

class ServerResponseIterator(object):
    def __init__(self, queue):
        self._queue = queue

    def __iter__(self):
        return self

    def _next(self):
        message = self._queue.get(block=True)
        self._queue.task_done()
        return message

    def __next__(self):  # Python 3
        return self._next()

    def next(self):  # Python 2
        return self._next()

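Client:

import time
from queue import Queue  # Python 3; the module is named Queue on Python 2
from threading import Thread

import grpc

import command_pb2_grpc
from command_pb2 import CommandMessage

class ClientRequestIterator(object):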
    def __init__(self, queue):
        self._queue = queue
        self._open = False

    def __iter__(self):
        return self

    def _next(self):
        if not self._open:
            self._open = True
            # Send a dummy message to open the connection
            return CommandMessage(task=...)
        else:
            message = self._queue.get(block=True)
            self._queue.task_done()
            return message

    def __next__(self):  # Python 3
        return self._next()

    def next(self):  # Python 2
        return self._next()

if __name__ == '__main__':
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = command_pb2_grpc.CommandStub(channel)
        
        q = Queue()

        def request_worker():
            while True:
                # Send commands to server out-of-band
                time.sleep(15)
                q.put(CommandMessage(task=...))
                
        t = Thread(target=request_worker)
        t.daemon = True
        t.start()

        request_iterator = ClientRequestIterator(q)
        responses = stub.Send(request_iterator)
        for response in responses:
            # Respond to commands from server
            q.put(CommandMessage(task=...))


The behavior I'm seeing is that the client side works. It sends the 
out-of-band messages that the background thread puts on the queue. It also 
properly consumes messages from the response iterator (commands from the 
server), puts a command on the queue, and then sends that request. 
However, the server only sees the initial request that opens the 
connection and never sees another request (it seems to hang in the request 
iterator). As the server puts out-of-band messages on the queue, it does 
send responses every 15 seconds (from the response iterator).

So I'm open to the possibility that I'm going about this in entirely the 
wrong way. Conceptually I think what I want is: 1) a server side thread 
that can send responses at any time 2) a server side thread that will 
process requests as they come in, and may or may not send a response 3) a 
client side thread that can send requests at any time 4) a client side 
thread that will process responses as they come in and may or may not send 
a request as a result.
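
To make roles 1 and 2 concrete, here is a minimal sketch of the server-side 
shape I have in mind, with gRPC taken out so it runs on its own (Python 3). 
Everything in it is an assumption for illustration: the function name send, 
the outbound queue parameter, the None sentinel, and plain strings standing 
in for CommandMessage. I have not checked it against a real grpc server; it 
only shows a handler-like generator that reads requests on a side thread 
while it keeps yielding whatever lands on the queue.

```python
import queue
import threading


def send(request_iterator, outbound):
    """Handler-shaped generator (hypothetical; strings stand in for CommandMessage)."""

    def consume_requests():
        # Role 2: process each request as it arrives; a reply is optional.
        for request in request_iterator:
            if request.startswith("ping"):
                outbound.put("pong:" + request)
        # The request stream has ended; a None sentinel closes the response stream.
        outbound.put(None)

    threading.Thread(target=consume_requests, daemon=True).start()

    # Role 1: stream out whatever any thread has put on the queue.
    while True:
        message = outbound.get()
        if message is None:
            return
        yield message


if __name__ == "__main__":
    outbound = queue.Queue()
    # A command queued before any request has been read.
    outbound.put("unsolicited-command")
    requests = iter(["hello", "ping-1", "status", "ping-2"])
    print(list(send(requests, outbound)))
```

In this sketch four requests go in and three responses come out, one of 
which was queued before any request was read, so the two directions are not 
paired one-to-one.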

I'm assuming that the server doesn't have to send a response for every 
request it receives and, conversely, could send a response without ever 
having received a request.

Thanks in advance for any feedback!
