Hi guys,
Thanks for the comments and suggestions so far. @Paul, cheers for the
heads-up re cloud pubsub, but using a cloud broker is not suitable for my
use case.
I found some time to implement a prototype pubsub servicer, and I'd like to
share it because I still have a few doubts if I'm doing things right, and a
few specific questions.
The servicer has a single bidirectional streaming endpoint. I spawn a new
thread in the implementation to process incoming requests, generate a
unique id for the connection, do some book-keeping, and send out updates to
subscribed clients as they are published.
Specific questions:
1. Is it possible to know when a client has disconnected, so I can release
any resources that have been allocated?
2. Would it be more idiomatic to use threadlocals somehow for the
book-keeping?
Any other comments and criticism of the code very welcome ;)
# ------------------------------ pubsub.proto
package pubsub;
service PubSub {
rpc subscribe(stream SubscriptionRequest) returns (stream SubscriptionReply);
}
message SubscriptionRequest {
string topic = 1;
}
message SubscriptionReply {
string topic = 1;
string message = 2;
}
# ------------------------------ pubsub_server.py
import queue
import random
import threading
import uuid
from collections import defaultdict
import time
from concurrent import futures
import grpc
import pubsub_pb2
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class PubSub(pubsub_pb2.PubSubServicer):
def __init__(self):
self.client_q = dict()
self.topics = defaultdict(list)
t = threading.Thread(target=self._fake_publisher)
t.start()
def publish(self, topic, message):
reply = pubsub_pb2.SubscriptionReply(topic=topic,
message=message)
print("Publishing topic {} message {}".format(topic, message))
for clientid in self.topics[topic]:
self.client_q[clientid].put(reply)
def subscribe(self, request_iterator, context):
clientid = uuid.uuid4()
self.client_q[clientid] = queue.Queue()
t = threading.Thread(target=self._process_subscriptions,
args=(clientid, request_iterator))
t.start()
while True:
item = self.client_q[clientid].get()
yield item
self.client_q[clientid].task_done()
# What's the best way to release resources and tidy up
# when the client goes away?
def _process_subscriptions(self, clientid, request_iterator):
for request in request_iterator:
print("Received subscription request from {} for topic {}".format(
clientid, request.topic
))
self.topics[request.topic].append(clientid)
def _fake_publisher(self):
while True:
if len(self.topics):
topic = random.choice(list(self.topics.keys()))
self.publish(topic, str(uuid.uuid4()))
time.sleep(1)
def serve():
server = grpc.server(futures.ThreadPoolExecutor())
pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
# ------------------------------ pubsub_client.py
import time
import concurrent
import grpc
import pubsub_pb2
def client(stub):
def my_topics():
yield pubsub_pb2.SubscriptionRequest(topic="grpc")
time.sleep(5)
yield pubsub_pb2.SubscriptionRequest(topic="donkeys")
responses = stub.subscribe(my_topics())
for response in responses:
print("Received topic {} message {}".format(
response.topic, response.message))
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = pubsub_pb2.PubSubStub(channel)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(client, stub)
]
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as exc:
print("Generated an exception {}".format(exc))
if __name__ == '__main__':
run()
On Sunday, 28 August 2016 02:52:15 UTC+2, Paul Grosu wrote:
>
> Hi Mark,
>
> Might the following PubSub via gRPC example be useful:
>
>
> https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/grpc
>
> Paul
>
> On Friday, August 26, 2016 at 6:13:32 AM UTC-4, Mark NS wrote:
>>
>> Hi,
>>
>> I'm looking into using gRPC for a line-of-business app with 10-100 users.
>> Python server side, C# client side. The server side application is
>> stateful, and clients will have both request-response and streaming
>> communications.
>>
>> I was previously working with ZeroMQ (changing to gRPC for firewall
>> traversal etc), which has pubsub patterns built in. Are there any examples
>> of such a pattern implemented with gRPC? (I saw the post about cloud
>> pubsub, but I am looking for something brokerless).
>>
>> - Mark
>>
>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/8a783b81-1fe2-4524-9e9a-14c371427802%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.