This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 03ffc45a8 [ISSUE #5364] Support changeInvisibleTime for pop orderly 
(#5367)
03ffc45a8 is described below

commit 03ffc45a8b93844b692b7a834cf28b922015de3a
Author: lk <[email protected]>
AuthorDate: Mon Oct 24 10:44:06 2022 +0800

    [ISSUE #5364] Support changeInvisibleTime for pop orderly (#5367)
---
 .../broker/offset/ConsumerOrderInfoManager.java    | 466 ++++++++++++-------
 .../broker/processor/AckMessageProcessor.java      |   3 +-
 .../processor/ChangeInvisibleTimeProcessor.java    |  33 ++
 .../broker/processor/PopMessageProcessor.java      |  18 +-
 .../offset/ConsumerOrderInfoManagerTest.java       | 500 +++++++++++++++++++++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  16 +-
 .../common/protocol/header/ExtraInfoUtil.java      |  33 +-
 .../common/protocol/header/ExtraInfoUtilTest.java  |  46 ++
 .../rocketmq/test/client/rmq/RMQPopClient.java     | 163 +++++++
 .../rocketmq/test/factory/ConsumerFactory.java     |   8 +
 .../org/apache/rocketmq/test/base/BaseConf.java    |   3 +
 .../rocketmq/test/client/consumer/pop/BasePop.java |  42 ++
 .../test/client/consumer/pop/BasePopOrderly.java   |  89 ++++
 .../pop/ChangeInvisibleTimeMidMsgOrderlyIT.java    | 102 +++++
 14 files changed, 1359 insertions(+), 163 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 68c767fd4..894a6c373 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -17,16 +17,21 @@
 package org.apache.rocketmq.broker.offset;
 
 import com.alibaba.fastjson.annotation.JSONField;
+import com.google.common.base.MoreObjects;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -57,16 +62,29 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
         this.table = table;
     }
 
+    protected static String buildKey(String topic, String group) {
+        return topic + TOPIC_GROUP_SEPARATOR + group;
+    }
+
+    protected static String[] decodeKey(String key) {
+        return key.split(TOPIC_GROUP_SEPARATOR);
+    }
+
     /**
-     * not thread safe.
+     * update the message list received
      *
-     * @param topic
-     * @param group
-     * @param queueId
-     * @param msgOffsetList
+     * @param isRetry is retry topic or not
+     * @param topic topic
+     * @param group group
+     * @param queueId queue id of message
+     * @param popTime the time of pop message
+     * @param invisibleTime invisible time
+     * @param msgQueueOffsetList the queue offsets of messages
+     * @param orderInfoBuilder will append order info to this builder
      */
-    public int update(String topic, String group, int queueId, List<Long> 
msgOffsetList) {
-        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+    public void update(boolean isRetry, String topic, String group, int 
queueId, long popTime, long invisibleTime,
+        List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
+        String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
         if (qs == null) {
             qs = new ConcurrentHashMap<>(16);
@@ -78,33 +96,42 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
 
         OrderInfo orderInfo = qs.get(queueId);
 
-        // start is same.
-        List<Long> simple = OrderInfo.simpleO(msgOffsetList);
-        if (orderInfo != null && 
simple.get(0).equals(orderInfo.getOffsetList().get(0))) {
-            if (simple.equals(orderInfo.getOffsetList())) {
-                orderInfo.setConsumedCount(orderInfo.getConsumedCount() + 1);
-            } else {
-                // reset, because msgs are changed.
-                orderInfo.setConsumedCount(0);
-            }
-            orderInfo.setLastConsumeTimestamp(System.currentTimeMillis());
-            orderInfo.setOffsetList(simple);
-            orderInfo.setCommitOffsetBit(0);
+        if (orderInfo != null) {
+            OrderInfo newOrderInfo = new OrderInfo(popTime, invisibleTime, 
msgQueueOffsetList, System.currentTimeMillis(), 0);
+            newOrderInfo.mergeOffsetConsumedCount(orderInfo.offsetList, 
orderInfo.offsetConsumedCount);
+
+            orderInfo = newOrderInfo;
         } else {
-            orderInfo = new OrderInfo();
-            orderInfo.setOffsetList(simple);
-            orderInfo.setLastConsumeTimestamp(System.currentTimeMillis());
-            orderInfo.setConsumedCount(0);
-            orderInfo.setCommitOffsetBit(0);
+            orderInfo = new OrderInfo(popTime, invisibleTime, 
msgQueueOffsetList, System.currentTimeMillis(), 0);
+        }
+        qs.put(queueId, orderInfo);
+
+        Map<Long, Integer> offsetConsumedCount = orderInfo.offsetConsumedCount;
+        int minConsumedTimes = Integer.MAX_VALUE;
+        if (offsetConsumedCount != null) {
+            Set<Long> offsetSet = offsetConsumedCount.keySet();
+            for (Long offset : offsetSet) {
+                Integer consumedTimes = 
offsetConsumedCount.getOrDefault(offset, 0);
+                ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder, 
isRetry, queueId, offset, consumedTimes);
+                minConsumedTimes = Math.min(minConsumedTimes, consumedTimes);
+            }
 
-            qs.put(queueId, orderInfo);
+            if (offsetConsumedCount.size() != orderInfo.offsetList.size()) {
+                // offsetConsumedCount only saves messages whose consumed count 
is greater than 0
+                // if the sizes are not equal, there are some new messages
+                minConsumedTimes = 0;
+            }
+        } else {
+            minConsumedTimes = 0;
         }
 
-        return orderInfo.getConsumedCount();
+        // for compatibility
+        // the old pop sdk use queueId to get consumedTimes from orderCountInfo
+        ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, isRetry, 
queueId, minConsumedTimes);
     }
 
     public boolean checkBlock(String topic, String group, int queueId, long 
invisibleTime) {
-        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
         if (qs == null) {
             qs = new ConcurrentHashMap<>(16);
@@ -119,84 +146,98 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
         if (orderInfo == null) {
             return false;
         }
-
-        boolean isBlock = System.currentTimeMillis() - 
orderInfo.getLastConsumeTimestamp() < invisibleTime;
-
-        return isBlock && !orderInfo.isDone();
+        return orderInfo.needBlock(invisibleTime);
     }
 
     /**
-     * @param topic
-     * @param group
-     * @param queueId
-     * @param offset
+     * mark the message as consumed, then return the consumer offset
+     *
+     * @param topic topic
+     * @param group group
+     * @param queueId queue id of message
+     * @param queueOffset queue offset of message
      * @return -1 : illegal, -2 : no need commit, >= 0 : commit
      */
-    public long commitAndNext(String topic, String group, int queueId, long 
offset) {
-        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+    public long commitAndNext(String topic, String group, int queueId, long 
queueOffset, long popTime) {
+        String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
 
         if (qs == null) {
-            return offset + 1;
+            return queueOffset + 1;
         }
         OrderInfo orderInfo = qs.get(queueId);
         if (orderInfo == null) {
-            log.warn("OrderInfo is null, {}, {}, {}", key, offset, orderInfo);
-            return offset + 1;
+            log.warn("OrderInfo is null, {}, {}, {}", key, queueOffset, 
orderInfo);
+            return queueOffset + 1;
         }
 
-        List<Long> offsetList = orderInfo.getOffsetList();
-        if (offsetList == null || offsetList.isEmpty()) {
-            log.warn("OrderInfo is empty, {}, {}, {}", key, offset, orderInfo);
+        List<Long> o = orderInfo.offsetList;
+        if (o == null || o.isEmpty()) {
+            log.warn("OrderInfo is empty, {}, {}, {}", key, queueOffset, 
orderInfo);
             return -1;
         }
-        Long first = offsetList.get(0);
-        int i = 0, size = offsetList.size();
+
+        if (popTime != orderInfo.popTime) {
+            log.warn("popTime is not equal to orderInfo saved. key: {}, 
offset: {}, orderInfo: {}, popTime: {}", key, queueOffset, orderInfo, popTime);
+            return -2;
+        }
+
+        Long first = o.get(0);
+        int i = 0, size = o.size();
         for (; i < size; i++) {
             long temp;
             if (i == 0) {
                 temp = first;
             } else {
-                temp = first + offsetList.get(i);
+                temp = first + o.get(i);
             }
-            if (offset == temp) {
+            if (queueOffset == temp) {
                 break;
             }
         }
         // not found
         if (i >= size) {
-            log.warn("OrderInfo not found commit offset, {}, {}, {}", key, 
offset, orderInfo);
+            log.warn("OrderInfo not found commit offset, {}, {}, {}", key, 
queueOffset, orderInfo);
             return -1;
         }
         //set bit
-        orderInfo.setCommitOffsetBit(orderInfo.getCommitOffsetBit() | (1L << 
i));
-        if (orderInfo.isDone()) {
-            if (size == 1) {
-                return offsetList.get(0) + 1;
-            } else {
-                return offsetList.get(size - 1) + first + 1;
-            }
-        }
-        return -2;
+        orderInfo.setCommitOffsetBit(orderInfo.commitOffsetBit | (1L << i));
+        long nextOffset = orderInfo.getNextOffset();
+
+        return nextOffset;
     }
 
-    public OrderInfo get(String topic, String group, int queueId) {
-        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+    /**
+     * update next visible time of this message
+     *
+     * @param topic topic
+     * @param group group
+     * @param queueId queue id of message
+     * @param queueOffset queue offset of message
+     * @param nextVisibleTime next visible time
+     */
+    public void updateNextVisibleTime(String topic, String group, int queueId, 
long queueOffset, long popTime, long nextVisibleTime) {
+        String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
 
         if (qs == null) {
-            return null;
+            log.warn("orderInfo of queueId is null. key: {}, queueOffset: {}, 
queueId: {}", key, queueOffset, queueId);
+            return;
+        }
+        OrderInfo orderInfo = qs.get(queueId);
+        if (orderInfo == null) {
+            log.warn("orderInfo is null, key: {}, queueOffset: {}, queueId: 
{}", key, queueOffset, queueId);
+            return;
+        }
+        if (popTime != orderInfo.popTime) {
+            log.warn("popTime is not equal to orderInfo saved. key: {}, 
queueOffset: {}, orderInfo: {}, popTime: {}", key, queueOffset, orderInfo, 
popTime);
+            return;
         }
 
-        return qs.get(queueId);
-    }
-
-    public int getConsumeCount(String topic, String group, int queueId) {
-        OrderInfo orderInfo = get(topic, group, queueId);
-        return orderInfo == null ? 0 : orderInfo.getConsumedCount();
+        orderInfo.updateOffsetNextVisibleTime(queueOffset, nextVisibleTime);
     }
 
-    private void autoClean() {
+    protected void autoClean() {
         if (brokerController == null) {
             return;
         }
@@ -207,7 +248,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
                 iterator.next();
             String topicAtGroup = entry.getKey();
             ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = 
entry.getValue();
-            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            String[] arrays = decodeKey(topicAtGroup);
             if (arrays.length != 2) {
                 continue;
             }
@@ -246,7 +287,6 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
                 if (System.currentTimeMillis() - 
qsEntry.getValue().getLastConsumeTimestamp() > CLEAN_SPAN_FROM_LAST) {
                     qsIterator.remove();
                     log.info("Not consume long time, Clean order info, {}:{}, 
{}", topicAtGroup, entry.getValue(), topicConfig);
-                    continue;
                 }
             }
         }
@@ -279,58 +319,58 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
     @Override
     public String encode(boolean prettyFormat) {
         this.autoClean();
-
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("{\n").append("\t\"table\":{");
-        Iterator<Map.Entry<String/* topic@group*/, 
ConcurrentHashMap<Integer/*queueId*/, OrderInfo>>> iterator =
-            this.table.entrySet().iterator();
-        int count1 = 0;
-        while (iterator.hasNext()) {
-            Map.Entry<String/* topic@group*/, 
ConcurrentHashMap<Integer/*queueId*/, OrderInfo>> entry =
-                iterator.next();
-            if (count1 > 0) {
-                stringBuilder.append(",");
-            }
-            
stringBuilder.append("\n\t\t\"").append(entry.getKey()).append("\":{");
-            Iterator<Map.Entry<Integer/*queueId*/, OrderInfo>> qsIterator = 
entry.getValue().entrySet().iterator();
-            int count2 = 0;
-            while (qsIterator.hasNext()) {
-                Map.Entry<Integer/*queueId*/, OrderInfo> qsEntry = 
qsIterator.next();
-                if (count2 > 0) {
-                    stringBuilder.append(",");
-                }
-                
stringBuilder.append("\n\t\t\t").append(qsEntry.getKey()).append(":")
-                    .append(qsEntry.getValue().encode());
-                count2++;
-            }
-            stringBuilder.append("\n\t\t}");
-            count1++;
-        }
-        stringBuilder.append("\n\t}").append("\n}");
-        return stringBuilder.toString();
+        return RemotingSerializable.toJson(this, prettyFormat);
     }
 
     public static class OrderInfo {
+        private long popTime;
+        /**
+         * the invisibleTime when pop message
+         */
+        @JSONField(name = "i")
+        private Long invisibleTime;
         /**
          * offset
+         * offsetList[0] is the queue offset of message
+         * offsetList[i] (i > 0) is the distance between current message and 
offsetList[0]
          */
+        @JSONField(name = "o")
         private List<Long> offsetList;
         /**
-         * consumed count
+         * next visible timestamp for message
+         * key: message queue offset
+         */
+        @JSONField(name = "ot")
+        private Map<Long, Long> offsetNextVisibleTime;
+        /**
+         * message consumed count for offset
+         * key: message queue offset
          */
-        private int consumedCount;
+        @JSONField(name = "oc")
+        private Map<Long, Integer> offsetConsumedCount;
         /**
          * last consume timestamp
          */
+        @JSONField(name = "l")
         private long lastConsumeTimestamp;
         /**
          * commit offset bit
          */
+        @JSONField(name = "cm")
         private long commitOffsetBit;
 
         public OrderInfo() {
         }
 
+        public OrderInfo(long popTime, long invisibleTime, List<Long> 
queueOffsetList, long lastConsumeTimestamp,
+            long commitOffsetBit) {
+            this.popTime = popTime;
+            this.invisibleTime = invisibleTime;
+            this.offsetList = buildOffsetList(queueOffsetList);
+            this.lastConsumeTimestamp = lastConsumeTimestamp;
+            this.commitOffsetBit = commitOffsetBit;
+        }
+
         public List<Long> getOffsetList() {
             return offsetList;
         }
@@ -339,28 +379,6 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
             this.offsetList = offsetList;
         }
 
-        public static List<Long> simpleO(List<Long> offsetList) {
-            List<Long> simple = new ArrayList<>();
-            if (offsetList.size() == 1) {
-                simple.addAll(offsetList);
-                return simple;
-            }
-            Long first = offsetList.get(0);
-            simple.add(first);
-            for (int i = 1; i < offsetList.size(); i++) {
-                simple.add(offsetList.get(i) - first);
-            }
-            return simple;
-        }
-
-        public int getConsumedCount() {
-            return consumedCount;
-        }
-
-        public void setConsumedCount(int consumedCount) {
-            this.consumedCount = consumedCount;
-        }
-
         public long getLastConsumeTimestamp() {
             return lastConsumeTimestamp;
         }
@@ -377,50 +395,198 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
             this.commitOffsetBit = commitOffsetBit;
         }
 
+        public long getPopTime() {
+            return popTime;
+        }
+
+        public void setPopTime(long popTime) {
+            this.popTime = popTime;
+        }
+
+        public Long getInvisibleTime() {
+            return invisibleTime;
+        }
+
+        public void setInvisibleTime(Long invisibleTime) {
+            this.invisibleTime = invisibleTime;
+        }
+
+        public Map<Long, Long> getOffsetNextVisibleTime() {
+            return offsetNextVisibleTime;
+        }
+
+        public void setOffsetNextVisibleTime(Map<Long, Long> 
offsetNextVisibleTime) {
+            this.offsetNextVisibleTime = offsetNextVisibleTime;
+        }
+
+        public Map<Long, Integer> getOffsetConsumedCount() {
+            return offsetConsumedCount;
+        }
+
+        public void setOffsetConsumedCount(Map<Long, Integer> 
offsetConsumedCount) {
+            this.offsetConsumedCount = offsetConsumedCount;
+        }
+
+        public static List<Long> buildOffsetList(List<Long> queueOffsetList) {
+            List<Long> simple = new ArrayList<>();
+            if (queueOffsetList.size() == 1) {
+                simple.addAll(queueOffsetList);
+                return simple;
+            }
+            Long first = queueOffsetList.get(0);
+            simple.add(first);
+            for (int i = 1; i < queueOffsetList.size(); i++) {
+                simple.add(queueOffsetList.get(i) - first);
+            }
+            return simple;
+        }
+
+        @JSONField(serialize = false, deserialize = false)
+        public boolean needBlock(long currentInvisibleTime) {
+            if (offsetList == null || offsetList.isEmpty()) {
+                return false;
+            }
+            int num = offsetList.size();
+            int i = 0;
+            if (this.invisibleTime == null || this.invisibleTime <= 0) {
+                this.invisibleTime = currentInvisibleTime;
+            }
+            long currentTime = System.currentTimeMillis();
+            for (; i < num; i++) {
+                if (isNotAck(i)) {
+                    long nextVisibleTime = popTime + invisibleTime;
+                    if (offsetNextVisibleTime != null) {
+                        Long time = 
offsetNextVisibleTime.get(this.getQueueOffset(i));
+                        if (time != null) {
+                            nextVisibleTime = time;
+                        }
+                    }
+                    if (currentTime < nextVisibleTime) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
         @JSONField(serialize = false, deserialize = false)
-        public boolean isDone() {
+        public Long getLockFreeTimestamp() {
             if (offsetList == null || offsetList.isEmpty()) {
-                return true;
+                return null;
             }
             int num = offsetList.size();
-            for (byte i = 0; i < num; i++) {
-                if ((commitOffsetBit & (1L << i)) == 0) {
-                    return false;
+            int i = 0;
+            long currentTime = System.currentTimeMillis();
+            for (; i < num; i++) {
+                if (isNotAck(i)) {
+                    if (invisibleTime == null || invisibleTime <= 0) {
+                        return null;
+                    }
+                    long nextVisibleTime = popTime + invisibleTime;
+                    if (offsetNextVisibleTime != null) {
+                        Long time = 
offsetNextVisibleTime.get(this.getQueueOffset(i));
+                        if (time != null) {
+                            nextVisibleTime = time;
+                        }
+                    }
+                    if (currentTime < nextVisibleTime) {
+                        return nextVisibleTime;
+                    }
                 }
             }
-            return true;
+            return currentTime;
+        }
+
+        @JSONField(serialize = false, deserialize = false)
+        public void updateOffsetNextVisibleTime(long queueOffset, long 
nextVisibleTime) {
+            if (this.offsetNextVisibleTime == null) {
+                this.offsetNextVisibleTime = new HashMap<>();
+            }
+            this.offsetNextVisibleTime.put(queueOffset, nextVisibleTime);
         }
 
         @JSONField(serialize = false, deserialize = false)
-        public String encode() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("{").append("\"c\":").append(getConsumedCount());
-            sb.append(",").append("\"cm\":").append(getCommitOffsetBit());
-            sb.append(",").append("\"l\":").append(getLastConsumeTimestamp());
-            sb.append(",").append("\"o\":[");
-            if (getOffsetList() != null) {
-                for (int i = 0; i < getOffsetList().size(); i++) {
-                    sb.append(getOffsetList().get(i));
-                    if (i < getOffsetList().size() - 1) {
-                        sb.append(",");
+        public long getNextOffset() {
+            if (offsetList == null || offsetList.isEmpty()) {
+                return -2;
+            }
+            int num = offsetList.size();
+            int i = 0;
+            for (; i < num; i++) {
+                if (isNotAck(i)) {
+                    break;
+                }
+            }
+            if (i == num) {
+                // all ack
+                return getQueueOffset(num - 1) + 1;
+            }
+            return getQueueOffset(i);
+        }
+
+        /**
+         * convert the offset at the index of offsetList to queue offset
+         *
+         * @param offsetIndex the index of offsetList
+         * @return queue offset of message
+         */
+        @JSONField(serialize = false, deserialize = false)
+        public long getQueueOffset(int offsetIndex) {
+            return getQueueOffset(this.offsetList, offsetIndex);
+        }
+
+        protected static long getQueueOffset(List<Long> offsetList, int 
offsetIndex) {
+            if (offsetIndex == 0) {
+                return offsetList.get(0);
+            }
+            return offsetList.get(0) + offsetList.get(offsetIndex);
+        }
+
+        @JSONField(serialize = false, deserialize = false)
+        public boolean isNotAck(int offsetIndex) {
+            return (commitOffsetBit & (1L << offsetIndex)) == 0;
+        }
+
+        /**
+         * calculate message consumed count of each message, and put nonzero 
value into offsetConsumedCount
+         *
+         * @param prevOffsetConsumedCount the previous consumed count of each message, keyed by queue offset
+         */
+        @JSONField(serialize = false, deserialize = false)
+        public void mergeOffsetConsumedCount(List<Long> preOffsetList, 
Map<Long, Integer> prevOffsetConsumedCount) {
+            Map<Long, Integer> offsetConsumedCount = new HashMap<>();
+            if (prevOffsetConsumedCount == null) {
+                prevOffsetConsumedCount = new HashMap<>();
+            }
+            Set<Long> preQueueOffsetSet = new HashSet<>();
+            for (int i = 0; i < preOffsetList.size(); i++) {
+                preQueueOffsetSet.add(getQueueOffset(preOffsetList, i));
+            }
+            for (int i = 0; i < offsetList.size(); i++) {
+                long queueOffset = this.getQueueOffset(i);
+                if (preQueueOffsetSet.contains(queueOffset)) {
+                    int count = 1;
+                    Integer preCount = 
prevOffsetConsumedCount.get(queueOffset);
+                    if (preCount != null) {
+                        count = preCount + 1;
                     }
+                    offsetConsumedCount.put(queueOffset, count);
                 }
             }
-            sb.append("]").append("}");
-            return sb.toString();
+            this.offsetConsumedCount = offsetConsumedCount;
         }
 
         @Override
         public String toString() {
-            final StringBuilder sb = new StringBuilder("OrderInfo");
-            sb.append("@").append(this.hashCode());
-            sb.append("{offsetList=").append(offsetList);
-            sb.append(", consumedCount=").append(consumedCount);
-            sb.append(", lastConsumeTimestamp=").append(lastConsumeTimestamp);
-            sb.append(", commitOffsetBit=").append(commitOffsetBit);
-            sb.append(", isDone=").append(isDone());
-            sb.append('}');
-            return sb.toString();
+            return MoreObjects.toStringHelper(this)
+                .add("popTime", popTime)
+                .add("invisibleTime", invisibleTime)
+                .add("offsetList", offsetList)
+                .add("offsetNextVisibleTime", offsetNextVisibleTime)
+                .add("offsetConsumedCount", offsetConsumedCount)
+                .add("lastConsumeTimestamp", lastConsumeTimestamp)
+                .add("commitOffsetBit", commitOffsetBit)
+                .toString();
         }
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 8bd3c613c..9493deab8 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -164,7 +164,8 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
                 }
                 long nextOffset = 
brokerController.getConsumerOrderInfoManager().commitAndNext(
                     requestHeader.getTopic(), requestHeader.getConsumerGroup(),
-                    requestHeader.getQueueId(), requestHeader.getOffset());
+                    requestHeader.getQueueId(), requestHeader.getOffset(),
+                    ExtraInfoUtil.getPopTime(extraInfo));
                 if (nextOffset > -1) {
                     
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
                         requestHeader.getConsumerGroup(), 
requestHeader.getTopic(),
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 76c1b908e..b1092db23 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -96,6 +96,10 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
 
         String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
 
+        if (ExtraInfoUtil.isOrder(extraInfo)) {
+            return processChangeInvisibleTimeForOrder(requestHeader, 
extraInfo, response, responseHeader);
+        }
+
         // add new ck
         long now = System.currentTimeMillis();
         PutMessageResult ckResult = appendCheckPoint(requestHeader, 
ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), 
requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));
@@ -123,6 +127,35 @@ public class ChangeInvisibleTimeProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
+    protected RemotingCommand 
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader 
requestHeader, String[] extraInfo, RemotingCommand response, 
ChangeInvisibleTimeResponseHeader responseHeader) {
+        long popTime = ExtraInfoUtil.getPopTime(extraInfo);
+        long oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
+            requestHeader.getTopic(), requestHeader.getQueueId());
+        if (requestHeader.getOffset() < oldOffset) {
+            return response;
+        }
+        while 
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(requestHeader.getTopic(),
 requestHeader.getConsumerGroup(), requestHeader.getQueueId())) {
+        }
+        try {
+            oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
+                requestHeader.getTopic(), requestHeader.getQueueId());
+            if (requestHeader.getOffset() < oldOffset) {
+                return response;
+            }
+
+            long nextVisibleTime = System.currentTimeMillis() + 
requestHeader.getInvisibleTime();
+            
this.brokerController.getConsumerOrderInfoManager().updateNextVisibleTime(
+                requestHeader.getTopic(), requestHeader.getConsumerGroup(), 
requestHeader.getQueueId(), requestHeader.getOffset(), popTime, 
nextVisibleTime);
+
+            responseHeader.setInvisibleTime(nextVisibleTime - popTime);
+            responseHeader.setPopTime(popTime);
+            responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
+        } finally {
+            
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(requestHeader.getTopic(),
 requestHeader.getConsumerGroup(), requestHeader.getQueueId());
+        }
+        return response;
+    }
+
     private void ackOrigin(final ChangeInvisibleTimeRequestHeader 
requestHeader, String[] extraInfo) {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         AckMsg ackMsg = new AckMsg();
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0d2c5f9b5..df85fc7e9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -522,12 +522,12 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                     getMessageTmpResult.getBufferTotalSize());
 
                 if (isOrder) {
-                    int count = 
brokerController.getConsumerOrderInfoManager().update(topic,
+                    
this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
                         requestHeader.getConsumerGroup(),
-                        queueId, getMessageTmpResult.getMessageQueueOffset());
+                        queueId, popTime, requestHeader.getInvisibleTime(), 
getMessageTmpResult.getMessageQueueOffset(),
+                        orderCountInfo);
                     
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
                         requestHeader.getConsumerGroup(), topic, queueId, 
offset);
-                    ExtraInfoUtil.buildOrderCountInfo(orderCountInfo, isRetry, 
queueId, count);
                 } else {
                     appendCheckPoint(requestHeader, topic, reviveQid, queueId, 
offset, getMessageTmpResult, popTime, 
this.brokerController.getBrokerConfig().getBrokerName());
                 }
@@ -901,6 +901,14 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
     public class QueueLockManager extends ServiceThread {
         private ConcurrentHashMap<String, TimedLock> expiredLocalCache = new 
ConcurrentHashMap<>(100000);
 
+        public String buildLockKey(String topic, String consumerGroup, int 
queueId) {
+            return topic + PopAckConstants.SPLIT + consumerGroup + 
PopAckConstants.SPLIT + queueId;
+        }
+
+        /**
+         * Convenience overload of {@code tryLock(String)}: derives the key with
+         * {@code buildLockKey(topic, consumerGroup, queueId)} and delegates to it.
+         *
+         * @param topic topic name
+         * @param consumerGroup consumer group name
+         * @param queueId queue id
+         * @return whatever {@code tryLock(String)} returns for the derived key
+         */
+        public boolean tryLock(String topic, String consumerGroup, int 
queueId) {
+            return tryLock(buildLockKey(topic, consumerGroup, queueId));
+        }
+
         public boolean tryLock(String key) {
             TimedLock timedLock = expiredLocalCache.get(key);
 
@@ -946,6 +954,10 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
             return total;
         }
 
+        /**
+         * Convenience overload of {@code unLock(String)}: derives the key with
+         * {@code buildLockKey(topic, consumerGroup, queueId)} and delegates to it.
+         *
+         * @param topic topic name
+         * @param consumerGroup consumer group name
+         * @param queueId queue id
+         */
+        public void unLock(String topic, String consumerGroup, int queueId) {
+            unLock(buildLockKey(topic, consumerGroup, queueId));
+        }
+
         public void unLock(String key) {
             TimedLock timedLock = expiredLocalCache.get(key);
             if (timedLock != null) {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
new file mode 100644
index 000000000..b7bb075a4
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.offset;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.assertj.core.util.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerOrderInfoManagerTest {
+
+    private static final String TOPIC = "topic";
+    private static final String GROUP = "group";
+    private static final int QUEUE_ID_0 = 0;
+    private static final int QUEUE_ID_1 = 1;
+
+    private long popTime;
+    private ConsumerOrderInfoManager consumerOrderInfoManager;
+
+    /**
+     * Creates a fresh manager through the no-argument constructor and records the current
+     * wall-clock time as the pop time shared by the test cases.
+     */
+    @Before
+    public void before() {
+        consumerOrderInfoManager = new ConsumerOrderInfoManager();
+        popTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Pops a single offset and then commits it twice: first with a pop time that differs from
+     * the recorded one (expects -2 and the queue still blocked), then with the recorded pop
+     * time (expects 2 and the queue no longer blocked). The encode/decode round trip is
+     * checked after every state change.
+     */
+    @Test
+    public void testCommitAndNext() {
+        consumerOrderInfoManager.update(
+            false,
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            popTime,
+            3000,
+            Lists.newArrayList(1L),
+            new StringBuilder()
+        );
+        assertEncodeAndDecode();
+        // Commit with a pop time that does not match the one passed to update().
+        assertEquals(-2, consumerOrderInfoManager.commitAndNext(
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            1L,
+            popTime - 10
+        ));
+        assertEncodeAndDecode();
+        assertTrue(consumerOrderInfoManager.checkBlock(
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            TimeUnit.SECONDS.toMillis(3)
+        ));
+
+        // Commit with the matching pop time.
+        assertEquals(2, consumerOrderInfoManager.commitAndNext(
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            1L,
+            popTime
+        ));
+        assertEncodeAndDecode();
+        assertFalse(consumerOrderInfoManager.checkBlock(
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            TimeUnit.SECONDS.toMillis(3)
+        ));
+    }
+
+    /**
+     * Repeatedly calls update() on one queue with overlapping offset lists and checks the
+     * order-count info written into the builder: the per-queue entry (start-offset key) and,
+     * where expected, the per-offset entries (queue-offset keys). Each scenario uses a fresh
+     * builder and verifies the encode/decode round trip.
+     */
+    @Test
+    public void testConsumedCount() {
+        {
+            // consume three new messages
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(1L, 2L, 3L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(1, orderInfoMap.size());
+            assertEquals(0, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+        }
+
+        {
+            // reconsume the same messages
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(1L, 2L, 3L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(4, orderInfoMap.size());
+            assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+            for (int i = 1; i <= 3; i++) {
+                assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 
i)).intValue());
+            }
+        }
+
+        {
+            // reconsume the last two messages
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(2L, 3L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(3, orderInfoMap.size());
+            assertEquals(2, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+            for (int i = 2; i <= 3; i++) {
+                assertEquals(2, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 
i)).intValue());
+            }
+        }
+
+        {
+            // consume a new message and reconsume the last message
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(3L, 4L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(2, orderInfoMap.size());
+            assertEquals(0, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+            assertEquals(3, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 
3)).intValue());
+        }
+
+        {
+            // consume two new messages
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(5L, 6L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(1, orderInfoMap.size());
+            assertEquals(0, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+        }
+    }
+
+    /**
+     * Same idea as the single-queue consumed-count test, but each scenario calls update() for
+     * QUEUE_ID_0 and QUEUE_ID_1 with one shared builder, then checks that the parsed
+     * order-count info carries the expected per-queue and per-offset entries for both queues.
+     */
+    @Test
+    public void testConsumedCountForMultiQueue() {
+        {
+            // consume one new message from each of the two queues
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(0L),
+                orderInfoBuilder
+            );
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_1,
+                popTime,
+                3000,
+                Lists.newArrayList(0L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(2, orderInfoMap.size());
+            assertEquals(0, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+            assertEquals(0, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_1)).intValue());
+        }
+        {
+            // reconsume the two messages
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(0L),
+                orderInfoBuilder
+            );
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_1,
+                popTime,
+                3000,
+                Lists.newArrayList(0L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(4, orderInfoMap.size());
+            assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+            assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_1)).intValue());
+            assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 
0L)).intValue());
+            assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_1, 
0L)).intValue());
+        }
+        {
+            // reconsume, with a new message added on QUEUE_ID_0
+            StringBuilder orderInfoBuilder = new StringBuilder();
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                3000,
+                Lists.newArrayList(0L, 1L),
+                orderInfoBuilder
+            );
+            consumerOrderInfoManager.update(
+                false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_1,
+                popTime,
+                3000,
+                Lists.newArrayList(0L),
+                orderInfoBuilder
+            );
+            assertEncodeAndDecode();
+            Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+            assertEquals(4, orderInfoMap.size());
+            assertEquals(0, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+            assertEquals(2, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_1)).intValue());
+            assertEquals(2, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 
0L)).intValue());
+            // The newly consumed offset 1 on QUEUE_ID_0 is expected to have no per-offset entry.
+            
assertNull(orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, 
QUEUE_ID_0, 1L)));
+            assertEquals(2, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_1, 
0L)).intValue());
+        }
+    }
+
+    /**
+     * Pops a batch, moves the next-visible time of offset 2 into the future with
+     * updateNextVisibleTime(), commits the other offsets and checks how checkBlock() and
+     * commitAndNext() behave before and after offset 2 itself is committed. The encode/decode
+     * round trip is verified after every state change.
+     */
+    @Test
+    public void testUpdateNextVisibleTime() {
+        // Milliseconds: this value is added to System.currentTimeMillis() below.
+        long invisibleTime = 3000;
+
+        StringBuilder orderInfoBuilder = new StringBuilder();
+        consumerOrderInfoManager.update(
+            false,
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            popTime,
+            1,
+            Lists.newArrayList(1L, 2L, 3L),
+            orderInfoBuilder
+        );
+
+        consumerOrderInfoManager.updateNextVisibleTime(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime,
+            System.currentTimeMillis() + invisibleTime);
+        assertEncodeAndDecode();
+
+        assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 1L, popTime));
+        assertEncodeAndDecode();
+        assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 3L, popTime));
+        assertEncodeAndDecode();
+
+        // Wait for the invisible time plus one second of slack. invisibleTime is in
+        // milliseconds, so the timeout is built with Duration.ofMillis; Duration.ofSeconds
+        // would turn the bound into roughly 50 minutes and hide a hang.
+        await().atMost(Duration.ofMillis(invisibleTime + 1000)).until(
+            () -> !consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+
+        orderInfoBuilder = new StringBuilder();
+        consumerOrderInfoManager.update(
+            false,
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            popTime,
+            1,
+            Lists.newArrayList(2L, 3L, 4L),
+            orderInfoBuilder
+        );
+
+        consumerOrderInfoManager.updateNextVisibleTime(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime,
+            System.currentTimeMillis() + invisibleTime);
+        assertEncodeAndDecode();
+
+        assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 3L, popTime));
+        assertEncodeAndDecode();
+        assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 4L, popTime));
+        assertEncodeAndDecode();
+        // Offset 2 is still uncommitted with a future visible time, so the queue stays blocked.
+        assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+
+        assertEquals(5L, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime));
+        assertEncodeAndDecode();
+        assertFalse(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+    }
+
+    /**
+     * Exercises autoClean() on a manager backed by a mocked BrokerController. Entries are
+     * expected to be removed when the topic is unknown to the mocked TopicConfigManager, when
+     * the group is missing from the subscription group table, or when the topic's read queue
+     * count is set to 0; an entry for a known topic/group with 8 read queues is expected to
+     * be kept.
+     */
+    @Test
+    public void testAutoCleanAndEncode() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        BrokerController brokerController = mock(BrokerController.class);
+        TopicConfigManager topicConfigManager = mock(TopicConfigManager.class);
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+
+        SubscriptionGroupManager subscriptionGroupManager = 
mock(SubscriptionGroupManager.class);
+        
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+        ConcurrentMap<String, SubscriptionGroupConfig> 
subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>();
+        subscriptionGroupConfigConcurrentMap.put(GROUP, new 
SubscriptionGroupConfig());
+        
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap);
+
+        TopicConfig topicConfig = new TopicConfig(TOPIC);
+        
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);
+
+        // Local manager wired to the mocks; it shadows the field initialised in before().
+        ConsumerOrderInfoManager consumerOrderInfoManager = new 
ConsumerOrderInfoManager(brokerController);
+
+        {
+            // Topic and group both unknown to the mocks.
+            consumerOrderInfoManager.update(false,
+                "errTopic",
+                "errGroup",
+                QUEUE_ID_0,
+                popTime,
+                1,
+                Lists.newArrayList(2L, 3L, 4L),
+                new StringBuilder());
+
+            consumerOrderInfoManager.autoClean();
+            assertEquals(0, consumerOrderInfoManager.getTable().size());
+        }
+        {
+            // Known topic, unknown group.
+            consumerOrderInfoManager.update(false,
+                TOPIC,
+                "errGroup",
+                QUEUE_ID_0,
+                popTime,
+                1,
+                Lists.newArrayList(2L, 3L, 4L),
+                new StringBuilder());
+
+            consumerOrderInfoManager.autoClean();
+            assertEquals(0, consumerOrderInfoManager.getTable().size());
+        }
+        {
+            // Known topic and group, but the topic has no read queues.
+            topicConfig.setReadQueueNums(0);
+            consumerOrderInfoManager.update(false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                1,
+                Lists.newArrayList(2L, 3L, 4L),
+                new StringBuilder());
+
+            // Unlike the other scenarios, autoClean() is retried for up to one second here.
+            await().atMost(Duration.ofSeconds(1)).until(() -> {
+                consumerOrderInfoManager.autoClean();
+                return consumerOrderInfoManager.getTable().size() == 0;
+            });
+        }
+        {
+            // Known topic and group with 8 read queues: the entry must be kept.
+            topicConfig.setReadQueueNums(8);
+            consumerOrderInfoManager.update(false,
+                TOPIC,
+                GROUP,
+                QUEUE_ID_0,
+                popTime,
+                1,
+                Lists.newArrayList(2L, 3L, 4L),
+                new StringBuilder());
+
+            consumerOrderInfoManager.autoClean();
+            assertEquals(1, consumerOrderInfoManager.getTable().size());
+            // Only the first (and, per the assertion above, only) table entry is inspected.
+            for (ConcurrentHashMap<Integer, 
ConsumerOrderInfoManager.OrderInfo> orderInfoMap : 
consumerOrderInfoManager.getTable().values()) {
+                assertEquals(1, orderInfoMap.size());
+                assertNotNull(orderInfoMap.get(QUEUE_ID_0));
+                break;
+            }
+        }
+    }
+
+    /**
+     * Round-trips the manager state through encode() and decode() and asserts that the
+     * OrderInfo for QUEUE_ID_0 in the first table entry is a different instance carrying the
+     * same field values. Only QUEUE_ID_0 of the first table entry is compared; the call also
+     * leaves the manager holding the decoded state.
+     */
+    private void assertEncodeAndDecode() {
+        ConsumerOrderInfoManager.OrderInfo prevOrderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
+            .get().get(QUEUE_ID_0);
+
+        String dataEncoded = consumerOrderInfoManager.encode();
+
+        consumerOrderInfoManager.decode(dataEncoded);
+        ConsumerOrderInfoManager.OrderInfo newOrderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
+            .get().get(QUEUE_ID_0);
+
+        assertNotSame(prevOrderInfo, newOrderInfo);
+        assertEquals(prevOrderInfo.getPopTime(), newOrderInfo.getPopTime());
+        assertEquals(prevOrderInfo.getInvisibleTime(), 
newOrderInfo.getInvisibleTime());
+        assertEquals(prevOrderInfo.getOffsetList(), 
newOrderInfo.getOffsetList());
+        assertEquals(prevOrderInfo.getOffsetConsumedCount(), 
newOrderInfo.getOffsetConsumedCount());
+        assertEquals(prevOrderInfo.getOffsetNextVisibleTime(), 
newOrderInfo.getOffsetNextVisibleTime());
+        assertEquals(prevOrderInfo.getLastConsumeTimestamp(), 
newOrderInfo.getLastConsumeTimestamp());
+        assertEquals(prevOrderInfo.getCommitOffsetBit(), 
newOrderInfo.getCommitOffsetBit());
+    }
+
+    /**
+     * Simulates data written by an older version: invisibleTime, offsetConsumedCount and
+     * offsetNextVisibleTime are set to null before encoding. After decoding, the queue is
+     * expected to be blocked, and a following update() with overlapping offsets is expected
+     * to report a consumed count of 1 for the re-popped offsets 3 and 4.
+     */
+    @Test
+    public void testLoadFromOldVersionOrderInfoData() {
+        consumerOrderInfoManager.update(false,
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            popTime,
+            1,
+            Lists.newArrayList(2L, 3L, 4L),
+            new StringBuilder());
+        ConsumerOrderInfoManager.OrderInfo orderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
+            .get().get(QUEUE_ID_0);
+
+        // Null out the fields that this test treats as absent from old-version data.
+        orderInfo.setInvisibleTime(null);
+        orderInfo.setOffsetConsumedCount(null);
+        orderInfo.setOffsetNextVisibleTime(null);
+
+        String dataEncoded = consumerOrderInfoManager.encode();
+
+        consumerOrderInfoManager.decode(dataEncoded);
+        assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, 
QUEUE_ID_0, 3000));
+
+        StringBuilder orderInfoBuilder = new StringBuilder();
+        consumerOrderInfoManager.update(false,
+            TOPIC,
+            GROUP,
+            QUEUE_ID_0,
+            popTime,
+            1,
+            Lists.newArrayList(3L, 4L, 5L),
+            orderInfoBuilder);
+        assertEncodeAndDecode();
+        Map<String, Integer> orderInfoMap = 
ExtraInfoUtil.parseOrderCountInfo(orderInfoBuilder.toString());
+        assertEquals(3, orderInfoMap.size());
+        assertEquals(0, 
orderInfoMap.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, 
QUEUE_ID_0)).intValue());
+        assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 
3)).intValue());
+        assertEquals(1, 
orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 
4)).intValue());
+    }
+}
\ No newline at end of file
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 5f393cb57..854fb73a6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1101,19 +1101,23 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
                         }
                         
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + 
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
                     } else {
-                        String key = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
-                        int index = 
sortMap.get(key).indexOf(messageExt.getQueueOffset());
-                        Long msgQueueOffset = 
msgOffsetInfo.get(key).get(index);
+                        String queueIdKey = 
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), 
messageExt.getQueueId());
+                        String queueOffsetKey = 
ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), 
messageExt.getQueueId(), messageExt.getQueueOffset());
+                        int index = 
sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
+                        Long msgQueueOffset = 
msgOffsetInfo.get(queueIdKey).get(index);
                         if (msgQueueOffset != messageExt.getQueueOffset()) {
                             log.warn("Queue offset[%d] of msg is strange, not 
equal to the stored in msg, %s", msgQueueOffset, messageExt);
                         }
 
                         
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
-                            
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key).longValue(), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
-                                responseHeader.getReviveQid(), 
messageExt.getTopic(), brokerName, messageExt.getQueueId(), 
msgQueueOffset.longValue())
+                            
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), 
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+                                responseHeader.getReviveQid(), 
messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
                         );
                         if (((PopMessageRequestHeader) 
requestHeader).isOrder() && orderCountInfo != null) {
-                            Integer count = orderCountInfo.get(key);
+                            Integer count = orderCountInfo.get(queueOffsetKey);
+                            if (count == null) {
+                                count = orderCountInfo.get(queueIdKey);
+                            }
                             if (count != null && count > 0) {
                                 messageExt.setReconsumeTimes(count);
                             }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
index 9a777208a..442060456 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.common.message.MessageConst;
 public class ExtraInfoUtil {
     private static final String NORMAL_TOPIC = "0";
     private static final String RETRY_TOPIC = "1";
+    private static final String QUEUE_OFFSET = "qo";
 
     public static String[] split(String extraInfo) {
         if (extraInfo == null) {
@@ -131,7 +132,7 @@ public class ExtraInfoUtil {
             .append(MessageConst.KEY_SEPARATOR).append(startOffset);
     }
 
-    public static void buildOrderCountInfo(StringBuilder stringBuilder, 
boolean retry, int queueId, int orderCount) {
+    public static void buildQueueIdOrderCountInfo(StringBuilder stringBuilder, 
boolean retry, int queueId, int orderCount) {
         if (stringBuilder == null) {
             stringBuilder = new StringBuilder(64);
         }
@@ -145,6 +146,20 @@ public class ExtraInfoUtil {
                 .append(MessageConst.KEY_SEPARATOR).append(orderCount);
     }
 
+    /**
+     * Appends one per-offset order-count entry to {@code stringBuilder}. The entry is the
+     * topic type (RETRY_TOPIC or NORMAL_TOPIC), the key from
+     * {@link #getQueueOffsetKeyValueKey(long, long)} and {@code orderCount}, separated by
+     * {@code MessageConst.KEY_SEPARATOR}. A ";" is written first when the builder already
+     * holds content.
+     *
+     * @param stringBuilder builder to append to
+     * @param retry whether the entry is tagged with RETRY_TOPIC instead of NORMAL_TOPIC
+     * @param queueId queue id
+     * @param queueOffset queue offset of the message
+     * @param orderCount count recorded for that offset
+     */
+    public static void buildQueueOffsetOrderCountInfo(StringBuilder 
stringBuilder, boolean retry, long queueId, long queueOffset, int orderCount) {
+        // NOTE(review): when the caller passes null, the entry is written to a local builder
+        // that is never returned, so the result is silently discarded.
+        if (stringBuilder == null) {
+            stringBuilder = new StringBuilder(64);
+        }
+
+        if (stringBuilder.length() > 0) {
+            stringBuilder.append(";");
+        }
+
+        stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+            
.append(MessageConst.KEY_SEPARATOR).append(getQueueOffsetKeyValueKey(queueId, 
queueOffset))
+            .append(MessageConst.KEY_SEPARATOR).append(orderCount);
+    }
+
     public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean 
retry, int queueId, List<Long> msgOffsets) {
         if (stringBuilder == null) {
             stringBuilder = new StringBuilder(64);
@@ -252,7 +267,19 @@ public class ExtraInfoUtil {
         return startOffsetMap;
     }
 
-    public static String getStartOffsetInfoMapKey(String topic, int queueId) {
-        return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? 
RETRY_TOPIC : NORMAL_TOPIC) + "@" + queueId;
+    /**
+     * Builds a map key of the form {@code <topicType>@<key>}.
+     *
+     * @param topic topic name; topics starting with {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}
+     *              are tagged RETRY_TOPIC, all others NORMAL_TOPIC
+     * @param key numeric suffix of the key
+     * @return the topic type, an "@" and {@code key}
+     */
+    public static String getStartOffsetInfoMapKey(String topic, long key) {
+        final String topicType;
+        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            topicType = RETRY_TOPIC;
+        } else {
+            topicType = NORMAL_TOPIC;
+        }
+        return topicType + "@" + key;
+    }
+
+    public static String getQueueOffsetKeyValueKey(long queueId, long 
queueOffset) {
+        return QUEUE_OFFSET + queueId + "%" + queueOffset;
+    }
+
+    public static String getQueueOffsetMapKey(String topic, long queueId, long 
queueOffset) {
+        return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? 
RETRY_TOPIC : NORMAL_TOPIC) + "@" + getQueueOffsetKeyValueKey(queueId, 
queueOffset);
+    }
+
+    /**
+     * Tells whether the revive queue id carried in {@code extraInfo} equals
+     * {@code KeyBuilder.POP_ORDER_REVIVE_QUEUE}.
+     *
+     * @param extraInfo extra-info fields, as accepted by {@code getReviveQid}
+     * @return {@code true} when the revive queue id is the pop-order revive queue
+     */
+    public static boolean isOrder(String[] extraInfo) {
+        return ExtraInfoUtil.getReviveQid(extraInfo) == 
KeyBuilder.POP_ORDER_REVIVE_QUEUE;
     }
 }
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtilTest.java
 
b/common/src/test/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtilTest.java
new file mode 100644
index 000000000..2da78b6e4
--- /dev/null
+++ 
b/common/src/test/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtilTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import java.util.Map;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the order-count helpers of {@link ExtraInfoUtil}.
+ */
+public class ExtraInfoUtilTest {
+
+    /**
+     * Writes one per-queue entry and one per-offset entry into the same builder and checks
+     * that parseOrderCountInfo() returns each count under the key produced by
+     * getStartOffsetInfoMapKey() and getQueueOffsetMapKey() respectively.
+     */
+    @Test
+    public void testOrderCountInfo() {
+        String topic = "TOPIC";
+        int queueId = 0;
+        long queueOffset = 1234;
+
+        // Different values, so a mix-up between the two kinds of entry would be detected.
+        Integer queueIdCount = 1;
+        Integer queueOffsetCount = 2;
+
+        String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, 
queueId);
+        String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, 
queueId, queueOffset);
+
+        StringBuilder sb = new StringBuilder();
+        ExtraInfoUtil.buildQueueIdOrderCountInfo(sb, false, queueId, 
queueIdCount);
+        ExtraInfoUtil.buildQueueOffsetOrderCountInfo(sb, false, queueId, 
queueOffset, queueOffsetCount);
+        Map<String, Integer> orderCountInfo = 
ExtraInfoUtil.parseOrderCountInfo(sb.toString());
+
+        assertEquals(queueIdCount, orderCountInfo.get(queueIdKey));
+        assertEquals(queueOffsetCount, orderCountInfo.get(queueOffsetKey));
+    }
+}
\ No newline at end of file
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
new file mode 100644
index 000000000..558acb804
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.test.clientinterface.MQConsumer;
+import org.apache.rocketmq.test.util.RandomUtil;
+
+public class RMQPopClient implements MQConsumer {
+    private static final long DEFAULT_TIMEOUT = 3000;
+    private MQClientAPIImpl mqClientAPI;
+
+    @Override
+    public void create() {
+        create(false);
+    }
+
+    @Override
+    public void create(boolean useTLS) {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setInstanceName(RandomUtil.getStringByUUID());
+
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        nettyClientConfig.setUseTLS(useTLS);
+        this.mqClientAPI = new MQClientAPIImpl(nettyClientConfig,
+            new ClientRemotingProcessor(null),
+            null,
+            clientConfig);
+    }
+
+    @Override
+    public void start() {
+        this.mqClientAPI.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.mqClientAPI.shutdown();
+    }
+
+    public CompletableFuture<PopResult> popMessageAsync(String brokerAddr, 
MessageQueue mq, long invisibleTime,
+        int maxNums, String consumerGroup, long timeout, boolean poll, int 
initMode, boolean order,
+        String expressionType, String expression) {
+        PopMessageRequestHeader requestHeader = new PopMessageRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setTopic(mq.getTopic());
+        requestHeader.setQueueId(mq.getQueueId());
+        requestHeader.setMaxMsgNums(maxNums);
+        requestHeader.setInvisibleTime(invisibleTime);
+        requestHeader.setInitMode(initMode);
+        requestHeader.setExpType(expressionType);
+        requestHeader.setExp(expression);
+        requestHeader.setOrder(order);
+        if (poll) {
+            requestHeader.setPollTime(timeout);
+            requestHeader.setBornTime(System.currentTimeMillis());
+            timeout += 10 * 1000;
+        }
+        CompletableFuture<PopResult> future = new CompletableFuture<>();
+        try {
+            this.mqClientAPI.popMessageAsync(mq.getBrokerName(), brokerAddr, 
requestHeader, timeout, new PopCallback() {
+                @Override
+                public void onSuccess(PopResult popResult) {
+                    future.complete(popResult);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    future.completeExceptionally(e);
+                }
+            });
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+
+    public CompletableFuture<AckResult> ackMessageAsync(String brokerAddr, 
String topic, String consumerGroup,
+        String extraInfo) {
+        String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+        AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
+        requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs, 
topic, consumerGroup));
+        requestHeader.setQueueId(ExtraInfoUtil.getQueueId(extraInfoStrs));
+        requestHeader.setOffset(ExtraInfoUtil.getQueueOffset(extraInfoStrs));
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setExtraInfo(extraInfo);
+        CompletableFuture<AckResult> future = new CompletableFuture<>();
+        try {
+            this.mqClientAPI.ackMessageAsync(brokerAddr, DEFAULT_TIMEOUT, new 
AckCallback() {
+                @Override
+                public void onSuccess(AckResult ackResult) {
+                    future.complete(ackResult);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    future.completeExceptionally(e);
+                }
+            }, requestHeader);
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+
+    public CompletableFuture<AckResult> changeInvisibleTimeAsync(String 
brokerAddr, String brokerName, String topic,
+        String consumerGroup, String extraInfo, long invisibleTime) {
+        String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
+        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
+        requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs, 
topic, consumerGroup));
+        requestHeader.setQueueId(ExtraInfoUtil.getQueueId(extraInfoStrs));
+        requestHeader.setOffset(ExtraInfoUtil.getQueueOffset(extraInfoStrs));
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setExtraInfo(extraInfo);
+        requestHeader.setInvisibleTime(invisibleTime);
+
+        CompletableFuture<AckResult> future = new CompletableFuture<>();
+        try {
+            this.mqClientAPI.changeInvisibleTimeAsync(brokerName, brokerAddr, 
requestHeader, DEFAULT_TIMEOUT, new AckCallback() {
+                @Override
+                public void onSuccess(AckResult ackResult) {
+                    future.complete(ackResult);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    future.completeExceptionally(e);
+                }
+            });
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+}
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index d530db98b..27f5dcbdd 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -22,6 +22,7 @@ import 
org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
 import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
 import org.apache.rocketmq.test.listener.AbstractListener;
@@ -73,6 +74,13 @@ public class ConsumerFactory {
         return consumer;
     }
 
+    public static RMQPopClient getRMQPopClient() {
+        RMQPopClient client = new RMQPopClient();
+        client.create();
+        client.start();
+        return client;
+    }
+
     public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr, 
String consumerGroup) throws Exception {
         DefaultMQPullConsumer defaultMQPullConsumer = new 
DefaultMQPullConsumer(consumerGroup);
         defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString());
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java 
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 035a8be68..079064c96 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
 import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
 import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.clientinterface.MQConsumer;
 import org.apache.rocketmq.test.factory.ConsumerFactory;
 import org.apache.rocketmq.test.listener.AbstractListener;
 import org.apache.rocketmq.test.util.MQAdminTestUtils;
@@ -317,6 +318,8 @@ public class BaseConf {
                 ((MQPullConsumer) mqClient).shutdown();
             } else if (mqClient instanceof MQPushConsumer) {
                 ((MQPushConsumer) mqClient).shutdown();
+            } else if (mqClient instanceof MQConsumer) {
+                ((MQConsumer) mqClient).shutdown();
             }
         }));
     }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePop.java 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePop.java
new file mode 100644
index 000000000..29ff90261
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePop.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+
+public class BasePop extends BaseConf {
+
+    public RMQPopClient getRMQPopClient() {
+        RMQPopClient client = ConsumerFactory.getRMQPopClient();
+        mqClients.add(client);
+        return client;
+    }
+
+    protected static class MsgRcv {
+        public final long rcvTime;
+        public final MessageExt messageExt;
+
+        public MsgRcv(long rcvTime, MessageExt messageExt) {
+            this.rcvTime = rcvTime;
+            this.messageExt = messageExt;
+        }
+    }
+}
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
new file mode 100644
index 000000000..1ef40b281
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+
+import static org.junit.Assert.assertEquals;
+
+@Ignore
+public class BasePopOrderly extends BasePop {
+    protected static final long POP_TIMEOUT = 500;
+    protected String topic;
+    protected String group;
+    protected RMQNormalProducer producer = null;
+    protected RMQPopClient client = null;
+    protected String brokerAddr;
+    protected MessageQueue messageQueue;
+    protected final Map<String, List<MsgRcv>> msgRecv = new 
ConcurrentHashMap<>();
+    protected final List<String> msgRecvSequence = new 
CopyOnWriteArrayList<>();
+
+    @Before
+    public void setUp() {
+        brokerAddr = brokerController1.getBrokerAddr();
+        topic = MQRandomUtils.getRandomTopic();
+        group = initConsumerGroup();
+        IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 1, 
CQType.SimpleCQ, TopicMessageType.FIFO);
+        producer = getProducer(NAMESRV_ADDR, topic);
+        client = getRMQPopClient();
+        messageQueue = new MessageQueue(topic, BROKER1_NAME, -1);
+    }
+
+    @After
+    public void tearDown() {
+        shutdown();
+    }
+
+    protected void assertMsgRecv(int seqId, int expectNum) {
+        String msgId = msgRecvSequence.get(seqId);
+        List<MsgRcv> msgRcvList = msgRecv.get(msgId);
+        assertEquals(expectNum, msgRcvList.size());
+        assertConsumeTimes(msgRcvList);
+    }
+
+    protected void assertConsumeTimes(List<MsgRcv> msgRcvList) {
+        for (int i = 0; i < msgRcvList.size(); i++) {
+            assertEquals(i, msgRcvList.get(i).messageExt.getReconsumeTimes());
+        }
+    }
+
+    protected void onRecvNewMessage(MessageExt messageExt) {
+        msgRecvSequence.add(messageExt.getMsgId());
+        msgRecv.compute(messageExt.getMsgId(), (k, msgRcvList) -> {
+            if (msgRcvList == null) {
+                msgRcvList = new CopyOnWriteArrayList<>();
+            }
+            msgRcvList.add(new MsgRcv(System.currentTimeMillis(), messageExt));
+            return msgRcvList;
+        });
+    }
+}
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/ChangeInvisibleTimeMidMsgOrderlyIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/ChangeInvisibleTimeMidMsgOrderlyIT.java
new file mode 100644
index 000000000..ea1ac1e82
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/ChangeInvisibleTimeMidMsgOrderlyIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class ChangeInvisibleTimeMidMsgOrderlyIT extends BasePopOrderly {
+    /**
+     * send four messages (msg1, msg2, msg3, msg4) and the max message num of 
pop request is three
+     * <p>
+     * ack msg1 and msg3, changeInvisibleTime msg2
+     * <p>
+     * the expected sequence of received messages is: msg1, msg2, msg3, msg2, 
msg3, msg4
+     */
+    @Test
+    public void test() {
+        producer.send(4);
+
+        await().atMost(Duration.ofSeconds(5)).until(() -> {
+            changeInvisibleTimeMidMessage().get();
+            return msgRecvSequence.size() == 6;
+        });
+
+        assertMsgRecv(0, 1);
+        assertMsgRecv(1, 2);
+        assertMsgRecv(2, 2);
+        assertMsgRecv(5, 1);
+
+        assertEquals(msgRecvSequence.get(1), msgRecvSequence.get(3));
+        assertEquals(msgRecvSequence.get(2), msgRecvSequence.get(4));
+    }
+
+    private CompletableFuture<Void> changeInvisibleTimeMidMessage() {
+        CompletableFuture<PopResult> future = client.popMessageAsync(
+            brokerAddr, messageQueue, 5000, 3, group, POP_TIMEOUT, true,
+            ConsumeInitMode.MIN, true, ExpressionType.TAG, "*");
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        future.whenComplete((popResult, throwable) -> {
+            if (throwable != null) {
+                resultFuture.completeExceptionally(throwable);
+                return;
+            }
+            if (popResult.getMsgFoundList() == null || 
popResult.getMsgFoundList().isEmpty()) {
+                resultFuture.complete(null);
+                return;
+            }
+            try {
+                for (MessageExt messageExt : popResult.getMsgFoundList()) {
+                    onRecvNewMessage(messageExt);
+                    if (msgRecv.size() != 2) {
+                        try {
+                            client.ackMessageAsync(brokerAddr, topic, group, 
messageExt.getProperty(MessageConst.PROPERTY_POP_CK)).get();
+                        } catch (Exception e) {
+                            resultFuture.completeExceptionally(e);
+                            return;
+                        }
+                    } else {
+                        try {
+                            TimeUnit.MILLISECONDS.sleep(1);
+                            client.changeInvisibleTimeAsync(
+                                brokerAddr, BROKER1_NAME, topic, group,
+                                
messageExt.getProperty(MessageConst.PROPERTY_POP_CK), 3000).get();
+                        } catch (Exception e) {
+                            resultFuture.completeExceptionally(e);
+                            return;
+                        }
+                    }
+                }
+                resultFuture.complete(null);
+            } catch (Throwable t) {
+                resultFuture.completeExceptionally(t);
+            }
+        });
+        return resultFuture;
+    }
+}

Reply via email to