jiakme opened a new issue, #549:
URL: https://github.com/apache/rocketmq-spring/issues/549
1. Please describe the issue you observed:
- single java springboot server instance with multi DefaultLitePullConsumer
- invoke DefaultLitePullConsumer#poll() in loop
- some queue don't be consumed
2. Please tell us about your environment:
- system: linux
- springboot: 2.4.2
- rocketmq-spring: 2.2.2
3. Other information
`
// DefaultLitePullConsumer create
private void initRocketMQPushConsumer() throws MQClientException {
log.info("消息优先级发送: init pull consumer start");
List<DefaultLitePullConsumer> pullConsumers = new ArrayList<>();
DefaultLitePullConsumer defaultLitePullConsumer =
getDefaultLitePullConsumer("high");
pullConsumers.add(defaultLitePullConsumer);
DefaultLitePullConsumer defaultLitePullConsumer_1 =
getDefaultLitePullConsumer("low");
pullConsumers.add(defaultLitePullConsumer_1);
PULL_CONSUMERS = Collections.unmodifiableList(pullConsumers);
}
private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag)
throws MQClientException {
RocketMQProperties.Consumer consumerConfig = new
RocketMQProperties.Consumer();
consumerConfig.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER +
"_priority");
// TODO:
consumerConfig.setTopic(TOPIC);
consumerConfig.setPullBatchSize(50);
DefaultLitePullConsumer defaultLitePullConsumer;
String separator = "||";
// consumerConfig.setSelectorExpression("priority_9" + separator +
"priority_8" + separator + "priority_7");
consumerConfig.setSelectorExpression(tag);
defaultLitePullConsumer = buildPullConsumer(consumerConfig);
return defaultLitePullConsumer;
}
private DefaultLitePullConsumer
buildPullConsumer(RocketMQProperties.Consumer consumerConfig) throws
MQClientException {
/**
* {@link
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)}
*/
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be
null");
Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not
be null");
Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not
be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MessageModel messageModel =
MessageModel.valueOf(consumerConfig.getMessageModel());
SelectorType selectorType =
SelectorType.valueOf(consumerConfig.getSelectorType());
String selectorExpression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
int pullBatchSize = consumerConfig.getPullBatchSize();
boolean useTLS = consumerConfig.isTlsEnable();
DefaultLitePullConsumer litePullConsumer =
RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType,
selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
litePullConsumer.setAllocateMessageQueueStrategy(new
AllocateMessageQueueAveragely());
litePullConsumer.setAutoCommit(true);
//
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
return litePullConsumer;
}
// invoke poll
for (DefaultLitePullConsumer pullConsumer : pullConsumers) {
for (;;) {
List<MessageExt> messageExts = pullConsumer.poll();
if (CollUtil.isNotEmpty(messageExts)) {
System.out.println(JSON.toJSONString(pullConsumer) +
"::::::::::" + JSON.toJSONString(messageExts));
continue;
}
break;
}
}
`
rocketmq-dashboard:

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]