dengshaochun opened a new issue, #150:
URL: https://github.com/apache/pulsar-client-python/issues/150

   python: 3.6
   pulsar-client: pulsar-client[avro]==2.10.2
   celery: 5.1.2
   
   code example:
   ```python
   #!/usr/bin/env python
   
   import time
   import random
   import string
   from pulsar import Client, CompressionType
   from pulsar.schema import AvroSchema, Record, String, Integer
   
   
   def generate_random_string(length=6):
       charset = string.ascii_letters + string.digits
       random_chars = random.choices(charset, k=length)
       random_string = "".join(random_chars)
       return random_string.capitalize()
   
   
   class User(Record):
       name = String()
       age = Integer
   
   
   UserAvroSchema = AvroSchema(User)  # type: ignore
   
   
   def gen_random_data():
       return User(user=generate_random_string(), age=random.randint(0, 100))
   
   
   class PulsarDemo(object):
       def __init__(self) -> None:
           self.SERVICE_URL = "pulsar://***"
           self.TOPIC = "persistent://****"
           client = Client(service_url=self.SERVICE_URL)
           self.producer = client.create_producer(
               topic=self.TOPIC,
               schema=UserAvroSchema,
               batching_enabled=True,
               batching_max_messages=1000,
               batching_max_publish_delay_ms=1000,
               compression_type=CompressionType.SNAPPY,  # type: ignore
           )
   
       def send_callback(self, send_result, msg_id):
           print("Message published: result:{}  msg_id:{}".format(send_result, 
msg_id))
   
       def async_producer(self, cnt=1000):
           while cnt >= 0:
               data = gen_random_data()
               self.producer.send_async(
                   data,
                   callback=self.send_callback,
               )
               time.sleep(0.01)
               cnt -= 1
           self.producer.flush()
   
   
   # celery task
   from celery import shared_task
   @shared_task
   def mock_data2pulsar(cnt=1000):
       mock = PulsarDemo()
       mock.async_producer()
   
   ```
   
   an exception occurred at:
   ```text
   [2023-08-29 10:27:46,933: ERROR/ForkPoolWorker-31] Pulsar error: 
TopicNotFound
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to