Anyone have an answer to this question in particular?
1. Is it possible to know when a client has disconnected, so I can release
any resources that have been allocated?
On Monday, 29 August 2016 22:00:00 UTC+2, Mark NS wrote:
>
> Hi guys,
> Thanks for the comments and suggestions so far. @Paul, cheers for the
> heads-up re cloud pubsub, but using a cloud broker is not suitable for my
> use case.
>
> I found some time to implement a prototype pubsub servicer, and I'd like
> to share it because I still have a few doubts if I'm doing things right,
> and a few specific questions.
>
> The servicer has a single bidirectional streaming endpoint. I spawn a new
> thread in the implementation to process incoming requests, generate a
> unique id for the connection, do some book-keeping, and send out updates to
> subscribed clients as they are published.
>
>
>
> Specific questions:
> 1. Is it possible to know when a client has disconnected, so I can release
> any resources that have been allocated?
> 2. Would it be more idiomatic to use threadlocals somehow for the
> book-keeping?
>
> Any other comments and criticism of the code very welcome ;)
>
>
> # ------------------------------ pubsub.proto
>
> package pubsub;
>
> service PubSub {
> rpc subscribe(stream SubscriptionRequest) returns (stream
> SubscriptionReply);
> }
>
> message SubscriptionRequest {
> string topic = 1;
> }
>
> message SubscriptionReply {
> string topic = 1;
> string message = 2;
>
> }
>
> # ------------------------------ pubsub_server.py
>
> import queue
> import random
> import threading
> import uuid
> from collections import defaultdict
> import time
> from concurrent import futures
>
> import grpc
>
> import pubsub_pb2
>
> _ONE_DAY_IN_SECONDS = 60 * 60 * 24
>
>
> class PubSub(pubsub_pb2.PubSubServicer):
> def __init__(self):
> self.client_q = dict()
> self.topics = defaultdict(list)
>
> t = threading.Thread(target=self._fake_publisher)
> t.start()
>
> def publish(self, topic, message):
> reply = pubsub_pb2.SubscriptionReply(topic=topic,
> message=message)
> print("Publishing topic {} message {}".format(topic, message))
> for clientid in self.topics[topic]:
> self.client_q[clientid].put(reply)
>
> def subscribe(self, request_iterator, context):
> clientid = uuid.uuid4()
> self.client_q[clientid] = queue.Queue()
>
> t = threading.Thread(target=self._process_subscriptions,
> args=(clientid, request_iterator))
> t.start()
>
> while True:
> item = self.client_q[clientid].get()
> yield item
> self.client_q[clientid].task_done()
>
> # What's the best way to release resources and tidy up
> # when the client goes away?
>
> def _process_subscriptions(self, clientid, request_iterator):
> for request in request_iterator:
> print("Received subscription request from {} for topic {}".format(
> clientid, request.topic
> ))
> self.topics[request.topic].append(clientid)
>
> def _fake_publisher(self):
> while True:
> if len(self.topics):
> topic = random.choice(list(self.topics.keys()))
> self.publish(topic, str(uuid.uuid4()))
> time.sleep(1)
>
>
> def serve():
> server = grpc.server(futures.ThreadPoolExecutor())
> pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
> server.add_insecure_port('[::]:50051')
> server.start()
> try:
> while True:
> time.sleep(_ONE_DAY_IN_SECONDS)
> except KeyboardInterrupt:
> server.stop(0)
>
>
> if __name__ == '__main__':
> serve()
>
>
> # ------------------------------ pubsub_client.py
>
> import time
> import concurrent
>
>
> import grpc
>
> import pubsub_pb2
>
> def client(stub):
> def my_topics():
> yield pubsub_pb2.SubscriptionRequest(topic="grpc")
> time.sleep(5)
> yield pubsub_pb2.SubscriptionRequest(topic="donkeys")
>
> responses = stub.subscribe(my_topics())
> for response in responses:
> print("Received topic {} message {}".format(
> response.topic, response.message))
>
>
> def run():
> channel = grpc.insecure_channel('localhost:50051')
> stub = pubsub_pb2.PubSubStub(channel)
>
> with concurrent.futures.ThreadPoolExecutor() as executor:
> futures = [
> executor.submit(client, stub)
> ]
>
> for future in concurrent.futures.as_completed(futures):
> try:
> future.result()
> except Exception as exc:
> print("Generated an exception {}".format(exc))
>
>
> if __name__ == '__main__':
> run()
>
>
>
> On Sunday, 28 August 2016 02:52:15 UTC+2, Paul Grosu wrote:
>>
>> Hi Mark,
>>
>> Might the following PubSub via gRPC example be useful:
>>
>>
>> https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/grpc
>>
>> Paul
>>
>> On Friday, August 26, 2016 at 6:13:32 AM UTC-4, Mark NS wrote:
>>>
>>> Hi,
>>>
>>> I'm looking into using gRPC for a line-of-business app with 10-100
>>> users. Python server side, C# client side. The server side application is
>>> stateful, and clients will have both request-response and streaming
>>> communications.
>>>
>>> I was previously working with ZeroMQ (changing to gRPC for firewall
>>> traversal etc), which has pubsub patterns built in. Are there any examples
>>> of such a pattern implemented with gRPC? (I saw the post about cloud
>>> pubsub, but I am looking for something brokerless).
>>>
>>> - Mark
>>>
>>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/19d1aee5-16bf-4687-ba47-551d20d1c207%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.