This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ce27e1fc64 [ISSUE #8332] fix: ack msg which has reached maxReconsumeTimes
ce27e1fc64 is described below
commit ce27e1fc643d4b6a47b7a63784fab7e8070322e9
Author: cserwen <[email protected]>
AuthorDate: Tue Jul 30 09:11:55 2024 +0800
[ISSUE #8332] fix: ack msg which has reached maxReconsumeTimes
Co-authored-by: dengzhiwen1 <[email protected]>
---
.../consumer/ConsumeMessagePopConcurrentlyService.java | 2 +-
.../client/impl/consumer/DefaultMQPushConsumerImpl.java | 14 ++++++++++++--
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
index 3713d1aba4..d519187110 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessagePopConcurrentlyService.java
@@ -471,7 +471,7 @@ public class ConsumeMessagePopConcurrentlyService
implements ConsumeMessageServi
processQueue.decFoundMsg(-msgs.size());
}
- log.warn("processQueue invalid. isDropped={}, isPopTimeout={},
messageQueue={}, msgs={}",
+ log.warn("processQueue invalid or popTimeout. isDropped={},
isPopTimeout={}, messageQueue={}, msgs={}",
processQueue.isDropped(), isPopTimeout(),
messageQueue, msgs);
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 3e832e5a9a..e66a9825f3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -621,10 +621,9 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
private PopResult processPopResult(final PopResult popResult, final
SubscriptionData subscriptionData) {
if (PopStatus.FOUND == popResult.getPopStatus()) {
List<MessageExt> msgFoundList = popResult.getMsgFoundList();
- List<MessageExt> msgListFilterAgain = msgFoundList;
+ List<MessageExt> msgListFilterAgain = new
ArrayList<>(popResult.getMsgFoundList().size());
if (!subscriptionData.getTagsSet().isEmpty() &&
!subscriptionData.isClassFilterMode()
&& popResult.getMsgFoundList().size() > 0) {
- msgListFilterAgain = new
ArrayList<>(popResult.getMsgFoundList().size());
for (MessageExt msg : popResult.getMsgFoundList()) {
if (msg.getTags() != null) {
if
(subscriptionData.getTagsSet().contains(msg.getTags())) {
@@ -632,6 +631,8 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
}
}
}
+ } else {
+ msgListFilterAgain.addAll(msgFoundList);
}
if (!this.filterMessageHookList.isEmpty()) {
@@ -649,6 +650,15 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
}
}
+ Iterator<MessageExt> iterator = msgListFilterAgain.iterator();
+ while (iterator.hasNext()) {
+ MessageExt msg = iterator.next();
+ if (msg.getReconsumeTimes() >
defaultMQPushConsumer.getMaxReconsumeTimes()) {
+ iterator.remove();
+ log.info("Reconsume times has reached {}, so ack msg={}",
msg.getReconsumeTimes(), msg);
+ }
+ }
+
if (msgFoundList.size() != msgListFilterAgain.size()) {
for (MessageExt msg : msgFoundList) {
if (!msgListFilterAgain.contains(msg)) {