GitHub user jiakme edited a comment on the discussion: single instance with multi DefaultLitePullConsumer(use same topic and different tag) don't poll message right
```java private void initRocketMQPushConsumer() throws MQClientException { List<DefaultLitePullConsumer> pullConsumers = new ArrayList<>(); DefaultLitePullConsumer defaultLitePullConsumer = getDefaultLitePullConsumer("high"); pullConsumers.add(defaultLitePullConsumer); DefaultLitePullConsumer defaultLitePullConsumer_1 = getDefaultLitePullConsumer("low"); pullConsumers.add(defaultLitePullConsumer_1); PULL_CONSUMERS = Collections.unmodifiableList(pullConsumers); } private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag) throws MQClientException { RocketMQProperties.Consumer consumerConfig = new RocketMQProperties.Consumer(); consumerConfig.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER + "_priority"); consumerConfig.setTopic(TOPIC); consumerConfig.setPullBatchSize(50); DefaultLitePullConsumer defaultLitePullConsumer; consumerConfig.setSelectorExpression(tag); defaultLitePullConsumer = buildPullConsumer(consumerConfig); return defaultLitePullConsumer; } private DefaultLitePullConsumer buildPullConsumer(RocketMQProperties.Consumer consumerConfig) throws MQClientException { /** * {@link org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)} */ String nameServer = rocketMQProperties.getNameServer(); String groupName = consumerConfig.getGroup(); String topicName = consumerConfig.getTopic(); Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null"); Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null"); String accessChannel = rocketMQProperties.getAccessChannel(); MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel()); SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType()); String selectorExpression = consumerConfig.getSelectorExpression(); String ak = consumerConfig.getAccessKey(); String sk = consumerConfig.getSecretKey(); int pullBatchSize = consumerConfig.getPullBatchSize(); boolean useTLS = consumerConfig.isTlsEnable(); DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel, groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS); litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace()); litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic()); litePullConsumer.setNamespace(consumerConfig.getNamespace()); litePullConsumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); litePullConsumer.setAutoCommit(true); return litePullConsumer; } ``` GitHub link: https://github.com/apache/rocketmq/discussions/6578#discussioncomment-5587162 ---- This is an automatically sent email for dev@rocketmq.apache.org. To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org