GitHub user Samreay edited a comment on the discussion: [Python Client] How to 
subscribe to two topics - both with different schemas?

What I've come up with doesn't seem optimal, but it works: each blocking 
synchronous read loop is hidden away in its own thread, and the messages 
received are put into a `janus.Queue` (an `asyncio.Queue` is not thread safe, 
so we swapped to janus). This all runs under FastAPI alongside the existing 
REST endpoints we serve.

```python
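from threading import Thread

import janus
import pulsar


class Consumer(Thread):
    """Reads one topic in a blocking loop and pushes each message onto a shared queue."""

    def __init__(self, queue: janus.Queue, topic: str, schema: pulsar.schema.AvroSchema):
        super().__init__()
        self.queue = queue
        self.pulsar_client = pulsar.Client("pulsar://pulsar:6650")
        self.topic = topic
        self.schema = schema

    def run(self):
        reader = self.pulsar_client.create_reader(
            self.topic, start_message_id=pulsar.MessageId.latest, schema=self.schema
        )
        while True:
            msg = reader.read_next()  # blocks until a message arrives
            # Keep only the short topic name, e.g. "predictions"
            topic = msg.topic_name()
            if "/" in topic:
                topic = topic.split("/")[-1]
            # from_avro is a helper that is not shown here
            message = from_avro(msg.value())
            # sync_q is the thread-side (blocking) view of the janus queue
            self.queue.sync_q.put_nowait(message)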
```
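For comparison, here is a minimal stdlib-only sketch of the same thread-to-event-loop handoff. It is not what the code above does: it swaps janus for a plain `asyncio.Queue` plus `loop.call_soon_threadsafe`, and the `producer` and `collect` names are made up for illustration, with a simple loop standing in for the blocking Pulsar reader.

```python
import asyncio
import threading


def producer(loop, queue, name, count):
    """Hypothetical stand-in for a blocking reader loop running in a thread."""
    for i in range(count):
        # asyncio.Queue is not thread safe, so ask the loop to run
        # put_nowait on its own thread instead of calling it directly.
        loop.call_soon_threadsafe(queue.put_nowait, f"{name}-{i}")


async def collect(per_topic):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    threads = [
        threading.Thread(target=producer, args=(loop, queue, name, per_topic))
        for name in ("predictions", "trades")
    ]
    for t in threads:
        t.start()
    # Both producers feed the same queue; read until everything has arrived.
    received = [await queue.get() for _ in range(2 * per_topic)]
    for t in threads:
        t.join()
    return received


messages = asyncio.run(collect(2))
```

The janus approach keeps the thread code simpler, since the thread only needs the queue and not a reference to the event loop.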

Then, when the FastAPI app starts, you can kick off the threads and also launch 
an `async` task that reads from the `janus.Queue` passed into the Consumer 
threads.

```python
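app = FastAPI()

@app.on_event("startup")
async def startup_event():
    # The consumers use queue.sync_q, so this has to be a janus.Queue rather
    # than an asyncio.Queue. It is created here, inside the running event
    # loop, because some janus versions bind to the loop at construction.
    app.queue = janus.Queue()

    consumer_preds = Consumer(app.queue, "predictions", Prediction.schema())
    consumer_preds.start()
    consumer_trades = Consumer(app.queue, "trades", Trade.schema())
    consumer_trades.start()

    asyncio.create_task(receive_message())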
```
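The body of `receive_message` isn't shown above. A minimal sketch of what such a coroutine could look like follows; the `async_queue` and `handle` parameters are hypothetical (the call above takes no arguments), and a plain `asyncio.Queue` stands in for janus's `queue.async_q`, which exposes the same `get()` and `task_done()` methods.

```python
import asyncio


async def receive_message(async_queue, handle):
    """Forward each queued message to `handle` until the task is cancelled."""
    while True:
        message = await async_queue.get()
        try:
            handle(message)
        finally:
            # Mark the item as processed so that join() can return.
            async_queue.task_done()


async def demo():
    # Assumption: asyncio.Queue stands in for janus's queue.async_q here.
    queue = asyncio.Queue()
    seen = []
    task = asyncio.create_task(receive_message(queue, seen.append))
    for item in ({"topic": "predictions"}, {"topic": "trades"}):
        queue.put_nowait(item)
    await queue.join()  # wait until both items have been handled
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return seen


handled = asyncio.run(demo())
```

With janus, the coroutine would be given `app.queue.async_q`, while the Consumer threads keep writing to `app.queue.sync_q`.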

`Prediction.schema()` and `Trade.schema()` are just wrappers we have that 
convert pydantic models to Avro schemas.

GitHub link: 
https://github.com/apache/pulsar/discussions/20263#discussioncomment-5845219
