I'm trying to build a Python gRPC service using bi-directional streaming as
a method of sending commands from server -> client, to bypass a firewall.
The client will typically respond to a command from the server by sending
back another request. The first message in the stream from the client
simply serves to establish the connection and then any further message in
either direction could come totally out of band. Here's roughly what the
proto, client, and server look like:
Proto:
service Command {
rpc Send(stream CommandMessage) returns (stream CommandMessage ) {}
}
message CommandMessage {
google.protobuf.Any task = 1;
}
Server:
def Send(self, request_iterator, context):
q = Queue()
def response_worker():
while True:
# Send commands to client out-of-band
q.put(CommandMessage(task=...)))
t = Thread(target=response_worker)
t.daemon = True
t.start()
server_response_iterator = ServerResponseIterator(q)
for request in request_iterator:
# This is where I run into problems as described below
# I want to process requests here, maybe put something on the queue
in order to respond
return server_response_iterator
class ServerResponseIterator(object):
def __init__(self, queue):
self._queue = queue
def __iter__(self):
return self
def _next(self):
message = self._queue.get(block=True)
self._queue.task_done()
return message
def __next__(self): # Python 3
return self._next()
def next(self): # Python 2
return self._next()
Client:
class ClientRequestIterator(object):
def __init__(self, queue):
self._queue = queue
self._open = False
def __iter__(self):
return self
def _next(self):
if not self._open:
self._open = True
# Send a dummy message to open the connection
return CommandMessage(task=...)
else:
message = self._queue.get(block=True)
self._queue.task_done()
return message
def __next__(self): # Python 3
return self._next()
def next(self): # Python 2
return self._next()
if __name__ == '__main__':
with grpc.insecure_channel('localhost:50051') as channel:
stub = command_pb2_grpc.CommandStub(channel)
q = Queue()
def request_worker():
while True:
# Send commands to server out-of-band
time.sleep(15)
q.put(CommandMessage(task=...)
t = Thread(target=request_worker)
t.daemon = True
t.start()
request_iterator = ClientRequestIterator(q)
responses = stub.Send(request_iterator)
for response in responses:
# Respond to commands from server
q.put(CommandMessage(task=...)
The behavior I'm seeing is the client side works. It will send out of band
message that come from the background thread that I put on the queue. It
also properly consumes messages from the response iterator (commands from
the server), puts a command on the queue and then sends that request.
However, on the server side of things the server will only see the initial
request that opens the connections and then it will never see another
request (it seems hung in the request iterator). As the server puts out of
band messages on the queue it does send responses every 15 seconds (from
the response iterator).
So I'm open to the possibility that I'm going about this in entirely the
wrong way. Conceptually I think what I want is: 1) a server side thread
that can send responses at any time 2) a server side thread that will
process requests as they come in, and may or may not send a response 3) a
client side thread that can send requests at any time 4) a client side
thread that will process responses as they come in and may or may not send
a request as a result.
I'm making the assumption that the server doesn't have to send a response
for every request it receives, and conversely could send a response without
every having received a request.
Thanks in advance for any feedback!
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/ad70c131-6358-4ea4-ba72-adc8702c05d0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.