jiakme opened a new issue, #549:
URL: https://github.com/apache/rocketmq-spring/issues/549

   1. Please describe the issue you observed:
   - A single Java Spring Boot server instance with multiple DefaultLitePullConsumer instances
   - DefaultLitePullConsumer#poll() is invoked in a loop
   - Some queues are not consumed
   
    2. Please tell us about your environment:
    - system: linux
    - springboot: 2.4.2
    - rocketmq-spring: 2.2.2
    3. Other information
   
   `
   // DefaultLitePullConsumer create
   /**
    * Builds one lite pull consumer for the "high" tag and one for the "low" tag, in that order,
    * and publishes them as an unmodifiable list in {@code PULL_CONSUMERS}.
    *
    * <p>Despite the "Push" in its name, this method only creates pull consumers.
    *
    * @throws MQClientException propagated from {@code getDefaultLitePullConsumer}
    */
   private void initRocketMQPushConsumer() throws MQClientException {
       log.info("消息优先级发送: init pull consumer start");

       List<DefaultLitePullConsumer> consumers = new ArrayList<>();
       // Order matters to callers that iterate the list: "high" first, then "low".
       consumers.add(getDefaultLitePullConsumer("high"));
       consumers.add(getDefaultLitePullConsumer("low"));

       PULL_CONSUMERS = Collections.unmodifiableList(consumers);
   }
   
       private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag) 
throws MQClientException {
           RocketMQProperties.Consumer consumerConfig = new 
RocketMQProperties.Consumer();
           consumerConfig.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER + 
"_priority");
           // TODO: 
           consumerConfig.setTopic(TOPIC);
           consumerConfig.setPullBatchSize(50);
           DefaultLitePullConsumer defaultLitePullConsumer;
   
           String separator = "||";
   
   //        consumerConfig.setSelectorExpression("priority_9" + separator + 
"priority_8" + separator + "priority_7");
           consumerConfig.setSelectorExpression(tag);
           defaultLitePullConsumer = buildPullConsumer(consumerConfig);
           return defaultLitePullConsumer;
       }
       private DefaultLitePullConsumer 
buildPullConsumer(RocketMQProperties.Consumer consumerConfig) throws 
MQClientException {
           /**
            * {@link 
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)}
            */
           String nameServer = rocketMQProperties.getNameServer();
           String groupName = consumerConfig.getGroup();
           String topicName = consumerConfig.getTopic();
           Assert.hasText(nameServer, "[rocketmq.name-server] must not be 
null");
           Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not 
be null");
           Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not 
be null");
   
           String accessChannel = rocketMQProperties.getAccessChannel();
           MessageModel messageModel = 
MessageModel.valueOf(consumerConfig.getMessageModel());
           SelectorType selectorType = 
SelectorType.valueOf(consumerConfig.getSelectorType());
           String selectorExpression = consumerConfig.getSelectorExpression();
           String ak = consumerConfig.getAccessKey();
           String sk = consumerConfig.getSecretKey();
           int pullBatchSize = consumerConfig.getPullBatchSize();
           boolean useTLS = consumerConfig.isTlsEnable();
   
           DefaultLitePullConsumer litePullConsumer = 
RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
                   groupName, topicName, messageModel, selectorType, 
selectorExpression, ak, sk, pullBatchSize, useTLS);
           
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
           
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
           litePullConsumer.setNamespace(consumerConfig.getNamespace());
           litePullConsumer.setAllocateMessageQueueStrategy(new 
AllocateMessageQueueAveragely());
           litePullConsumer.setAutoCommit(true);
   //        
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
   
           return litePullConsumer;
       }
   
   
   // invoke poll
   // Drain each consumer in turn: keep polling until a poll comes back empty,
   // printing the consumer and every non-empty batch as JSON, then move on.
   for (DefaultLitePullConsumer consumer : pullConsumers) {
       List<MessageExt> batch = consumer.poll();
       while (CollUtil.isNotEmpty(batch)) {
           System.out.println(JSON.toJSONString(consumer) + "::::::::::" + JSON.toJSONString(batch));
           batch = consumer.poll();
       }
   }
   
   `
   rocketmq-dashboard:
   
![image](https://user-images.githubusercontent.com/8815138/231166815-e57cf0e6-e79c-455d-a159-17e6894d2be1.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to