This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bca6393261 [ISSUE #9891] Optimize pop orderly implementation to 
facilitate expansion (#9892)
bca6393261 is described below

commit bca6393261536f5f363e2f10a4e1a306b39bcff6
Author: qianye <[email protected]>
AuthorDate: Tue Dec 2 16:30:15 2025 +0800

    [ISSUE #9891] Optimize pop orderly implementation to facilitate expansion 
(#9892)
    
    Change-Id: Ie2faff9942027a510c4550a9f2bf2df8c6283137
---
 .../apache/rocketmq/broker/BrokerController.java   |  52 ++++----
 .../rocketmq/broker/pop/PopConsumerService.java    |   6 +-
 .../pop/orderly/ConsumerOrderInfoManager.java      | 142 +++++++++++++++++++++
 .../orderly/QueueLevelConsumerManager.java}        |  74 ++++++++---
 .../QueueLevelConsumerOrderInfoLockManager.java}   |  22 ++--
 .../broker/processor/AckMessageProcessor.java      |   4 +-
 .../processor/ChangeInvisibleTimeProcessor.java    |   6 +-
 .../broker/processor/PopMessageProcessor.java      |   3 +-
 .../broker/pop/PopConsumerServiceTest.java         |   4 +-
 ...ConsumerOrderInfoManagerLockFreeNotifyTest.java |   8 +-
 .../orderly}/ConsumerOrderInfoManagerTest.java     |  16 +--
 .../rocketmq/common/OrderedConsumptionLevel.java   |  39 ++++++
 12 files changed, 296 insertions(+), 80 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7b1701c61a..3fb9149056 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -18,6 +18,29 @@ package org.apache.rocketmq.broker;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import java.net.InetSocketAddress;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory;
 import 
org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
 import org.apache.rocketmq.auth.authorization.factory.AuthorizationFactory;
@@ -58,11 +81,12 @@ import 
org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
 import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.pop.PopConsumerService;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
+import org.apache.rocketmq.broker.pop.orderly.QueueLevelConsumerManager;
 import org.apache.rocketmq.broker.processor.AckMessageProcessor;
 import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
 import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
@@ -156,30 +180,6 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.timer.TimerMetrics;
 
-import java.net.InetSocketAddress;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
 public class BrokerController {
     protected static final Logger LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final Logger LOG_PROTECTION = 
LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -387,7 +387,7 @@ public class BrokerController {
         this.consumerManager = new 
ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, 
this.brokerConfig);
         this.producerManager = new ProducerManager(this.brokerStatsManager);
         this.consumerFilterManager = new ConsumerFilterManager(this);
-        this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
+        this.consumerOrderInfoManager = new QueueLevelConsumerManager(this);
         this.popInflightMessageCounter = new PopInflightMessageCounter(this);
         this.popConsumerService = brokerConfig.isPopConsumerKVServiceInit() ? 
new PopConsumerService(this) : null;
         this.clientHousekeepingService = new ClientHousekeepingService(this);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 57fac798b2..7678daa1d3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -167,7 +167,7 @@ public class PopConsumerService extends ServiceThread {
 
         if (GetMessageStatus.FOUND.equals(result.getStatus()) && 
!result.getMessageQueueOffset().isEmpty()) {
             if (context.isFifo()) {
-                this.setFifoBlocked(context, context.getGroupId(), topicId, 
queueId, result.getMessageQueueOffset());
+                this.setFifoBlocked(context, context.getGroupId(), topicId, 
queueId, result.getMessageQueueOffset(), result);
             }
             // build response header here
             context.addGetMessageResult(result, topicId, queueId, retryType, 
offset);
@@ -275,10 +275,10 @@ public class PopConsumerService extends ServiceThread {
      * Fifo message does not have retry feature in broker
      */
     public void setFifoBlocked(PopConsumerContext context,
-        String groupId, String topicId, int queueId, List<Long> 
queueOffsetList) {
+        String groupId, String topicId, int queueId, List<Long> 
queueOffsetList, GetMessageResult getMessageResult) {
         brokerController.getConsumerOrderInfoManager().update(
             context.getAttemptId(), false, topicId, groupId, queueId,
-            context.getPopTime(), context.getInvisibleTime(), queueOffsetList, 
context.getOrderCountInfoBuilder());
+            context.getPopTime(), context.getInvisibleTime(), queueOffsetList, 
context.getOrderCountInfoBuilder(), getMessageResult);
     }
 
     public boolean isFifoBlocked(PopConsumerContext context, String groupId, 
String topicId, int queueId) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java
new file mode 100644
index 0000000000..f8f56992b1
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.pop.orderly;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.OrderedConsumptionLevel;
+import org.apache.rocketmq.store.GetMessageResult;
+
+/**
+ *
+ * Ordered Consumption Controller Interface
+ * This is the top-level interface that encapsulates complete ordered 
consumption management functionality,
+ * supporting different concurrency strategy implementations
+ * <p>
+ * Design Goals:
+ * 1. Support queue-level ordered consumption (existing implementation)
+ * 2. Support message group-level ordered consumption (improve concurrency)
+ * 3. Support custom ordered consumption strategies
+ * </p>
+ */
+public interface ConsumerOrderInfoManager {
+
+    /**
+     * Update the reception status of message list
+     * Called by handleGetMessageResult when consumer POPs messages, used to 
record message status and build consumption information
+     *
+     * @param attemptId          Distinguish different pop requests
+     * @param isRetry            Whether it is a retry topic
+     * @param topic              Topic name
+     * @param group              Consumer group name
+     * @param queueId            Queue ID
+     * @param popTime            Time when messages are popped
+     * @param invisibleTime      Message invisible time
+     * @param msgQueueOffsetList List of message queue offsets
+     * @param orderInfoBuilder   String builder for constructing order 
information
+     * @param getMessageResult   Return new result
+     */
+    void update(String attemptId, boolean isRetry, String topic, String group, 
int queueId,
+        long popTime, long invisibleTime, List<Long> msgQueueOffsetList,
+        StringBuilder orderInfoBuilder, GetMessageResult getMessageResult);
+
+    /**
+     * Check whether the current POP request needs to be blocked
+     * Used to ensure ordered consumption of ordered messages
+     * Called when consumer POPs messages
+     *
+     * @param attemptId     Attempt ID
+     * @param topic         Topic name
+     * @param group         Consumer group name
+     * @param queueId       Queue ID
+     * @param invisibleTime Invisible time
+     * @return true indicates blocking is needed, false indicates can proceed
+     */
+    boolean checkBlock(String attemptId, String topic, String group, int 
queueId, long invisibleTime);
+
+    /**
+     * Commit message and calculate next consumption offset
+     * Called when consumer ACKs messages
+     *
+     * @param topic       Topic name
+     * @param group       Consumer group name
+     * @param queueId     Queue ID
+     * @param queueOffset Message queue offset
+     * @param popTime     Pop time, used for validation
+     * @return -1: invalid, -2: no need to commit, >=0: offset that needs to 
be committed (indicates messages below this offset have been consumed)
+     */
+    long commitAndNext(String topic, String group, int queueId, long 
queueOffset, long popTime);
+
+    /**
+     * Update the next visible time of message
+     * Used for delayed message re-consumption
+     *
+     * @param topic           Topic name
+     * @param group           Consumer group name
+     * @param queueId         Queue ID
+     * @param queueOffset     Message offset
+     * @param popTime         Pop time, used for validation
+     * @param nextVisibleTime Next visible time
+     */
+    void updateNextVisibleTime(String topic, String group, int queueId, long 
queueOffset,
+        long popTime, long nextVisibleTime);
+
+    /**
+     * Clear the blocking status of specified queue
+     * Usually called during consumer rebalancing or queue reassignment
+     *
+     * @param topic   Topic name
+     * @param group   Consumer group name
+     * @param queueId Queue ID
+     */
+    void clearBlock(String topic, String group, int queueId);
+
+    /**
+     * Get ordered consumption level
+     * Used to distinguish different implementation strategies
+     *
+     * @return Ordered consumption level, such as: QUEUE, MESSAGE_GROUP, etc.
+     */
+    OrderedConsumptionLevel getOrderedConsumptionLevel();
+
+    /**
+     * Start the controller
+     * Initialize necessary resources, such as timers, thread pools, etc.
+     */
+    void start();
+
+    /**
+     * Shutdown the controller
+     * Release resources, clean up scheduled tasks, etc.
+     */
+    void shutdown();
+
+    /**
+     * Persist the controller
+     * Persist the controller's data
+     */
+    void persist();
+
+    boolean load();
+
+    /**
+     * Get available message result
+     * Used to retrieve messages from cache
+     */
+    CompletableFuture<GetMessageResult> getAvailableMessageResult(String 
attemptId, long popTime, long invisibleTime, String groupId,
+        String topicId, int queueId, int batchSize, StringBuilder 
orderCountInfoBuilder);
+}
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
similarity index 89%
rename from 
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
rename to 
broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
index 120f5b104c..79bd59fb78 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
 
 import com.alibaba.fastjson.annotation.JSONField;
 import com.google.common.annotations.VisibleForTesting;
@@ -26,18 +26,21 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.OrderedConsumptionLevel;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.store.GetMessageResult;
 
-public class ConsumerOrderInfoManager extends ConfigManager {
+public class QueueLevelConsumerManager extends ConfigManager implements 
ConsumerOrderInfoManager {
 
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final String TOPIC_GROUP_SEPARATOR = "@";
@@ -46,15 +49,15 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
     private ConcurrentHashMap<String/* topic@group*/, 
ConcurrentHashMap<Integer/*queueId*/, OrderInfo>> table =
         new ConcurrentHashMap<>(128);
 
-    private transient ConsumerOrderInfoLockManager 
consumerOrderInfoLockManager;
+    private transient QueueLevelConsumerOrderInfoLockManager 
queueLevelConsumerOrderInfoLockManager;
     private transient BrokerController brokerController;
 
-    public ConsumerOrderInfoManager() {
+    public QueueLevelConsumerManager() {
     }
 
-    public ConsumerOrderInfoManager(BrokerController brokerController) {
+    public QueueLevelConsumerManager(BrokerController brokerController) {
         this.brokerController = brokerController;
-        this.consumerOrderInfoLockManager = new 
ConsumerOrderInfoLockManager(brokerController);
+        this.queueLevelConsumerOrderInfoLockManager = new 
QueueLevelConsumerOrderInfoLockManager(brokerController);
     }
 
     public ConcurrentHashMap<String, ConcurrentHashMap<Integer, OrderInfo>> 
getTable() {
@@ -74,8 +77,8 @@ public class ConsumerOrderInfoManager extends ConfigManager {
     }
 
     private void updateLockFreeTimestamp(String topic, String group, int 
queueId, OrderInfo orderInfo) {
-        if (consumerOrderInfoLockManager != null) {
-            consumerOrderInfoLockManager.updateLockFreeTimestamp(topic, group, 
queueId, orderInfo);
+        if (queueLevelConsumerOrderInfoLockManager != null) {
+            
queueLevelConsumerOrderInfoLockManager.updateLockFreeTimestamp(topic, group, 
queueId, orderInfo);
         }
     }
 
@@ -91,7 +94,8 @@ public class ConsumerOrderInfoManager extends ConfigManager {
      * @param msgQueueOffsetList the queue offsets of messages
      * @param orderInfoBuilder will append order info to this builder
      */
-    public void update(String attemptId, boolean isRetry, String topic, String 
group, int queueId, long popTime, long invisibleTime,
+    public void update(String attemptId, boolean isRetry, String topic, String 
group, int queueId, long popTime,
+        long invisibleTime,
         List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
         String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -140,6 +144,14 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
         updateLockFreeTimestamp(topic, group, queueId, orderInfo);
     }
 
+    @Override
+    public void update(String attemptId, boolean isRetry, String topic, String 
group, int queueId, long popTime,
+        long invisibleTime,
+        List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder, 
GetMessageResult getMessageResult) {
+        update(attemptId, isRetry, topic, group, queueId, popTime, 
invisibleTime, msgQueueOffsetList, orderInfoBuilder);
+    }
+
+    @Override
     public boolean checkBlock(String attemptId, String topic, String group, 
int queueId, long invisibleTime) {
         String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -159,6 +171,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
         return orderInfo.needBlock(attemptId, invisibleTime);
     }
 
+    @Override
     public void clearBlock(String topic, String group, int queueId) {
         table.computeIfPresent(buildKey(topic, group), (key, val) -> {
             val.remove(queueId);
@@ -166,6 +179,15 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
         });
     }
 
+    @Override
+    public OrderedConsumptionLevel getOrderedConsumptionLevel() {
+        return OrderedConsumptionLevel.QUEUE;
+    }
+
+    @Override
+    public void start() {
+    }
+
     /**
      * mark message is consumed finished. return the consumer offset
      *
@@ -175,6 +197,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
      * @param queueOffset queue offset of message
      * @return -1 : illegal, -2 : no need commit, >= 0 : commit
      */
+    @Override
     public long commitAndNext(String topic, String group, int queueId, long 
queueOffset, long popTime) {
         String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -234,7 +257,9 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
      * @param queueOffset queue offset of message
      * @param nextVisibleTime nex visible time
      */
-    public void updateNextVisibleTime(String topic, String group, int queueId, 
long queueOffset, long popTime, long nextVisibleTime) {
+    @Override
+    public void updateNextVisibleTime(String topic, String group, int queueId, 
long queueOffset, long popTime,
+        long nextVisibleTime) {
         String key = buildKey(topic, group);
         ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
 
@@ -256,6 +281,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
         updateLockFreeTimestamp(topic, group, queueId, orderInfo);
     }
 
+    @VisibleForTesting
     protected void autoClean() {
         if (brokerController == null) {
             return;
@@ -328,11 +354,11 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
     @Override
     public void decode(String jsonString) {
         if (jsonString != null) {
-            ConsumerOrderInfoManager obj = 
RemotingSerializable.fromJson(jsonString, ConsumerOrderInfoManager.class);
+            QueueLevelConsumerManager obj = 
RemotingSerializable.fromJson(jsonString, QueueLevelConsumerManager.class);
             if (obj != null) {
                 this.table = obj.table;
-                if (this.consumerOrderInfoLockManager != null) {
-                    this.consumerOrderInfoLockManager.recover(this.table);
+                if (this.queueLevelConsumerOrderInfoLockManager != null) {
+                    
this.queueLevelConsumerOrderInfoLockManager.recover(this.table);
                 }
             }
         }
@@ -345,14 +371,20 @@ public class ConsumerOrderInfoManager extends 
ConfigManager {
     }
 
     public void shutdown() {
-        if (this.consumerOrderInfoLockManager != null) {
-            this.consumerOrderInfoLockManager.shutdown();
+        if (this.queueLevelConsumerOrderInfoLockManager != null) {
+            this.queueLevelConsumerOrderInfoLockManager.shutdown();
         }
     }
 
+    @Override
+    public CompletableFuture<GetMessageResult> 
getAvailableMessageResult(String attemptId, long popTime, long invisibleTime,
+        String groupId, String topicId, int queueId, int batchSize, 
StringBuilder orderCountInfoBuilder) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     @VisibleForTesting
-    protected ConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
-        return consumerOrderInfoLockManager;
+    QueueLevelConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
+        return queueLevelConsumerOrderInfoLockManager;
     }
 
     public static class OrderInfo {
@@ -397,7 +429,8 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
         public OrderInfo() {
         }
 
-        public OrderInfo(String attemptId, long popTime, long invisibleTime, 
List<Long> queueOffsetList, long lastConsumeTimestamp,
+        public OrderInfo(String attemptId, long popTime, long invisibleTime, 
List<Long> queueOffsetList,
+            long lastConsumeTimestamp,
             long commitOffsetBit) {
             this.popTime = popTime;
             this.invisibleTime = invisibleTime;
@@ -600,7 +633,8 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
          * @param prevOffsetConsumedCount the offset list of message
          */
         @JSONField(serialize = false, deserialize = false)
-        public void mergeOffsetConsumedCount(String preAttemptId, List<Long> 
preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
+        public void mergeOffsetConsumedCount(String preAttemptId, List<Long> 
preOffsetList,
+            Map<Long, Integer> prevOffsetConsumedCount) {
             Map<Long, Integer> offsetConsumedCount = new HashMap<>();
             if (prevOffsetConsumedCount == null) {
                 prevOffsetConsumedCount = new HashMap<>();
@@ -641,4 +675,4 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
                 .toString();
         }
     }
-}
+}
\ No newline at end of file
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoLockManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java
similarity index 88%
rename from 
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoLockManager.java
rename to 
broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java
index 37b3eed230..d65b01d89c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoLockManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
@@ -32,14 +32,16 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
-public class ConsumerOrderInfoLockManager {
+public class QueueLevelConsumerOrderInfoLockManager {
     private static final Logger POP_LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+    private ConsumerOrderInfoManager consumerOrderInfoManager;
+
     private final BrokerController brokerController;
     private final Map<Key, Timeout> timeoutMap = new ConcurrentHashMap<>();
     private final Timer timer;
     private static final int TIMER_TICK_MS = 100;
 
-    public ConsumerOrderInfoLockManager(BrokerController brokerController) {
+    public QueueLevelConsumerOrderInfoLockManager(BrokerController 
brokerController) {
         this.brokerController = brokerController;
         this.timer = new HashedWheelTimer(
             new ThreadFactoryImpl("ConsumerOrderInfoLockManager_"),
@@ -47,22 +49,22 @@ public class ConsumerOrderInfoLockManager {
     }
 
     /**
-     * when ConsumerOrderInfoManager load from disk, recover data
+     * when QueueLevelConsumerManager load from disk, recover data
      */
-    public void recover(Map<String/* topic@group*/, 
ConcurrentHashMap<Integer/*queueId*/, ConsumerOrderInfoManager.OrderInfo>> 
table) {
+    public void recover(Map<String/* topic@group*/, 
ConcurrentHashMap<Integer/*queueId*/, QueueLevelConsumerManager.OrderInfo>> 
table) {
         if 
(!this.brokerController.getBrokerConfig().isEnableNotifyAfterPopOrderLockRelease())
 {
             return;
         }
-        for (Map.Entry<String, ConcurrentHashMap<Integer, 
ConsumerOrderInfoManager.OrderInfo>> entry : table.entrySet()) {
+        for (Map.Entry<String, ConcurrentHashMap<Integer, 
QueueLevelConsumerManager.OrderInfo>> entry : table.entrySet()) {
             String topicAtGroup = entry.getKey();
-            ConcurrentHashMap<Integer/*queueId*/, 
ConsumerOrderInfoManager.OrderInfo> qs = entry.getValue();
-            String[] arrays = ConsumerOrderInfoManager.decodeKey(topicAtGroup);
+            ConcurrentHashMap<Integer/*queueId*/, 
QueueLevelConsumerManager.OrderInfo> qs = entry.getValue();
+            String[] arrays = 
QueueLevelConsumerManager.decodeKey(topicAtGroup);
             if (arrays.length != 2) {
                 continue;
             }
             String topic = arrays[0];
             String group = arrays[1];
-            for (Map.Entry<Integer, ConsumerOrderInfoManager.OrderInfo> 
qsEntry : qs.entrySet()) {
+            for (Map.Entry<Integer, QueueLevelConsumerManager.OrderInfo> 
qsEntry : qs.entrySet()) {
                 Long lockFreeTimestamp = 
qsEntry.getValue().getLockFreeTimestamp();
                 if (lockFreeTimestamp == null || lockFreeTimestamp <= 
System.currentTimeMillis()) {
                     continue;
@@ -72,7 +74,7 @@ public class ConsumerOrderInfoLockManager {
         }
     }
 
-    public void updateLockFreeTimestamp(String topic, String group, int 
queueId, ConsumerOrderInfoManager.OrderInfo orderInfo) {
+    public void updateLockFreeTimestamp(String topic, String group, int 
queueId, QueueLevelConsumerManager.OrderInfo orderInfo) {
         this.updateLockFreeTimestamp(topic, group, queueId, 
orderInfo.getLockFreeTimestamp());
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 493dbaa9a6..864f40d296 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.broker.processor;
 import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.util.BitSet;
 import java.nio.charset.StandardCharsets;
+import java.util.BitSet;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index e8e2a90995..f104e76a52 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -19,13 +19,13 @@ package org.apache.rocketmq.broker.processor;
 import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.nio.charset.StandardCharsets;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -37,11 +37,11 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
-import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.store.PutMessageStatus;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6e0d235f00..9967955656 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -44,7 +44,6 @@ import org.apache.rocketmq.broker.longpolling.PollingHeader;
 import org.apache.rocketmq.broker.longpolling.PollingResult;
 import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
 import org.apache.rocketmq.broker.longpolling.PopRequest;
-
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
 import org.apache.rocketmq.broker.pop.PopConsumerContext;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -806,7 +805,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
                         
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(),
 isRetry, topic,
                             requestHeader.getConsumerGroup(),
                             queueId, popTime, 
requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
-                            orderCountInfo);
+                            orderCountInfo, result);
                         
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
                             requestHeader.getConsumerGroup(), topic, queueId, 
finalOffset);
                     } else {
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 9c23a8625e..db5f60fb17 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.processor.PopMessageProcessor;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -233,7 +233,7 @@ public class PopConsumerServiceTest {
         // fifo block
         PopConsumerContext context = new PopConsumerContext(
             clientHost, System.currentTimeMillis(), 20000, groupId, false, 
ConsumeInitMode.MIN, attemptId);
-        consumerService.setFifoBlocked(context, groupId, topicId, queueId, 
Collections.singletonList(100L));
+        consumerService.setFifoBlocked(context, groupId, topicId, queueId, 
Collections.singletonList(100L), resetGetMessageResult);
         Mockito.when(brokerController.getConsumerOrderInfoManager()
             .checkBlock(anyString(), anyString(), anyString(), anyInt(), 
anyLong())).thenReturn(true);
         Assert.assertTrue(consumerService.isFifoBlocked(context, groupId, 
topicId, queueId));
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerLockFreeNotifyTest.java
similarity index 95%
rename from 
broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
rename to 
broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerLockFreeNotifyTest.java
index 1fdf454d5e..d3c0df987c 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerLockFreeNotifyTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
 
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,7 +43,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
     private static final int QUEUE_ID_0 = 0;
 
     private long popTime;
-    private ConsumerOrderInfoManager consumerOrderInfoManager;
+    private QueueLevelConsumerManager consumerOrderInfoManager;
     private AtomicBoolean notified;
 
     private final BrokerConfig brokerConfig = new BrokerConfig();
@@ -61,7 +61,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
             return null;
         
}).when(popMessageProcessor).notifyLongPollingRequestIfNeed(anyString(), 
anyString(), anyInt());
 
-        consumerOrderInfoManager = new 
ConsumerOrderInfoManager(brokerController);
+        consumerOrderInfoManager = new 
QueueLevelConsumerManager(brokerController);
         popTime = System.currentTimeMillis();
     }
 
@@ -158,7 +158,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
 
     @Test
     public void testRecover() {
-        ConsumerOrderInfoManager savedConsumerOrderInfoManager = new 
ConsumerOrderInfoManager();
+        QueueLevelConsumerManager savedConsumerOrderInfoManager = new 
QueueLevelConsumerManager();
         savedConsumerOrderInfoManager.update(
             null,
             false,
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerTest.java
similarity index 96%
rename from 
broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
rename to 
broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerTest.java
index 4414eda54e..7ab3c921ed 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
 
 import java.time.Duration;
 import java.util.Map;
@@ -51,11 +51,11 @@ public class ConsumerOrderInfoManagerTest {
     private static final int QUEUE_ID_1 = 1;
 
     private long popTime;
-    private ConsumerOrderInfoManager consumerOrderInfoManager;
+    private QueueLevelConsumerManager consumerOrderInfoManager;
 
     @Before
     public void before() {
-        consumerOrderInfoManager = new ConsumerOrderInfoManager();
+        consumerOrderInfoManager = new QueueLevelConsumerManager();
         popTime = System.currentTimeMillis();
     }
 
@@ -387,7 +387,7 @@ public class ConsumerOrderInfoManagerTest {
         TopicConfig topicConfig = new TopicConfig(TOPIC);
         
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);
 
-        ConsumerOrderInfoManager consumerOrderInfoManager = new 
ConsumerOrderInfoManager(brokerController);
+        QueueLevelConsumerManager consumerOrderInfoManager = new 
QueueLevelConsumerManager(brokerController);
 
         {
             consumerOrderInfoManager.update(null, false,
@@ -444,7 +444,7 @@ public class ConsumerOrderInfoManagerTest {
 
             consumerOrderInfoManager.autoClean();
             assertEquals(1, consumerOrderInfoManager.getTable().size());
-            for (ConcurrentHashMap<Integer, 
ConsumerOrderInfoManager.OrderInfo> orderInfoMap : 
consumerOrderInfoManager.getTable().values()) {
+            for (ConcurrentHashMap<Integer, 
QueueLevelConsumerManager.OrderInfo> orderInfoMap : 
consumerOrderInfoManager.getTable().values()) {
                 assertEquals(1, orderInfoMap.size());
                 assertNotNull(orderInfoMap.get(QUEUE_ID_0));
                 break;
@@ -453,13 +453,13 @@ public class ConsumerOrderInfoManagerTest {
     }
 
     private void assertEncodeAndDecode() {
-        ConsumerOrderInfoManager.OrderInfo prevOrderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
+        QueueLevelConsumerManager.OrderInfo prevOrderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
             .get().get(QUEUE_ID_0);
 
         String dataEncoded = consumerOrderInfoManager.encode();
 
         consumerOrderInfoManager.decode(dataEncoded);
-        ConsumerOrderInfoManager.OrderInfo newOrderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
+        QueueLevelConsumerManager.OrderInfo newOrderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
             .get().get(QUEUE_ID_0);
 
         assertNotSame(prevOrderInfo, newOrderInfo);
@@ -482,7 +482,7 @@ public class ConsumerOrderInfoManagerTest {
             1,
             Lists.newArrayList(2L, 3L, 4L),
             new StringBuilder());
-        ConsumerOrderInfoManager.OrderInfo orderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
+        QueueLevelConsumerManager.OrderInfo orderInfo = 
consumerOrderInfoManager.getTable().values().stream().findFirst()
             .get().get(QUEUE_ID_0);
 
         orderInfo.setInvisibleTime(null);
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/OrderedConsumptionLevel.java 
b/common/src/main/java/org/apache/rocketmq/common/OrderedConsumptionLevel.java
new file mode 100644
index 0000000000..bdf972abda
--- /dev/null
+++ 
b/common/src/main/java/org/apache/rocketmq/common/OrderedConsumptionLevel.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+public enum OrderedConsumptionLevel {
+    QUEUE(0),
+    SHARDING_KEY(1);
+
+    private final int value;
+
+    OrderedConsumptionLevel(int value) {
+        this.value = value;
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public static OrderedConsumptionLevel valueOf(int value) {
+        if (value == 1) {
+            return SHARDING_KEY;
+        }
+        return QUEUE;
+    }
+}


Reply via email to