shirenchuang opened a new issue, #7332:
URL: https://github.com/apache/rocketmq/issues/7332
### Before Creating the Enhancement Request
- [X] I have confirmed that this should be classified as an enhancement
rather than a bug/feature.
### Summary
消费者在消费完成之后, 需要处理消费的结果, 是成功或失败
`ConsumeMessageConcurrentlyService#processConsumeResult`
光看代码理解的意思如下
1. 如果处理结果为**CONSUME_SUCCESS**,无需重试, 则记录一下监控指标, 消费成功TPS 、和 消费失败TPS ;
这里用户是可以自己通过设置`context.setAckIndex()`来设置ACK的索引值的; 比如你本批次消息量10条, 你这里设置为4;
则表示前面5条成功,后面5条失败; 当然了,这里并不会给失败的做重试;
2. 如果处理结果为**RECONSUME_LATER**, 则表示需要重试, 将该批次的所有消息遍历同步发送回Broker中;
如果某个同步请求失败,则会记录下来; 一会在本地客户端重新消费 ;
**用户可自己决定从哪条消息开始重试**
上面其实已经说了, 用户可以通过入参`ConsumeConcurrentlyContext`来设置ackIndex控制重试的起始索引;
```java
/**
* 石臻臻的杂货铺
* vx: shiyanzu001
**/
consumer.registerMessageListener((MessageListenerConcurrently) (msg,
context) -> {
System.out.printf(" ----- %s 消费消息: %s 本批次大小: %s ------ ",
Thread.currentThread().getName(), msg, msg.size());
for (int i = 0; i < msg.size(); i++) {
System.out.println("第 " + i + " 条消息, MSG: " + msg.get(i));
try{
// 消费逻辑
}catch(Exception e){
// 这条消息失败, 从这条消息以及其后的消息都需要重试
context.setAckIndex(i-1);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
```
<font color=red><b> PS: 目前所看为版本(5.1.3), 我始终觉得ackIndex这个设置有点问题; </b></font>
1. 消费成功的时候,设置ackIndex才会生效,既然用户返回的都是成功,则表示它并不需要重试; 设置这个值总感觉很别扭。
2. 消费失败的时候,ackIndex被强制设置为了-1,表示所有的都要重试,
正常情况来说,批量消费的时候,碰到其中一条失败,那么就应该从这条的索引开始往后的消息都需要重试,前面已经消费的并且成功的并不需要重试;
<font color=red><b> 关于这一点,我更倾向于这是一个Bug; 或者设计缺陷 </b></font>
**优化建议:**
1. 为了兼容之前的逻辑,成功的状态的逻辑就不去修改了
2. 失败的情况,没有必要强制设置为-1,导致全部重试, 让用户自己也能够通过ackIndex来设置重试的部分消息,而不用全部重试
### Motivation
。
### Describe the Solution You'd Like
。
### Describe Alternatives You've Considered
。
### Additional Context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]