Hi Mark,

Take a look at this example of how to use ServerContext's AsyncNotifyWhenDone:
https://github.com/grpc/grpc/blob/master/test/cpp/end2end/async_end2end_test.cc

Hope it helps,
Paul

On Wednesday, August 31, 2016 at 3:37:59 AM UTC-4, Mark NS wrote:
>
> Anyone have an answer to this question in particular?
>
> 1. Is it possible to know when a client has disconnected, so I can release
> any resources that have been allocated?
>
>
> On Monday, 29 August 2016 22:00:00 UTC+2, Mark NS wrote:
>>
>> Hi guys,
>> Thanks for the comments and suggestions so far. @Paul, cheers for the
>> heads-up re cloud pubsub, but using a cloud broker is not suitable for my
>> use case.
>>
>> I found some time to implement a prototype pubsub servicer, and I'd like
>> to share it because I still have a few doubts if I'm doing things right,
>> and a few specific questions.
>>
>> The servicer has a single bidirectional streaming endpoint. I spawn a new
>> thread in the implementation to process incoming requests, generate a
>> unique id for the connection, do some book-keeping, and send out updates to
>> subscribed clients as they are published.
>>
>> Specific questions:
>> 1. Is it possible to know when a client has disconnected, so I can
>> release any resources that have been allocated?
>> 2. Would it be more idiomatic to use threadlocals somehow for the
>> book-keeping?
>>
>> Any other comments and criticism of the code very welcome ;)
>>
>>
>> # ------------------------------ pubsub.proto
>>
>> package pubsub;
>>
>> service PubSub {
>>   rpc subscribe(stream SubscriptionRequest) returns (stream SubscriptionReply);
>> }
>>
>> message SubscriptionRequest {
>>   string topic = 1;
>> }
>>
>> message SubscriptionReply {
>>   string topic = 1;
>>   string message = 2;
>> }
>>
>> # ------------------------------ pubsub_server.py
>>
>> import queue
>> import random
>> import threading
>> import uuid
>> from collections import defaultdict
>> import time
>> from concurrent import futures
>>
>> import grpc
>>
>> import pubsub_pb2
>>
>> _ONE_DAY_IN_SECONDS = 60 * 60 * 24
>>
>>
>> class PubSub(pubsub_pb2.PubSubServicer):
>>     def __init__(self):
>>         self.client_q = dict()
>>         self.topics = defaultdict(list)
>>
>>         t = threading.Thread(target=self._fake_publisher)
>>         t.start()
>>
>>     def publish(self, topic, message):
>>         reply = pubsub_pb2.SubscriptionReply(topic=topic, message=message)
>>         print("Publishing topic {} message {}".format(topic, message))
>>         for clientid in self.topics[topic]:
>>             self.client_q[clientid].put(reply)
>>
>>     def subscribe(self, request_iterator, context):
>>         clientid = uuid.uuid4()
>>         self.client_q[clientid] = queue.Queue()
>>
>>         t = threading.Thread(target=self._process_subscriptions,
>>                              args=(clientid, request_iterator))
>>         t.start()
>>
>>         while True:
>>             item = self.client_q[clientid].get()
>>             yield item
>>             self.client_q[clientid].task_done()
>>
>>         # What's the best way to release resources and tidy up
>>         # when the client goes away?
>>
>>     def _process_subscriptions(self, clientid, request_iterator):
>>         for request in request_iterator:
>>             print("Received subscription request from {} for topic {}".format(
>>                 clientid, request.topic
>>             ))
>>             self.topics[request.topic].append(clientid)
>>
>>     def _fake_publisher(self):
>>         while True:
>>             if len(self.topics):
>>                 topic = random.choice(list(self.topics.keys()))
>>                 self.publish(topic, str(uuid.uuid4()))
>>             time.sleep(1)
>>
>>
>> def serve():
>>     server = grpc.server(futures.ThreadPoolExecutor())
>>     pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
>>     server.add_insecure_port('[::]:50051')
>>     server.start()
>>     try:
>>         while True:
>>             time.sleep(_ONE_DAY_IN_SECONDS)
>>     except KeyboardInterrupt:
>>         server.stop(0)
>>
>>
>> if __name__ == '__main__':
>>     serve()
>>
>>
>> # ------------------------------ pubsub_client.py
>>
>> import time
>> import concurrent
>>
>>
>> import grpc
>>
>> import pubsub_pb2
>>
>> def client(stub):
>>     def my_topics():
>>         yield pubsub_pb2.SubscriptionRequest(topic="grpc")
>>         time.sleep(5)
>>         yield pubsub_pb2.SubscriptionRequest(topic="donkeys")
>>
>>     responses = stub.subscribe(my_topics())
>>     for response in responses:
>>         print("Received topic {} message {}".format(
>>             response.topic, response.message))
>>
>>
>> def run():
>>     channel = grpc.insecure_channel('localhost:50051')
>>     stub = pubsub_pb2.PubSubStub(channel)
>>
>>     with concurrent.futures.ThreadPoolExecutor() as executor:
>>         futures = [
>>             executor.submit(client, stub)
>>         ]
>>
>>         for future in concurrent.futures.as_completed(futures):
>>             try:
>>                 future.result()
>>             except Exception as exc:
>>                 print("Generated an exception {}".format(exc))
>>
>>
>> if __name__ == '__main__':
>>     run()
>>
>>
>>
>> On Sunday, 28 August 2016 02:52:15 UTC+2, Paul Grosu wrote:
>>>
>>> Hi Mark,
>>>
>>> Might the following PubSub via gRPC example be useful:
>>>
>>> https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/grpc
>>>
>>> Paul
>>>
>>> On Friday, August 26, 2016 at 6:13:32 AM UTC-4, Mark NS wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm looking into using gRPC for a line-of-business app with 10-100
>>>> users. Python server side, C# client side. The server side application is
>>>> stateful, and clients will have both request-response and streaming
>>>> communications.
>>>>
>>>> I was previously working with ZeroMQ (changing to gRPC for firewall
>>>> traversal etc), which has pubsub patterns built in. Are there any examples
>>>> of such a pattern implemented with gRPC? (I saw the post about cloud
>>>> pubsub, but I am looking for something brokerless).
>>>>
>>>> - Mark
>>>>
>>>
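
For the Python servicer quoted above, question 1 might be approached without the C++ AsyncNotifyWhenDone call. What follows is only a rough sketch, not a tested recipe against a live server. It assumes the `context` passed to the handler offers `add_callback()` and `is_active()`, as `grpc.ServicerContext` does. The names `SubscriptionRegistry`, `stream_replies` and `FakeContext` are made up for illustration and appear nowhere in the thread. `FakeContext` is just a stand-in so the cleanup logic can be tried without starting a server. A lock guards the book-keeping because the publisher thread and the handler threads touch the same dictionaries.

```python
import queue
import threading
import uuid
from collections import defaultdict


class SubscriptionRegistry:
    """Thread-safe book-keeping: one queue per client, a set of clients per topic."""

    def __init__(self):
        self._lock = threading.Lock()
        self._queues = {}
        self._topics = defaultdict(set)

    def register(self):
        client_id = uuid.uuid4()
        with self._lock:
            self._queues[client_id] = queue.Queue()
        return client_id

    def queue_for(self, client_id):
        with self._lock:
            return self._queues[client_id]

    def subscribe(self, client_id, topic):
        with self._lock:
            if client_id in self._queues:  # ignore clients already cleaned up
                self._topics[topic].add(client_id)

    def publish(self, topic, message):
        with self._lock:
            targets = [self._queues[c] for c in self._topics.get(topic, ())]
        for q in targets:
            q.put((topic, message))

    def unregister(self, client_id):
        """Idempotent, so both the callback and the generator may call it."""
        with self._lock:
            self._queues.pop(client_id, None)
            for members in self._topics.values():
                members.discard(client_id)

    def client_count(self):
        with self._lock:
            return len(self._queues)


def stream_replies(registry, client_id, context, poll_seconds=0.5):
    """Generator body for a streaming handler; releases resources when the RPC ends."""
    q = registry.queue_for(client_id)
    # Assumption: the callback runs when the RPC terminates, for whatever reason.
    context.add_callback(lambda: registry.unregister(client_id))
    try:
        # Poll with a timeout so a blocked get() cannot outlive the RPC.
        while context.is_active():
            try:
                item = q.get(timeout=poll_seconds)
            except queue.Empty:
                continue
            yield item
    finally:
        registry.unregister(client_id)


class FakeContext:
    """Hypothetical stand-in for grpc.ServicerContext (illustration only)."""

    def __init__(self):
        self._active = True
        self._callbacks = []

    def is_active(self):
        return self._active

    def add_callback(self, callback):
        if not self._active:
            return False
        self._callbacks.append(callback)
        return True

    def terminate(self):
        # Simulates the client going away.
        self._active = False
        for callback in self._callbacks:
            callback()
```

In the real servicer, `subscribe(self, request_iterator, context)` would presumably call `register()`, start the request-reading thread as before, and then iterate over `stream_replies(...)`, turning each `(topic, message)` tuple into a `SubscriptionReply`. The timeout on `get()` is a design choice: it trades a little latency in noticing a disconnect for not needing a sentinel value pushed onto the queue.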
