Maozy-fnst opened a new issue #101:
URL: https://github.com/apache/rocketmq-client-python/issues/101


   **问题描述**
   
   需要在多线程中创建Producer实例向rocketmq发送数据,但是在社区的文档中关于多线程应用场景
   资料很少,因此在外部找到一部分内容:
   
   _Thread Safety
   The producer is thread-safe, you can just use it in your business solution._
   
   具体见 https://rocketmq.apache.org/docs/best-practice-producer/
   
   
   此外在 https://github.com/apache/rocketmq-client-python/pull/100 也找到如下信息:
   
   _First, don't call start() and shutdown() under multi-threads; secondly, C++ 
SDK has retry logic.
   You should report the error of sending, but call shutdown() is unrecommended.
   send_sync is thread-safe, protect it with mutex is unnecessary._
   
   在阿里云的站点
   
https://help.aliyun.com/document_detail/55385.html?spm=a2c4g.11186623.6.602.e6ad311fJdsaNR
   中也提到:
   
   _消息队列RocketMQ版的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。
   
   您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消
   费者实例里采用多线程发送或接收消息,从而提高消息发送或接收TPS。
   
   注意 请避免为每个线程创建一个客户端实例。_
   
   总结一下就是:
   
   - 避免在多线程中为每一个线程创建一个实例
   - Producer是线程安全的,意味着在多线程中可以共享对象,不需要加锁
   
   基于这两点,结合社区提供的样例,有如下实现:
   ```
   import time
   import json
   from rocketmq.client import Producer, Message, SendStatus
   from confparse import conf
   
   class RocketMQ():
   
       def __init__(self):
           self.producer_name = 'PID-bstperf' + '-' + str(time.time())
           self.producer = Producer(self.producer_name)
           self.producer.set_name_server_address(conf.mqaddr)
           self.producer.set_group(conf.mqgroup)
           self.producer.start()
   
       def push_message_to_rocketmq(self, data):
           """
           Push message to rocketmq
           """
           body = json.dumps(data)
           msg = Message(conf.mqtopic)
   
           msg.set_keys('keys')
           msg.set_tags('tags')
           msg.set_body(body)
   
           ret = self.producer.send_sync(msg)
           return ret
   
       def shutdown(self):
           self.producer.shutdown()
   ```
   在使用时创建一个RocketMQ实例,然后在多线程中共享这个实例,如下:
   
   ```
   mq = RocketMQ()
   t = threading.Thread(target = func,
                        args = (mq, arg1, arg2, ...),
                        name = dom.name())
   ```
   
   最后在线程函数中发数据
   
   ```mq.push_message_to_rocketmq(data)```
   
   但是会出现错误:
   
   _ProducerSendSyncFailed: No route info of this topic: 
dawn-bcmq-performance-topic,error:-1,in file 
rocketmq-client-cpp/src/producer/DefaultMQProducerImpl.cpp line:418_
   
   单独为每个线程创建一个Producer实例时可以成功发送数据到rocketmq,说明上述错误的原因可以排除nameserver
   设置,server端重启或者连接异常的影响,所以按照推荐的在多线程中共享实例的方法出现上述错误的原因是什么?
   是使用的方式有误还是python SDK Producer本身对多线程的支持有缺陷,期待社区大佬们的意见和建议。
   
   更详细的信息可以移步:
   
   
https://www.yuque.com/docs/share/8a0ca825-b98d-470a-8dcf-368eb9450f0e?#%20%E3%80%8ARocketMQ%20Python/C++%E5%AE%A2%E6%88%B7%E7%AB%AF%E9%97%AE%E9%A2%98%E3%80%8B
   
   感谢
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to