Hi guys,
Thanks for the comments and suggestions so far. @Paul, cheers for the 
heads-up re cloud pubsub, but using a cloud broker is not suitable for my 
use case. 

I found some time to implement a prototype pubsub servicer, and I'd like to 
share it because I still have a few doubts if I'm doing things right, and a 
few specific questions. 

The servicer has a single bidirectional streaming endpoint. I spawn a new 
thread in the implementation to process incoming requests, generate a 
unique id for the connection, do some book-keeping, and send out updates to 
subscribed clients as they are published. 
                                                                                
     

Specific questions: 
1. Is it possible to know when a client has disconnected, so I can release 
any resources that have been allocated? 
2. Would it be more idiomatic to use threadlocals somehow for the 
book-keeping? 
                                                          
Any other comments and criticism of the code very welcome ;)


# ------------------------------ pubsub.proto

package pubsub;

service PubSub {
  rpc subscribe(stream SubscriptionRequest) returns (stream SubscriptionReply);
}

message SubscriptionRequest {
  string topic = 1;
}

message SubscriptionReply {
  string topic = 1;
  string message = 2;

}

# ------------------------------ pubsub_server.py

import queue
import random
import threading
import uuid
from collections import defaultdict
import time
from concurrent import futures

import grpc

import pubsub_pb2

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class PubSub(pubsub_pb2.PubSubServicer):
  def __init__(self):
    self.client_q = dict()
    self.topics = defaultdict(list)

    t = threading.Thread(target=self._fake_publisher)
    t.start()

  def publish(self, topic, message):
    reply = pubsub_pb2.SubscriptionReply(topic=topic,
                                         message=message)
    print("Publishing topic {} message {}".format(topic, message))
    for clientid in self.topics[topic]:
      self.client_q[clientid].put(reply)

  def subscribe(self, request_iterator, context):
    clientid = uuid.uuid4()
    self.client_q[clientid] = queue.Queue()

    t = threading.Thread(target=self._process_subscriptions,
                         args=(clientid, request_iterator))
    t.start()

    while True:
      item = self.client_q[clientid].get()
      yield item
      self.client_q[clientid].task_done()

    # What's the best way to release resources and tidy up
    # when the client goes away?

  def _process_subscriptions(self, clientid, request_iterator):
    for request in request_iterator:
      print("Received subscription request from {} for topic {}".format(
        clientid, request.topic
      ))
      self.topics[request.topic].append(clientid)

  def _fake_publisher(self):
    while True:
      if len(self.topics):
        topic = random.choice(list(self.topics.keys()))
        self.publish(topic, str(uuid.uuid4()))
      time.sleep(1)


def serve():
  server = grpc.server(futures.ThreadPoolExecutor())
  pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
  server.add_insecure_port('[::]:50051')
  server.start()
  try:
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    server.stop(0)


if __name__ == '__main__':
  serve()


# ------------------------------ pubsub_client.py

import time
import concurrent


import grpc

import pubsub_pb2

def client(stub):
  def my_topics():
    yield pubsub_pb2.SubscriptionRequest(topic="grpc")
    time.sleep(5)
    yield pubsub_pb2.SubscriptionRequest(topic="donkeys")

  responses = stub.subscribe(my_topics())
  for response in responses:
    print("Received topic {} message {}".format(
      response.topic, response.message))


def run():
  channel = grpc.insecure_channel('localhost:50051')
  stub = pubsub_pb2.PubSubStub(channel)

  with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [
      executor.submit(client, stub)
    ]

    for future in concurrent.futures.as_completed(futures):
      try:
        future.result()
      except Exception as exc:
        print("Generated an exception {}".format(exc))


if __name__ == '__main__':
  run()

 

On Sunday, 28 August 2016 02:52:15 UTC+2, Paul Grosu wrote:
>
> Hi Mark,
>
> Might the following PubSub via gRPC example be useful:
>
>
> https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/grpc
>
> Paul
>
> On Friday, August 26, 2016 at 6:13:32 AM UTC-4, Mark NS wrote:
>>
>> Hi,
>>
>> I'm looking into using gRPC for a line-of-business app with 10-100 users. 
>> Python server side, C# client side. The server side application is 
>> stateful, and clients will have both request-response and streaming 
>> communications.
>>  
>> I was previously working with ZeroMQ (changing to gRPC for firewall 
>> traversal etc), which has pubsub patterns built in. Are there any examples 
>> of such a pattern implemented with gRPC? (I saw the post about cloud 
>> pubsub, but I am looking for something brokerless).
>>
>> - Mark
>>
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/8a783b81-1fe2-4524-9e9a-14c371427802%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to