2259289435 edited a comment on issue #566: Auto create topic question
URL: https://github.com/apache/rocketmq/issues/566#issuecomment-582813212
 
 
   clinet,broker version 4.6.0,if client,broker version not equals,get broker 
createtopickey first
   producer:
   SendMessageHook.sendMessageBefore(final SendMessageContext context)
   1. getTopic
   String newTopic = 
NamespaceUtil.withoutNamespace(context.getMessage().getTopic());
   2.createTopicForCache
   createTopicCache.get(newTopic, new Callable<Boolean>() {
               @Override
               public Boolean call() throws Exception {
                   Set<String> topicList = 
context.getProducer().getmQClientFactory().getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
                   if(!topicList.contains(newTopic)) {
                       log.info("producerGroup : {}, newTopic : {} begin 
autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), 
newTopic);
                       
context.getProducer().getmQClientFactory().getMQAdminImpl().createTopic(MixAllPlus.AUTO_CREATE_TOPIC_KEY_TOPIC,
 newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
                       log.info("producerGroup : {}, newTopic : {} end 
autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), 
newTopic);
                   }
                   return true;
               }
           });
   
   pushconsumer.
   1. beforeStart(pushconsumer consumer)                    //custom method
   2. create instance
   DefaultMQPushConsumerImpl impl = consumer.getDefaultMQPushConsumerImpl();
           if (consumer.getMessageModel() == MessageModel.CLUSTERING) {
               consumer.changeInstanceNameToPID();
           }
           RPCHook rpcHook = ReflectUtil.on(impl).field("rpcHook").get();
           MQClientInstance instance = 
MQClientManager.getInstance().getOrCreateMQClientInstance(consumer, rpcHook);
           instance.start();
   3.getNewTopic
   Set<String> newTopics = Sets.newHashSet();
           Set<String> topicList = 
instance.getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
           for(String topic : impl.getSubscriptionInner().keySet()) {
               String newTopic = NamespaceUtil.withoutNamespace(topic);
               if(!topicList.contains(newTopic)) {
                   newTopics.add(newTopic);
               }
           }
   4.createTopic 
   for(final String newTopic : newTopics) {
               createTopicCache.get(newTopic, new Callable<Boolean>() {
                   @Override
                   public Boolean call() throws Exception {
                       log.info("consumerGroup : {}, newTopic : {} begin 
autoCreate", consumer.getConsumerGroup(), newTopic);
                       
instance.getMQAdminImpl().createTopic(instance.getDefaultMQProducer().getCreateTopicKey(),
 newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
                       log.info("consumerGroup : {}, newTopic : {} end 
autoCreate", consumer.getConsumerGroup(), newTopic);
                       return true;
                   }
               });
           }
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to