This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 55e0cdb2a fix: IndexOutOfBoundsException when process pop response 
(#7003)
55e0cdb2a is described below

commit 55e0cdb2af3ab75a6d892f919d60797f17a99fda
Author: redlsz <[email protected]>
AuthorDate: Tue Aug 15 19:19:45 2023 +0800

    fix: IndexOutOfBoundsException when process pop response (#7003)
---
 .../main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java   | 5 ++++-
 .../apache/rocketmq/proxy/service/message/LocalMessageService.java   | 5 ++++-
 .../org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java  | 4 ++++
 3 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 708a6acd1..5101ffc8e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1174,7 +1174,10 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
                     
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
                 continue;
             }
-            key = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
+            // Value of POP_CK is used to determine whether it is a pop retry,
+            // cause topic could be rewritten by broker.
+            key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+                messageExt.getProperty(MessageConst.PROPERTY_POP_CK), 
messageExt.getQueueId());
             if (!sortMap.containsKey(key)) {
                 sortMap.put(key, new ArrayList<>(4));
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index 115c140ff..eb2c4d9ee 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -249,7 +249,10 @@ public class LocalMessageService implements MessageService 
{
                 // <topicMark@queueId, msg queueOffset>
                 Map<String, List<Long>> sortMap = new HashMap<>(16);
                 for (MessageExt messageExt : messageExtList) {
-                    String key = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
+                    // Value of POP_CK is used to determine whether it is a 
pop retry,
+                    // cause topic could be rewritten by broker.
+                    String key = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
+                        messageExt.getProperty(MessageConst.PROPERTY_POP_CK), 
messageExt.getQueueId());
                     if (!sortMap.containsKey(key)) {
                         sortMap.put(key, new ArrayList<>(4));
                     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
index 9a5fa89ab..13094331e 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
@@ -282,6 +282,10 @@ public class ExtraInfoUtil {
         return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? 
RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
     }
 
+    public static String getStartOffsetInfoMapKey(String topic, String popCk, 
long key) {
+        return ((topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || popCk != 
null) ? RETRY_TOPIC : NORMAL_TOPIC) + "@" + key;
+    }
+
     public static String getQueueOffsetKeyValueKey(long queueId, long 
queueOffset) {
         return QUEUE_OFFSET + queueId + "%" + queueOffset;
     }

Reply via email to