2259289435 commented on issue #566: Auto create topic question
URL: https://github.com/apache/rocketmq/issues/566#issuecomment-582813212
 
 
   Client and broker are both version 4.6.0. If the client and broker versions
differ, get the broker's createTopicKey first.
   producer:
   SendMessageHook.sendMessageBefore(final SendMessageContext context)
   1. getTopic
   String newTopic = 
NamespaceUtil.withoutNamespace(context.getMessage().getTopic());
   2. createTopicForCache
   createTopicCache.get(newTopic, new Callable<Boolean>() {
               @Override
               public Boolean call() throws Exception {
                   Set<String> topicList = 
context.getProducer().getmQClientFactory().getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
                   if(!topicList.contains(newTopic)) {
                       log.info("producerGroup : {}, newTopic : {} begin 
autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), 
newTopic);
                       
context.getProducer().getmQClientFactory().getMQAdminImpl().createTopic(MixAllPlus.AUTO_CREATE_TOPIC_KEY_TOPIC,
 newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
                       log.info("producerGroup : {}, newTopic : {} end 
autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), 
newTopic);
                   }
                   return true;
               }
           });
   
   push consumer:
   1. beforeStart(pushconsumer consumer)                    //custom method
   2. create instance
   DefaultMQPushConsumerImpl impl = consumer.getDefaultMQPushConsumerImpl();
           if (consumer.getMessageModel() == MessageModel.CLUSTERING) {
               consumer.changeInstanceNameToPID();
           }
           RPCHook rpcHook = ReflectUtil.on(impl).field("rpcHook").get();
           MQClientInstance instance = 
MQClientManager.getInstance().getOrCreateMQClientInstance(consumer, rpcHook);
           instance.start();
   3. getNewTopic
   Set<String> newTopics = Sets.newHashSet();
           Set<String> topicList = 
instance.getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
           for(String topic : impl.getSubscriptionInner().keySet()) {
               String newTopic = NamespaceUtil.withoutNamespace(topic);
               if(!topicList.contains(newTopic)) {
                   newTopics.add(newTopic);
               }
           }
   4. createTopic (note: not thread-safe)
   for(String newTopic : newTopics) {
               log.info("consumerGroup : {}, newTopic : {} begin autoCreate", 
consumer.getConsumerGroup(), newTopic);
               
instance.getMQAdminImpl().createTopic(instance.getDefaultMQProducer().getCreateTopicKey(),
 newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
               log.info("consumerGroup : {}, newTopic : {} end autoCreate", 
consumer.getConsumerGroup(), newTopic);
           }
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to