GitHub user Samreay edited a comment on the discussion: [Python Client] How to subscribe to two topics - both with different schemas?
What I've come up with doesn't seem optimal, but it works by hiding away multiple blocking synchronous code blocks into threads and then putting the messages received into an janus.Queue (asyncio.Queues are not thread safe, so we swapped to janus). This all works under FastAPI to support the existing REST endpoints we serve. ```python class Consumer(Thread): def __init__(self, queue: janus.Queue, topic: str, schema: pulsar.schema.AvroSchema): super().__init__() self.queue = queue self.pulsar_client = pulsar.Client("pulsar://pulsar:6650") self.topic = topic self.schema = schema def run(self): reader = self.pulsar_client.create_reader( self.topic, start_message_id=pulsar.MessageId.latest, schema=self.schema ) while True: msg = reader.read_next() topic = msg.topic_name() if "/" in topic: topic = topic.split("/")[-1] message = from_avro(msg.value()) self.queue.sync_q.put_nowait(message) ``` Then when the FastAPI app starts you can kick off the threads, and also launch an `async` method to get from the `asyncio.Queue` passed into the Consumer threads. ```python app = FastAPI() app.queue = asyncio.Queue() @app.on_event("startup") async def startup_event(): consumer_preds = Consumer(app.queue, "predictions", Prediction.schema()) consumer_preds.start() consumer_trades = Consumer(app.queue, "trades", Trade.schema()) consumer_trades.start() asyncio.create_task(receive_message()) ``` `Prediction.schema` and what not are just some wrappers we have to convert pydantic to avro schemas. GitHub link: https://github.com/apache/pulsar/discussions/20263#discussioncomment-5845219 ---- This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org