Hi Mark,

Try to read this example in how to use ServerContext's AsyncNotifyWhenDone:

  
https://github.com/grpc/grpc/blob/master/test/cpp/end2end/async_end2end_test.cc

Hope it helps,
Paul

On Wednesday, August 31, 2016 at 3:37:59 AM UTC-4, Mark NS wrote:
>
> Anyone have an answer to this question in particular?
>
> 1. Is it possible to know when a client has disconnected, so I can release 
> any resources that have been allocated?
>
>
> On Monday, 29 August 2016 22:00:00 UTC+2, Mark NS wrote:
>>
>> Hi guys,
>> Thanks for the comments and suggestions so far. @Paul, cheers for the 
>> heads-up re cloud pubsub, but using a cloud broker is not suitable for my 
>> use case. 
>>
>> I found some time to implement a prototype pubsub servicer, and I'd like 
>> to share it because I still have a few doubts if I'm doing things right, 
>> and a few specific questions. 
>>
>> The servicer has a single bidirectional streaming endpoint. I spawn a new 
>> thread in the implementation to process incoming requests, generate a 
>> unique id for the connection, do some book-keeping, and send out updates to 
>> subscribed clients as they are published. 
>>                                                                              
>>         
>>
>> Specific questions: 
>> 1. Is it possible to know when a client has disconnected, so I can 
>> release any resources that have been allocated? 
>> 2. Would it be more idiomatic to use threadlocals somehow for the 
>> book-keeping? 
>>                                                           
>> Any other comments and criticism of the code very welcome ;)
>>
>>
>> # ------------------------------ pubsub.proto
>>
>> package pubsub;
>>
>> service PubSub {
>>   rpc subscribe(stream SubscriptionRequest) returns (stream 
>> SubscriptionReply);
>> }
>>
>> message SubscriptionRequest {
>>   string topic = 1;
>> }
>>
>> message SubscriptionReply {
>>   string topic = 1;
>>   string message = 2;
>>
>> }
>>
>> # ------------------------------ pubsub_server.py
>>
>> import queue
>> import random
>> import threading
>> import uuid
>> from collections import defaultdict
>> import time
>> from concurrent import futures
>>
>> import grpc
>>
>> import pubsub_pb2
>>
>> _ONE_DAY_IN_SECONDS = 60 * 60 * 24
>>
>>
>> class PubSub(pubsub_pb2.PubSubServicer):
>>   def __init__(self):
>>     self.client_q = dict()
>>     self.topics = defaultdict(list)
>>
>>     t = threading.Thread(target=self._fake_publisher)
>>     t.start()
>>
>>   def publish(self, topic, message):
>>     reply = pubsub_pb2.SubscriptionReply(topic=topic,
>>                                          message=message)
>>     print("Publishing topic {} message {}".format(topic, message))
>>     for clientid in self.topics[topic]:
>>       self.client_q[clientid].put(reply)
>>
>>   def subscribe(self, request_iterator, context):
>>     clientid = uuid.uuid4()
>>     self.client_q[clientid] = queue.Queue()
>>
>>     t = threading.Thread(target=self._process_subscriptions,
>>                          args=(clientid, request_iterator))
>>     t.start()
>>
>>     while True:
>>       item = self.client_q[clientid].get()
>>       yield item
>>       self.client_q[clientid].task_done()
>>
>>     # What's the best way to release resources and tidy up
>>     # when the client goes away?
>>
>>   def _process_subscriptions(self, clientid, request_iterator):
>>     for request in request_iterator:
>>       print("Received subscription request from {} for topic {}".format(
>>         clientid, request.topic
>>       ))
>>       self.topics[request.topic].append(clientid)
>>
>>   def _fake_publisher(self):
>>     while True:
>>       if len(self.topics):
>>         topic = random.choice(list(self.topics.keys()))
>>         self.publish(topic, str(uuid.uuid4()))
>>       time.sleep(1)
>>
>>
>> def serve():
>>   server = grpc.server(futures.ThreadPoolExecutor())
>>   pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
>>   server.add_insecure_port('[::]:50051')
>>   server.start()
>>   try:
>>     while True:
>>       time.sleep(_ONE_DAY_IN_SECONDS)
>>   except KeyboardInterrupt:
>>     server.stop(0)
>>
>>
>> if __name__ == '__main__':
>>   serve()
>>
>>
>> # ------------------------------ pubsub_client.py
>>
>> import time
>> import concurrent
>>
>>
>> import grpc
>>
>> import pubsub_pb2
>>
>> def client(stub):
>>   def my_topics():
>>     yield pubsub_pb2.SubscriptionRequest(topic="grpc")
>>     time.sleep(5)
>>     yield pubsub_pb2.SubscriptionRequest(topic="donkeys")
>>
>>   responses = stub.subscribe(my_topics())
>>   for response in responses:
>>     print("Received topic {} message {}".format(
>>       response.topic, response.message))
>>
>>
>> def run():
>>   channel = grpc.insecure_channel('localhost:50051')
>>   stub = pubsub_pb2.PubSubStub(channel)
>>
>>   with concurrent.futures.ThreadPoolExecutor() as executor:
>>     futures = [
>>       executor.submit(client, stub)
>>     ]
>>
>>     for future in concurrent.futures.as_completed(futures):
>>       try:
>>         future.result()
>>       except Exception as exc:
>>         print("Generated an exception {}".format(exc))
>>
>>
>> if __name__ == '__main__':
>>   run()
>>
>>  
>>
>> On Sunday, 28 August 2016 02:52:15 UTC+2, Paul Grosu wrote:
>>>
>>> Hi Mark,
>>>
>>> Might the following PubSub via gRPC example be useful:
>>>
>>>
>>> https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/grpc
>>>
>>> Paul
>>>
>>> On Friday, August 26, 2016 at 6:13:32 AM UTC-4, Mark NS wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm looking into using gRPC for a line-of-business app with 10-100 
>>>> users. Python server side, C# client side. The server side application is 
>>>> stateful, and clients will have both request-response and streaming 
>>>> communications.
>>>>  
>>>> I was previously working with ZeroMQ (changing to gRPC for firewall 
>>>> traversal etc), which has pubsub patterns built in. Are there any examples 
>>>> of such a pattern implemented with gRPC? (I saw the post about cloud 
>>>> pubsub, but I am looking for something brokerless).
>>>>
>>>> - Mark
>>>>
>>>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/02b7b7e3-1218-45ef-83e0-68ec50fa3e58%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to