pingww commented on code in PR #6577:
URL: https://github.com/apache/rocketmq/pull/6577#discussion_r1208768081


##########
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java:
##########
@@ -1063,73 +1063,124 @@ private PopResult processPopResponse(final String 
brokerName, final RemotingComm
         PopResult popResult = new PopResult(popStatus, msgFoundList);
         PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) 
response.decodeCommandCustomHeader(PopMessageResponseHeader.class);
         popResult.setRestNum(responseHeader.getRestNum());
+        if (popStatus != PopStatus.FOUND) {
+            return popResult;
+        }
         // it is a pop command if pop time greater than 0, we should set the 
check point info to extraInfo field
-        if (popStatus == PopStatus.FOUND) {
-            Map<String, Long> startOffsetInfo = null;
-            Map<String, List<Long>> msgOffsetInfo = null;
-            Map<String, Integer> orderCountInfo = null;
+        Map<String, Long> startOffsetInfo = null;
+        Map<String, List<Long>> msgOffsetInfo = null;
+        Map<String, Integer> orderCountInfo = null;
+        if (requestHeader instanceof PopMessageRequestHeader) {
+            popResult.setInvisibleTime(responseHeader.getInvisibleTime());
+            popResult.setPopTime(responseHeader.getPopTime());
+            startOffsetInfo = 
ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
+            msgOffsetInfo = 
ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
+            orderCountInfo = 
ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
+        }
+        Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> sortMap
+            = buildQueueOffsetSortedMap(topic, msgFoundList);
+        Map<String, String> map = new HashMap<>(5);
+        for (MessageExt messageExt : msgFoundList) {
             if (requestHeader instanceof PopMessageRequestHeader) {
-                popResult.setInvisibleTime(responseHeader.getInvisibleTime());
-                popResult.setPopTime(responseHeader.getPopTime());
-                startOffsetInfo = 
ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
-                msgOffsetInfo = 
ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
-                orderCountInfo = 
ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
-            }
-            Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> 
sortMap = new HashMap<>(16);
-            for (MessageExt messageExt : msgFoundList) {
-                String key = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
-                if (!sortMap.containsKey(key)) {
-                    sortMap.put(key, new ArrayList<>(4));
-                }
-                sortMap.get(key).add(messageExt.getQueueOffset());
-            }
-            Map<String, String> map = new HashMap<>(5);
-            for (MessageExt messageExt : msgFoundList) {
-                if (requestHeader instanceof PopMessageRequestHeader) {
-                    if (startOffsetInfo == null) {
-                        // we should set the check point info to extraInfo 
field , if the command is popMsg
-                        // find pop ck offset
-                        String key = messageExt.getTopic() + 
messageExt.getQueueId();
-                        if (!map.containsKey(messageExt.getTopic() + 
messageExt.getQueueId())) {
-                            map.put(key, 
ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(), 
responseHeader.getReviveQid(),
-                                messageExt.getTopic(), brokerName, 
messageExt.getQueueId()));
+                if (startOffsetInfo == null) {
+                    // we should set the check point info to extraInfo field , 
if the command is popMsg
+                    // find pop ck offset
+                    String key = messageExt.getTopic() + 
messageExt.getQueueId();
+                    if (!map.containsKey(messageExt.getTopic() + 
messageExt.getQueueId())) {
+                        map.put(key, 
ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(), 
responseHeader.getReviveQid(),
+                            messageExt.getTopic(), brokerName, 
messageExt.getQueueId()));
 
-                        }
-                        
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + 
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
-                    } else {
-                        if 
(messageExt.getProperty(MessageConst.PROPERTY_POP_CK) == null) {
-                            String queueIdKey = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
-                            String queueOffsetKey = 
ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), 
messageExt.getQueueId(), messageExt.getQueueOffset());
-                            int index = 
sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
-                            Long msgQueueOffset = 
msgOffsetInfo.get(queueIdKey).get(index);
+                    }
+                    
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + 
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
+                } else {
+                    if (messageExt.getProperty(MessageConst.PROPERTY_POP_CK) 
== null) {
+                        final String queueIdKey;
+                        final String queueOffsetKey;
+                        final int index;
+                        final Long msgQueueOffset;
+                        if (MixAll.isLmq(topic) && 
messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
+                            
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+                            // process LMQ, LMQ topic has only 1 queue, which 
queue id is 0

Review Comment:
   Should we define this queue ID 0 as a constant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to