GitHub user jiakme edited a comment on the discussion: single instance with 
multi DefaultLitePullConsumer(use same topic and different tag) don't poll 
message right

```java
private void initRocketMQPushConsumer() throws MQClientException {

    List<DefaultLitePullConsumer> pullConsumers = new ArrayList<>();

    DefaultLitePullConsumer defaultLitePullConsumer = 
getDefaultLitePullConsumer("high");
    pullConsumers.add(defaultLitePullConsumer);

    DefaultLitePullConsumer defaultLitePullConsumer_1 = 
getDefaultLitePullConsumer("low");
    pullConsumers.add(defaultLitePullConsumer_1);

    PULL_CONSUMERS = Collections.unmodifiableList(pullConsumers);
}

private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag) throws 
MQClientException {
    RocketMQProperties.Consumer consumerConfig = new 
RocketMQProperties.Consumer();
    consumerConfig.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER + 
"_priority");

    consumerConfig.setTopic(TOPIC);
    consumerConfig.setPullBatchSize(50);
    DefaultLitePullConsumer defaultLitePullConsumer;

    consumerConfig.setSelectorExpression(tag);
    defaultLitePullConsumer = buildPullConsumer(consumerConfig);
    return defaultLitePullConsumer;
}
private DefaultLitePullConsumer buildPullConsumer(RocketMQProperties.Consumer 
consumerConfig) throws MQClientException {
    /**
     * {@link 
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)}
     */
    String nameServer = rocketMQProperties.getNameServer();
    String groupName = consumerConfig.getGroup();
    String topicName = consumerConfig.getTopic();
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be 
null");
    Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be 
null");

    String accessChannel = rocketMQProperties.getAccessChannel();
    MessageModel messageModel = 
MessageModel.valueOf(consumerConfig.getMessageModel());
    SelectorType selectorType = 
SelectorType.valueOf(consumerConfig.getSelectorType());
    String selectorExpression = consumerConfig.getSelectorExpression();
    String ak = consumerConfig.getAccessKey();
    String sk = consumerConfig.getSecretKey();
    int pullBatchSize = consumerConfig.getPullBatchSize();
    boolean useTLS = consumerConfig.isTlsEnable();

    DefaultLitePullConsumer litePullConsumer = 
RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
            groupName, topicName, messageModel, selectorType, 
selectorExpression, ak, sk, pullBatchSize, useTLS);
    litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
    
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
    litePullConsumer.setNamespace(consumerConfig.getNamespace());
    litePullConsumer.setAllocateMessageQueueStrategy(new 
AllocateMessageQueueAveragely());
    litePullConsumer.setAutoCommit(true);
    return litePullConsumer;
}

```

GitHub link: 
https://github.com/apache/rocketmq/discussions/6578#discussioncomment-5587162

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org

Reply via email to