This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 55e0cdb2a fix: IndexOutOfBoundsException when process pop response (#7003)
55e0cdb2a is described below
commit 55e0cdb2af3ab75a6d892f919d60797f17a99fda
Author: redlsz <[email protected]>
AuthorDate: Tue Aug 15 19:19:45 2023 +0800
fix: IndexOutOfBoundsException when process pop response (#7003)
---
.../main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 5 ++++-
.../apache/rocketmq/proxy/service/message/LocalMessageService.java | 5 ++++-
.../org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java | 4 ++++
3 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 708a6acd1..5101ffc8e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1174,7 +1174,10 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
continue;
}
- key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+ // Value of POP_CK is used to determine whether it is a pop retry,
+ // cause topic could be rewritten by broker.
+ key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
if (!sortMap.containsKey(key)) {
sortMap.put(key, new ArrayList<>(4));
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 115c140ff..eb2c4d9ee 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -249,7 +249,10 @@ public class LocalMessageService implements MessageService {
// <topicMark@queueId, msg queueOffset>
Map<String, List<Long>> sortMap = new HashMap<>(16);
for (MessageExt messageExt : messageExtList) {
- String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
+ // Value of POP_CK is used to determine whether it is a pop retry,
+ // cause topic could be rewritten by broker.
+ String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+ messageExt.getProperty(MessageConst.PROPERTY_POP_CK), messageExt.getQueueId());
if (!sortMap.containsKey(key)) {
sortMap.put(key, new ArrayList<>(4));
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
index 9a5fa89ab..13094331e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
@@ -282,6 +282,10 @@ public class ExtraInfoUtil {
return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
}
+ public static String getStartOffsetInfoMapKey(String topic, String popCk, long key) {
+ return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
+ }
+
public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) {
return QUEUE_OFFSET + queueId + "%" + queueOffset;
}