2259289435 edited a comment on issue #566: Auto create topic question URL: https://github.com/apache/rocketmq/issues/566#issuecomment-582813212 Client and broker are both version 4.6.0; if the client and broker versions are not equal, get the broker's createTopicKey first. Producer: SendMessageHook.sendMessageBefore(final SendMessageContext context) 1. getTopic String newTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic()); 2.createTopicForCache createTopicCache.get(newTopic, new Callable<Boolean>() { @Override public Boolean call() throws Exception { Set<String> topicList = context.getProducer().getmQClientFactory().getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList(); if(!topicList.contains(newTopic)) { log.info("producerGroup : {}, newTopic : {} begin autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), newTopic); context.getProducer().getmQClientFactory().getMQAdminImpl().createTopic(MixAllPlus.AUTO_CREATE_TOPIC_KEY_TOPIC, newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE); log.info("producerGroup : {}, newTopic : {} end autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), newTopic); } return true; } }); Push consumer: 1. beforeStart(pushconsumer consumer) //custom method 2. 
create instance DefaultMQPushConsumerImpl impl = consumer.getDefaultMQPushConsumerImpl(); if (consumer.getMessageModel() == MessageModel.CLUSTERING) { consumer.changeInstanceNameToPID(); } RPCHook rpcHook = ReflectUtil.on(impl).field("rpcHook").get(); MQClientInstance instance = MQClientManager.getInstance().getOrCreateMQClientInstance(consumer, rpcHook); instance.start(); 3.getNewTopic Set<String> newTopics = Sets.newHashSet(); Set<String> topicList = instance.getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList(); for(String topic : impl.getSubscriptionInner().keySet()) { String newTopic = NamespaceUtil.withoutNamespace(topic); if(!topicList.contains(newTopic)) { newTopics.add(newTopic); } } 4.createTopic for(final String newTopic : newTopics) { createTopicCache.get(newTopic, new Callable<Boolean>() { @Override public Boolean call() throws Exception { log.info("consumerGroup : {}, newTopic : {} begin autoCreate", consumer.getConsumerGroup(), newTopic); instance.getMQAdminImpl().createTopic(instance.getDefaultMQProducer().getCreateTopicKey(), newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE); log.info("consumerGroup : {}, newTopic : {} end autoCreate", consumer.getConsumerGroup(), newTopic); return true; } }); }
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
