Anyone have an answer to this question in particular?

1. Is it possible to know when a client has disconnected, so I can release 
any resources that have been allocated?
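
One lead worth trying, for anyone following along: grpc.ServicerContext 
inherits from grpc.RpcContext, which has an add_callback() method; the 
callback is invoked when the RPC terminates, and a client disconnect 
terminates the RPC. Below is an untested sketch of how the subscribe() from 
the server further down the thread could use it (the sentinel and cleanup 
details are my own guesswork, not anything from the gRPC docs):

  def subscribe(self, request_iterator, context):
    clientid = uuid.uuid4()
    q = queue.Queue()
    self.client_q[clientid] = q

    def on_rpc_done():
      # gRPC invokes this when the call terminates for any reason,
      # including the client going away; queue a sentinel so the
      # blocking q.get() below wakes up.
      q.put(None)

    context.add_callback(on_rpc_done)

    t = threading.Thread(target=self._process_subscriptions,
                         args=(clientid, request_iterator))
    t.start()

    try:
      while True:
        item = q.get()
        if item is None:  # sentinel from on_rpc_done: client is gone
          break
        yield item
    finally:
      # Runs whether we saw the sentinel or gRPC closed the generator;
      # release this client's resources either way.
      self.client_q.pop(clientid, None)
      for subscribers in self.topics.values():
        if clientid in subscribers:
          subscribers.remove(clientid)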


On Monday, 29 August 2016 22:00:00 UTC+2, Mark NS wrote:
>
> Hi guys,
> Thanks for the comments and suggestions so far. @Paul, cheers for the 
> heads-up re Cloud Pub/Sub, but a cloud broker isn't suitable for my use 
> case. 
>
> I found some time to implement a prototype pubsub servicer, and I'd like 
> to share it because I still have a few doubts about whether I'm doing 
> things right, as well as a few specific questions. 
>
> The servicer has a single bidirectional streaming endpoint. I spawn a new 
> thread in the implementation to process incoming requests, generate a 
> unique id for the connection, do some book-keeping, and send out updates to 
> subscribed clients as they are published. 
>
> Specific questions: 
> 1. Is it possible to know when a client has disconnected, so I can release 
> any resources that have been allocated? 
> 2. Would it be more idiomatic to use threadlocals somehow for the 
> book-keeping? (A rough sketch of what I mean follows just below.)
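>
> What I had in mind is roughly the following, though I'm not sure the 
> threads gRPC uses per handler actually line up with it, so treat it as a 
> guess rather than working code:
>
>   import queue
>   import threading
>   import uuid
>
>   _local = threading.local()
>
>   def _ensure_client_state():
>     # Lazily attach per-client state to the current handler thread,
>     # instead of keeping it in self.client_q keyed by a generated id.
>     if not hasattr(_local, 'clientid'):
>       _local.clientid = uuid.uuid4()
>       _local.q = queue.Queue()
>     return _local.clientid, _local.q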
>
> Any other comments and criticism of the code are very welcome ;)
>
>
> # ------------------------------ pubsub.proto
>
> syntax = "proto3";
>
> package pubsub;
>
> service PubSub {
>   rpc subscribe(stream SubscriptionRequest) returns (stream SubscriptionReply);
> }
>
> message SubscriptionRequest {
>   string topic = 1;
> }
>
> message SubscriptionReply {
>   string topic = 1;
>   string message = 2;
> }
>
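> For anyone reproducing this: the pubsub_pb2 module used below comes from 
> grpcio-tools codegen, invoked along these lines (the exact flags are my 
> assumption; note too that the grpcio-tools of this era emitted the service 
> classes into pubsub_pb2.py, while newer releases split them into a 
> separate pubsub_pb2_grpc.py):
>
>   from grpc_tools import protoc
>
>   protoc.main([
>     'grpc_tools.protoc',    # placeholder argv[0]
>     '-I.',
>     '--python_out=.',       # message classes
>     '--grpc_python_out=.',  # stub and servicer classes
>     'pubsub.proto',
>   ])
>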
> # ------------------------------ pubsub_server.py
>
> import queue
> import random
> import threading
> import time
> import uuid
> from collections import defaultdict
> from concurrent import futures
>
> import grpc
>
> import pubsub_pb2
>
> _ONE_DAY_IN_SECONDS = 60 * 60 * 24
>
>
> class PubSub(pubsub_pb2.PubSubServicer):
>   def __init__(self):
>     self.client_q = dict()
>     self.topics = defaultdict(list)
>
>     # Daemon thread: _fake_publisher loops forever, and a non-daemon
>     # thread would keep the process alive after server.stop().
>     t = threading.Thread(target=self._fake_publisher, daemon=True)
>     t.start()
>
>   def publish(self, topic, message):
>     reply = pubsub_pb2.SubscriptionReply(topic=topic,
>                                          message=message)
>     print("Publishing topic {} message {}".format(topic, message))
>     for clientid in self.topics[topic]:
>       self.client_q[clientid].put(reply)
>
>   def subscribe(self, request_iterator, context):
>     clientid = uuid.uuid4()
>     self.client_q[clientid] = queue.Queue()
>
>     t = threading.Thread(target=self._process_subscriptions,
>                          args=(clientid, request_iterator),
>                          daemon=True)
>     t.start()
>
>     while True:
>       item = self.client_q[clientid].get()
>       yield item
>       self.client_q[clientid].task_done()
>
>     # What's the best way to release resources and tidy up
>     # when the client goes away?
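>     # (One option, per the add_callback sketch at the top of this thread:
>     # register a termination callback via context.add_callback() and have
>     # it push a sentinel into the queue so the blocking get() can exit.)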
>
>   def _process_subscriptions(self, clientid, request_iterator):
>     for request in request_iterator:
>       print("Received subscription request from {} for topic {}".format(
>         clientid, request.topic
>       ))
>       self.topics[request.topic].append(clientid)
>
>   def _fake_publisher(self):
>     while True:
>       if self.topics:
>         topic = random.choice(list(self.topics.keys()))
>         self.publish(topic, str(uuid.uuid4()))
>       time.sleep(1)
>
>
> def serve():
>   # Each active subscribe() stream occupies a worker thread, so the pool
>   # size caps the number of concurrent subscribers.
>   server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
>   pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
>   server.add_insecure_port('[::]:50051')
>   server.start()
>   try:
>     while True:
>       time.sleep(_ONE_DAY_IN_SECONDS)
>   except KeyboardInterrupt:
>     server.stop(0)
>
>
> if __name__ == '__main__':
>   serve()
>
>
> # ------------------------------ pubsub_client.py
>
> import time
> import concurrent.futures
>
> import grpc
>
> import pubsub_pb2
>
> def client(stub):
>   def my_topics():
>     yield pubsub_pb2.SubscriptionRequest(topic="grpc")
>     time.sleep(5)
>     yield pubsub_pb2.SubscriptionRequest(topic="donkeys")
>
>   responses = stub.subscribe(my_topics())
>   for response in responses:
>     print("Received topic {} message {}".format(
>       response.topic, response.message))
>
>
> def run():
>   channel = grpc.insecure_channel('localhost:50051')
>   stub = pubsub_pb2.PubSubStub(channel)
>
>   with concurrent.futures.ThreadPoolExecutor() as executor:
>     futures = [
>       executor.submit(client, stub)
>     ]
>
>     for future in concurrent.futures.as_completed(futures):
>       try:
>         future.result()
>       except Exception as exc:
>         print("Generated an exception {}".format(exc))
>
>
> if __name__ == '__main__':
>   run()
>
> On Sunday, 28 August 2016 02:52:15 UTC+2, Paul Grosu wrote:
>>
>> Hi Mark,
>>
>> Might the following PubSub via gRPC example be useful:
>>
>>
>> https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/grpc
>>
>> Paul
>>
>> On Friday, August 26, 2016 at 6:13:32 AM UTC-4, Mark NS wrote:
>>>
>>> Hi,
>>>
>>> I'm looking into using gRPC for a line-of-business app with 10-100 
>>> users. Python server side, C# client side. The server side application is 
>>> stateful, and clients will have both request-response and streaming 
>>> communications.
>>>  
>>> I was previously working with ZeroMQ (changing to gRPC for firewall 
>>> traversal etc), which has pubsub patterns built in. Are there any examples 
>>> of such a pattern implemented with gRPC? (I saw the post about cloud 
>>> pubsub, but I am looking for something brokerless).
>>>
>>> - Mark
>>>
>>
