This is an automated email from the ASF dual-hosted git repository.

pingww pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b44a1ebc36 [ISSUE apache#6576] Fix pop lmq message in client (#6577)
b44a1ebc36 is described below

commit b44a1ebc36114f67a7c7a5dc03b44b766bdc8d9f
Author: cnScarb <[email protected]>
AuthorDate: Tue Jun 13 16:29:05 2023 +0800

    [ISSUE apache#6576] Fix pop lmq message in client (#6577)
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 147 ++++++++++++++-------
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  77 +++++++++++
 .../java/org/apache/rocketmq/common/MixAll.java    |   1 +
 3 files changed, 177 insertions(+), 48 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 995362bb77..d89d3f93b7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1063,45 +1063,64 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
         PopResult popResult = new PopResult(popStatus, msgFoundList);
         PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) 
response.decodeCommandCustomHeader(PopMessageResponseHeader.class);
         popResult.setRestNum(responseHeader.getRestNum());
+        if (popStatus != PopStatus.FOUND) {
+            return popResult;
+        }
         // it is a pop command if pop time greater than 0, we should set the 
check point info to extraInfo field
-        if (popStatus == PopStatus.FOUND) {
-            Map<String, Long> startOffsetInfo = null;
-            Map<String, List<Long>> msgOffsetInfo = null;
-            Map<String, Integer> orderCountInfo = null;
+        Map<String, Long> startOffsetInfo = null;
+        Map<String, List<Long>> msgOffsetInfo = null;
+        Map<String, Integer> orderCountInfo = null;
+        if (requestHeader instanceof PopMessageRequestHeader) {
+            popResult.setInvisibleTime(responseHeader.getInvisibleTime());
+            popResult.setPopTime(responseHeader.getPopTime());
+            startOffsetInfo = 
ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
+            msgOffsetInfo = 
ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
+            orderCountInfo = 
ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
+        }
+        Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> sortMap
+            = buildQueueOffsetSortedMap(topic, msgFoundList);
+        Map<String, String> map = new HashMap<>(5);
+        for (MessageExt messageExt : msgFoundList) {
             if (requestHeader instanceof PopMessageRequestHeader) {
-                popResult.setInvisibleTime(responseHeader.getInvisibleTime());
-                popResult.setPopTime(responseHeader.getPopTime());
-                startOffsetInfo = 
ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
-                msgOffsetInfo = 
ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
-                orderCountInfo = 
ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
-            }
-            Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> 
sortMap = new HashMap<>(16);
-            for (MessageExt messageExt : msgFoundList) {
-                String key = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
-                if (!sortMap.containsKey(key)) {
-                    sortMap.put(key, new ArrayList<>(4));
-                }
-                sortMap.get(key).add(messageExt.getQueueOffset());
-            }
-            Map<String, String> map = new HashMap<>(5);
-            for (MessageExt messageExt : msgFoundList) {
-                if (requestHeader instanceof PopMessageRequestHeader) {
-                    if (startOffsetInfo == null) {
-                        // we should set the check point info to extraInfo 
field , if the command is popMsg
-                        // find pop ck offset
-                        String key = messageExt.getTopic() + 
messageExt.getQueueId();
-                        if (!map.containsKey(messageExt.getTopic() + 
messageExt.getQueueId())) {
-                            map.put(key, 
ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(), 
responseHeader.getReviveQid(),
-                                messageExt.getTopic(), brokerName, 
messageExt.getQueueId()));
+                if (startOffsetInfo == null) {
+                    // we should set the check point info to extraInfo field , 
if the command is popMsg
+                    // find pop ck offset
+                    String key = messageExt.getTopic() + 
messageExt.getQueueId();
+                    if (!map.containsKey(messageExt.getTopic() + 
messageExt.getQueueId())) {
+                        map.put(key, 
ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(), 
responseHeader.getReviveQid(),
+                            messageExt.getTopic(), brokerName, 
messageExt.getQueueId()));
 
-                        }
-                        
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + 
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
-                    } else {
-                        if 
(messageExt.getProperty(MessageConst.PROPERTY_POP_CK) == null) {
-                            String queueIdKey = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
-                            String queueOffsetKey = 
ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), 
messageExt.getQueueId(), messageExt.getQueueOffset());
-                            int index = 
sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
-                            Long msgQueueOffset = 
msgOffsetInfo.get(queueIdKey).get(index);
+                    }
+                    
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + 
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
+                } else {
+                    if (messageExt.getProperty(MessageConst.PROPERTY_POP_CK) 
== null) {
+                        final String queueIdKey;
+                        final String queueOffsetKey;
+                        final int index;
+                        final Long msgQueueOffset;
+                        if (MixAll.isLmq(topic) && 
messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
+                            
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+                            // process LMQ, LMQ topic has only 1 queue, which 
queue id is 0
+                            queueIdKey = 
ExtraInfoUtil.getStartOffsetInfoMapKey(topic, MixAll.LMQ_QUEUE_ID);
+                            queueOffsetKey = 
ExtraInfoUtil.getQueueOffsetMapKey(topic, MixAll.LMQ_QUEUE_ID, Long.parseLong(
+                                
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+                            index = sortMap.get(queueIdKey).indexOf(
+                                
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+                            msgQueueOffset = 
msgOffsetInfo.get(queueIdKey).get(index);
+                            if (msgQueueOffset != Long.parseLong(
+                                
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET))) {
+                                log.warn("Queue offset[%d] of msg is strange, 
not equal to the stored in msg, %s",
+                                    msgQueueOffset, messageExt);
+                            }
+                            
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
+                                
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+                                    responseHeader.getReviveQid(), topic, 
brokerName, 0, msgQueueOffset)
+                            );
+                        } else {
+                            queueIdKey = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
+                            queueOffsetKey = 
ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), 
messageExt.getQueueId(), messageExt.getQueueOffset());
+                            index = 
sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
+                            msgQueueOffset = 
msgOffsetInfo.get(queueIdKey).get(index);
                             if (msgQueueOffset != messageExt.getQueueOffset()) 
{
                                 log.warn("Queue offset[%d] of msg is strange, 
not equal to the stored in msg, %s", msgQueueOffset, messageExt);
                             }
@@ -1109,27 +1128,59 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
                                 
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
                                     responseHeader.getReviveQid(), 
messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
                             );
-                            if (((PopMessageRequestHeader) 
requestHeader).isOrder() && orderCountInfo != null) {
-                                Integer count = 
orderCountInfo.get(queueOffsetKey);
-                                if (count == null) {
-                                    count = orderCountInfo.get(queueIdKey);
-                                }
-                                if (count != null && count > 0) {
-                                    messageExt.setReconsumeTimes(count);
-                                }
+                        }
+                        if (((PopMessageRequestHeader) 
requestHeader).isOrder() && orderCountInfo != null) {
+                            Integer count = orderCountInfo.get(queueOffsetKey);
+                            if (count == null) {
+                                count = orderCountInfo.get(queueIdKey);
+                            }
+                            if (count != null && count > 0) {
+                                messageExt.setReconsumeTimes(count);
                             }
                         }
                     }
-                    messageExt.getProperties().computeIfAbsent(
-                        MessageConst.PROPERTY_FIRST_POP_TIME, k -> 
String.valueOf(responseHeader.getPopTime()));
                 }
-                messageExt.setBrokerName(brokerName);
-                messageExt.setTopic(NamespaceUtil.withoutNamespace(topic, 
this.clientConfig.getNamespace()));
+                messageExt.getProperties().computeIfAbsent(
+                    MessageConst.PROPERTY_FIRST_POP_TIME, k -> 
String.valueOf(responseHeader.getPopTime()));
             }
+            messageExt.setBrokerName(brokerName);
+            messageExt.setTopic(NamespaceUtil.withoutNamespace(topic, 
this.clientConfig.getNamespace()));
         }
         return popResult;
     }
 
+    /**
+     * Build queue offset sorted map
+     *
+     * @param topic pop consumer topic
+     * @param msgFoundList popped message list
+     * @return sorted map, key is topicMark@queueId, value is sorted msg 
queueOffset list
+     */
+    private static Map<String, List<Long>> buildQueueOffsetSortedMap(String 
topic, List<MessageExt> msgFoundList) {
+        Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> 
sortMap = new HashMap<>(16);
+        for (MessageExt messageExt : msgFoundList) {
+            final String key;
+            if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0
+                && 
StringUtils.isNotEmpty(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)))
 {
+                // process LMQ, LMQ topic has only 1 queue, which queue id is 0
+                key = ExtraInfoUtil.getStartOffsetInfoMapKey(
+                    
messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH), 0);
+                if (!sortMap.containsKey(key)) {
+                    sortMap.put(key, new ArrayList<>(4));
+                }
+                sortMap.get(key).add(
+                    
Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET)));
+                continue;
+            }
+            key = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
+            if (!sortMap.containsKey(key)) {
+                sortMap.put(key, new ArrayList<>(4));
+            }
+            sortMap.get(key).add(messageExt.getQueueOffset());
+        }
+        return sortMap;
+    }
+
     public MessageExt viewMessage(final String addr, final long phyoffset, 
final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         ViewMessageRequestHeader requestHeader = new 
ViewMessageRequestHeader();
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 890301a48b..d13f2cfe43 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.Message;
@@ -496,6 +497,82 @@ public class MQClientAPIImplTest {
         done.await();
     }
 
+    @Test
+    public void testPopLmqMessage_async() throws Exception {
+        final long popTime = System.currentTimeMillis();
+        final int invisibleTime = 10 * 1000;
+        final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1";
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock mock) throws Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+                RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+                response.setCode(ResponseCode.SUCCESS);
+                response.setOpaque(request.getOpaque());
+
+                PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
+                responseHeader.setInvisibleTime(invisibleTime);
+                responseHeader.setPopTime(popTime);
+                responseHeader.setReviveQid(0);
+                responseHeader.setRestNum(1);
+                StringBuilder startOffsetInfo = new StringBuilder(64);
+                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, false, 0, 
0L);
+                responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+                StringBuilder msgOffsetInfo = new StringBuilder(64);
+                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, false, 0, 
Collections.singletonList(0L));
+                responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+                response.setRemark("FOUND");
+                response.makeCustomHeaderToNet();
+
+                MessageExt message = new MessageExt();
+                message.setQueueId(3);
+                message.setFlag(0);
+                message.setQueueOffset(5L);
+                message.setCommitLogOffset(11111L);
+                message.setSysFlag(0);
+                message.setBornTimestamp(System.currentTimeMillis());
+                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+                message.setStoreTimestamp(System.currentTimeMillis());
+                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+                message.setBody("body".getBytes());
+                message.setTopic(topic);
+                message.putUserProperty("key", "value");
+                
message.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqTopic);
+                
message.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, 
String.valueOf(0));
+                response.setBody(MessageDecoder.encode(message, false));
+                responseFuture.setResponseCommand(response);
+                callback.operationComplete(responseFuture);
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        final CountDownLatch done = new CountDownLatch(1);
+        final PopMessageRequestHeader requestHeader = new 
PopMessageRequestHeader();
+        requestHeader.setTopic(lmqTopic);
+        mqClientAPI.popMessageAsync(brokerName, brokerAddr, requestHeader, 10 
* 1000, new PopCallback() {
+            @Override
+            public void onSuccess(PopResult popResult) {
+                
assertThat(popResult.getPopStatus()).isEqualTo(PopStatus.FOUND);
+                assertThat(popResult.getRestNum()).isEqualTo(1);
+                
assertThat(popResult.getInvisibleTime()).isEqualTo(invisibleTime);
+                assertThat(popResult.getPopTime()).isEqualTo(popTime);
+                assertThat(popResult.getMsgFoundList()).size().isEqualTo(1);
+                
assertThat(popResult.getMsgFoundList().get(0).getTopic()).isEqualTo(lmqTopic);
+                
assertThat(popResult.getMsgFoundList().get(0).getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
+                    .isEqualTo(lmqTopic);
+                done.countDown();
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                Assertions.fail("want no exception but got one", e);
+                done.countDown();
+            }
+        });
+        done.await();
+    }
+
     @Test
     public void testAckMessageAsync_Success() throws Exception {
         doAnswer(new Answer<Void>() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java 
b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index dc1d69fe10..6b3d2d6ae0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -97,6 +97,7 @@ public class MixAll {
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
     public static final String REPLY_MESSAGE_FLAG = "reply";
     public static final String LMQ_PREFIX = "%LMQ%";
+    public static final long LMQ_QUEUE_ID = 0;
     public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
     public static final String REQ_T = "ReqT";
     public static final String ROCKETMQ_ZONE_ENV = "ROCKETMQ_ZONE";

Reply via email to