This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bca6393261 [ISSUE #9891] Optimize pop orderly implementation to
facilitate expansion (#9892)
bca6393261 is described below
commit bca6393261536f5f363e2f10a4e1a306b39bcff6
Author: qianye <[email protected]>
AuthorDate: Tue Dec 2 16:30:15 2025 +0800
[ISSUE #9891] Optimize pop orderly implementation to facilitate expansion
(#9892)
Change-Id: Ie2faff9942027a510c4550a9f2bf2df8c6283137
---
.../apache/rocketmq/broker/BrokerController.java | 52 ++++----
.../rocketmq/broker/pop/PopConsumerService.java | 6 +-
.../pop/orderly/ConsumerOrderInfoManager.java | 142 +++++++++++++++++++++
.../orderly/QueueLevelConsumerManager.java} | 74 ++++++++---
.../QueueLevelConsumerOrderInfoLockManager.java} | 22 ++--
.../broker/processor/AckMessageProcessor.java | 4 +-
.../processor/ChangeInvisibleTimeProcessor.java | 6 +-
.../broker/processor/PopMessageProcessor.java | 3 +-
.../broker/pop/PopConsumerServiceTest.java | 4 +-
...ConsumerOrderInfoManagerLockFreeNotifyTest.java | 8 +-
.../orderly}/ConsumerOrderInfoManagerTest.java | 16 +--
.../rocketmq/common/OrderedConsumptionLevel.java | 39 ++++++
12 files changed, 296 insertions(+), 80 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7b1701c61a..3fb9149056 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -18,6 +18,29 @@ package org.apache.rocketmq.broker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import java.net.InetSocketAddress;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.rocketmq.auth.authentication.factory.AuthenticationFactory;
import
org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
import org.apache.rocketmq.auth.authorization.factory.AuthorizationFactory;
@@ -58,11 +81,12 @@ import
org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.pop.PopConsumerService;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
+import org.apache.rocketmq.broker.pop.orderly.QueueLevelConsumerManager;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
import org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor;
@@ -156,30 +180,6 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.timer.TimerMetrics;
-import java.net.InetSocketAddress;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
public class BrokerController {
protected static final Logger LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION =
LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -387,7 +387,7 @@ public class BrokerController {
this.consumerManager = new
ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager,
this.brokerConfig);
this.producerManager = new ProducerManager(this.brokerStatsManager);
this.consumerFilterManager = new ConsumerFilterManager(this);
- this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
+ this.consumerOrderInfoManager = new QueueLevelConsumerManager(this);
this.popInflightMessageCounter = new PopInflightMessageCounter(this);
this.popConsumerService = brokerConfig.isPopConsumerKVServiceInit() ?
new PopConsumerService(this) : null;
this.clientHousekeepingService = new ClientHousekeepingService(this);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 57fac798b2..7678daa1d3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -167,7 +167,7 @@ public class PopConsumerService extends ServiceThread {
if (GetMessageStatus.FOUND.equals(result.getStatus()) &&
!result.getMessageQueueOffset().isEmpty()) {
if (context.isFifo()) {
- this.setFifoBlocked(context, context.getGroupId(), topicId,
queueId, result.getMessageQueueOffset());
+ this.setFifoBlocked(context, context.getGroupId(), topicId,
queueId, result.getMessageQueueOffset(), result);
}
// build response header here
context.addGetMessageResult(result, topicId, queueId, retryType,
offset);
@@ -275,10 +275,10 @@ public class PopConsumerService extends ServiceThread {
* Fifo message does not have retry feature in broker
*/
public void setFifoBlocked(PopConsumerContext context,
- String groupId, String topicId, int queueId, List<Long>
queueOffsetList) {
+ String groupId, String topicId, int queueId, List<Long>
queueOffsetList, GetMessageResult getMessageResult) {
brokerController.getConsumerOrderInfoManager().update(
context.getAttemptId(), false, topicId, groupId, queueId,
- context.getPopTime(), context.getInvisibleTime(), queueOffsetList,
context.getOrderCountInfoBuilder());
+ context.getPopTime(), context.getInvisibleTime(), queueOffsetList,
context.getOrderCountInfoBuilder(), getMessageResult);
}
public boolean isFifoBlocked(PopConsumerContext context, String groupId,
String topicId, int queueId) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java
new file mode 100644
index 0000000000..f8f56992b1
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManager.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.pop.orderly;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.OrderedConsumptionLevel;
+import org.apache.rocketmq.store.GetMessageResult;
+
+/**
+ *
+ * Ordered Consumption Controller Interface
+ * This is the top-level interface that encapsulates complete ordered consumption management functionality,
+ * supporting different concurrency strategy implementations
+ * <p>
+ * Design Goals:
+ * 1. Support queue-level ordered consumption (existing implementation)
+ * 2. Support message group-level ordered consumption (improve concurrency)
+ * 3. Support custom ordered consumption strategies
+ * </p>
+ */
+public interface ConsumerOrderInfoManager {
+
+ /**
+ * Update the reception status of message list
+ * Called by handleGetMessageResult when consumer POPs messages, used to record message status and build consumption information
+ *
+ * @param attemptId Distinguish different pop requests
+ * @param isRetry Whether it is a retry topic
+ * @param topic Topic name
+ * @param group Consumer group name
+ * @param queueId Queue ID
+ * @param popTime Time when messages are popped
+ * @param invisibleTime Message invisible time
+ * @param msgQueueOffsetList List of message queue offsets
+ * @param orderInfoBuilder String builder for constructing order information
+ * @param getMessageResult Return new result
+ */
+ void update(String attemptId, boolean isRetry, String topic, String group,
int queueId,
+ long popTime, long invisibleTime, List<Long> msgQueueOffsetList,
+ StringBuilder orderInfoBuilder, GetMessageResult getMessageResult);
+
+ /**
+ * Check whether the current POP request needs to be blocked
+ * Used to ensure ordered consumption of ordered messages
+ * Called when consumer POPs messages
+ *
+ * @param attemptId Attempt ID
+ * @param topic Topic name
+ * @param group Consumer group name
+ * @param queueId Queue ID
+ * @param invisibleTime Invisible time
+ * @return true indicates blocking is needed, false indicates can proceed
+ */
+ boolean checkBlock(String attemptId, String topic, String group, int
queueId, long invisibleTime);
+
+ /**
+ * Commit message and calculate next consumption offset
+ * Called when consumer ACKs messages
+ *
+ * @param topic Topic name
+ * @param group Consumer group name
+ * @param queueId Queue ID
+ * @param queueOffset Message queue offset
+ * @param popTime Pop time, used for validation
+ * @return -1: invalid, -2: no need to commit, >=0: offset that needs to be committed (indicates messages below this offset have been consumed)
+ */
+ long commitAndNext(String topic, String group, int queueId, long
queueOffset, long popTime);
+
+ /**
+ * Update the next visible time of message
+ * Used for delayed message re-consumption
+ *
+ * @param topic Topic name
+ * @param group Consumer group name
+ * @param queueId Queue ID
+ * @param queueOffset Message offset
+ * @param popTime Pop time, used for validation
+ * @param nextVisibleTime Next visible time
+ */
+ void updateNextVisibleTime(String topic, String group, int queueId, long
queueOffset,
+ long popTime, long nextVisibleTime);
+
+ /**
+ * Clear the blocking status of specified queue
+ * Usually called during consumer rebalancing or queue reassignment
+ *
+ * @param topic Topic name
+ * @param group Consumer group name
+ * @param queueId Queue ID
+ */
+ void clearBlock(String topic, String group, int queueId);
+
+ /**
+ * Get ordered consumption level
+ * Used to distinguish different implementation strategies
+ *
+ * @return Ordered consumption level, such as: QUEUE, MESSAGE_GROUP, etc.
+ */
+ OrderedConsumptionLevel getOrderedConsumptionLevel();
+
+ /**
+ * Start the controller
+ * Initialize necessary resources, such as timers, thread pools, etc.
+ */
+ void start();
+
+ /**
+ * Shutdown the controller
+ * Release resources, clean up scheduled tasks, etc.
+ */
+ void shutdown();
+
+ /**
+ * Persist the controller
+ * Persist the controller's data
+ */
+ void persist();
+
+ boolean load();
+
+ /**
+ * Get available message result
+ * Used to retrieve messages from cache
+ */
+ CompletableFuture<GetMessageResult> getAvailableMessageResult(String
attemptId, long popTime, long invisibleTime, String groupId,
+ String topicId, int queueId, int batchSize, StringBuilder
orderCountInfoBuilder);
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
similarity index 89%
rename from
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
index 120f5b104c..79bd59fb78 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
import com.alibaba.fastjson.annotation.JSONField;
import com.google.common.annotations.VisibleForTesting;
@@ -26,18 +26,21 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.OrderedConsumptionLevel;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.store.GetMessageResult;
-public class ConsumerOrderInfoManager extends ConfigManager {
+public class QueueLevelConsumerManager extends ConfigManager implements
ConsumerOrderInfoManager {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
@@ -46,15 +49,15 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
private ConcurrentHashMap<String/* topic@group*/,
ConcurrentHashMap<Integer/*queueId*/, OrderInfo>> table =
new ConcurrentHashMap<>(128);
- private transient ConsumerOrderInfoLockManager
consumerOrderInfoLockManager;
+ private transient QueueLevelConsumerOrderInfoLockManager
queueLevelConsumerOrderInfoLockManager;
private transient BrokerController brokerController;
- public ConsumerOrderInfoManager() {
+ public QueueLevelConsumerManager() {
}
- public ConsumerOrderInfoManager(BrokerController brokerController) {
+ public QueueLevelConsumerManager(BrokerController brokerController) {
this.brokerController = brokerController;
- this.consumerOrderInfoLockManager = new
ConsumerOrderInfoLockManager(brokerController);
+ this.queueLevelConsumerOrderInfoLockManager = new
QueueLevelConsumerOrderInfoLockManager(brokerController);
}
public ConcurrentHashMap<String, ConcurrentHashMap<Integer, OrderInfo>>
getTable() {
@@ -74,8 +77,8 @@ public class ConsumerOrderInfoManager extends ConfigManager {
}
private void updateLockFreeTimestamp(String topic, String group, int
queueId, OrderInfo orderInfo) {
- if (consumerOrderInfoLockManager != null) {
- consumerOrderInfoLockManager.updateLockFreeTimestamp(topic, group,
queueId, orderInfo);
+ if (queueLevelConsumerOrderInfoLockManager != null) {
+
queueLevelConsumerOrderInfoLockManager.updateLockFreeTimestamp(topic, group,
queueId, orderInfo);
}
}
@@ -91,7 +94,8 @@ public class ConsumerOrderInfoManager extends ConfigManager {
* @param msgQueueOffsetList the queue offsets of messages
* @param orderInfoBuilder will append order info to this builder
*/
- public void update(String attemptId, boolean isRetry, String topic, String
group, int queueId, long popTime, long invisibleTime,
+ public void update(String attemptId, boolean isRetry, String topic, String
group, int queueId, long popTime,
+ long invisibleTime,
List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -140,6 +144,14 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
updateLockFreeTimestamp(topic, group, queueId, orderInfo);
}
+ @Override
+ public void update(String attemptId, boolean isRetry, String topic, String
group, int queueId, long popTime,
+ long invisibleTime,
+ List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder,
GetMessageResult getMessageResult) {
+ update(attemptId, isRetry, topic, group, queueId, popTime,
invisibleTime, msgQueueOffsetList, orderInfoBuilder);
+ }
+
+ @Override
public boolean checkBlock(String attemptId, String topic, String group,
int queueId, long invisibleTime) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -159,6 +171,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
return orderInfo.needBlock(attemptId, invisibleTime);
}
+ @Override
public void clearBlock(String topic, String group, int queueId) {
table.computeIfPresent(buildKey(topic, group), (key, val) -> {
val.remove(queueId);
@@ -166,6 +179,15 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
});
}
+ @Override
+ public OrderedConsumptionLevel getOrderedConsumptionLevel() {
+ return OrderedConsumptionLevel.QUEUE;
+ }
+
+ @Override
+ public void start() {
+ }
+
/**
* mark message is consumed finished. return the consumer offset
*
@@ -175,6 +197,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
* @param queueOffset queue offset of message
* @return -1 : illegal, -2 : no need commit, >= 0 : commit
*/
+ @Override
public long commitAndNext(String topic, String group, int queueId, long
queueOffset, long popTime) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -234,7 +257,9 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
* @param queueOffset queue offset of message
* @param nextVisibleTime nex visible time
*/
- public void updateNextVisibleTime(String topic, String group, int queueId,
long queueOffset, long popTime, long nextVisibleTime) {
+ @Override
+ public void updateNextVisibleTime(String topic, String group, int queueId,
long queueOffset, long popTime,
+ long nextVisibleTime) {
String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
@@ -256,6 +281,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
updateLockFreeTimestamp(topic, group, queueId, orderInfo);
}
+ @VisibleForTesting
protected void autoClean() {
if (brokerController == null) {
return;
@@ -328,11 +354,11 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
@Override
public void decode(String jsonString) {
if (jsonString != null) {
- ConsumerOrderInfoManager obj =
RemotingSerializable.fromJson(jsonString, ConsumerOrderInfoManager.class);
+ QueueLevelConsumerManager obj =
RemotingSerializable.fromJson(jsonString, QueueLevelConsumerManager.class);
if (obj != null) {
this.table = obj.table;
- if (this.consumerOrderInfoLockManager != null) {
- this.consumerOrderInfoLockManager.recover(this.table);
+ if (this.queueLevelConsumerOrderInfoLockManager != null) {
+
this.queueLevelConsumerOrderInfoLockManager.recover(this.table);
}
}
}
@@ -345,14 +371,20 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
}
public void shutdown() {
- if (this.consumerOrderInfoLockManager != null) {
- this.consumerOrderInfoLockManager.shutdown();
+ if (this.queueLevelConsumerOrderInfoLockManager != null) {
+ this.queueLevelConsumerOrderInfoLockManager.shutdown();
}
}
+ @Override
+ public CompletableFuture<GetMessageResult>
getAvailableMessageResult(String attemptId, long popTime, long invisibleTime,
+ String groupId, String topicId, int queueId, int batchSize,
StringBuilder orderCountInfoBuilder) {
+ return CompletableFuture.completedFuture(null);
+ }
+
@VisibleForTesting
- protected ConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
- return consumerOrderInfoLockManager;
+ QueueLevelConsumerOrderInfoLockManager getConsumerOrderInfoLockManager() {
+ return queueLevelConsumerOrderInfoLockManager;
}
public static class OrderInfo {
@@ -397,7 +429,8 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
public OrderInfo() {
}
- public OrderInfo(String attemptId, long popTime, long invisibleTime,
List<Long> queueOffsetList, long lastConsumeTimestamp,
+ public OrderInfo(String attemptId, long popTime, long invisibleTime,
List<Long> queueOffsetList,
+ long lastConsumeTimestamp,
long commitOffsetBit) {
this.popTime = popTime;
this.invisibleTime = invisibleTime;
@@ -600,7 +633,8 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
* @param prevOffsetConsumedCount the offset list of message
*/
@JSONField(serialize = false, deserialize = false)
- public void mergeOffsetConsumedCount(String preAttemptId, List<Long>
preOffsetList, Map<Long, Integer> prevOffsetConsumedCount) {
+ public void mergeOffsetConsumedCount(String preAttemptId, List<Long>
preOffsetList,
+ Map<Long, Integer> prevOffsetConsumedCount) {
Map<Long, Integer> offsetConsumedCount = new HashMap<>();
if (prevOffsetConsumedCount == null) {
prevOffsetConsumedCount = new HashMap<>();
@@ -641,4 +675,4 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
.toString();
}
}
-}
+}
\ No newline at end of file
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoLockManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java
similarity index 88%
rename from
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoLockManager.java
rename to
broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java
index 37b3eed230..d65b01d89c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoLockManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/orderly/QueueLevelConsumerOrderInfoLockManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -32,14 +32,16 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-public class ConsumerOrderInfoLockManager {
+public class QueueLevelConsumerOrderInfoLockManager {
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+ private ConsumerOrderInfoManager consumerOrderInfoManager;
+
private final BrokerController brokerController;
private final Map<Key, Timeout> timeoutMap = new ConcurrentHashMap<>();
private final Timer timer;
private static final int TIMER_TICK_MS = 100;
- public ConsumerOrderInfoLockManager(BrokerController brokerController) {
+ public QueueLevelConsumerOrderInfoLockManager(BrokerController
brokerController) {
this.brokerController = brokerController;
this.timer = new HashedWheelTimer(
new ThreadFactoryImpl("ConsumerOrderInfoLockManager_"),
@@ -47,22 +49,22 @@ public class ConsumerOrderInfoLockManager {
}
/**
- * when ConsumerOrderInfoManager load from disk, recover data
+ * when QueueLevelConsumerManager load from disk, recover data
*/
- public void recover(Map<String/* topic@group*/,
ConcurrentHashMap<Integer/*queueId*/, ConsumerOrderInfoManager.OrderInfo>>
table) {
+ public void recover(Map<String/* topic@group*/,
ConcurrentHashMap<Integer/*queueId*/, QueueLevelConsumerManager.OrderInfo>>
table) {
if
(!this.brokerController.getBrokerConfig().isEnableNotifyAfterPopOrderLockRelease())
{
return;
}
- for (Map.Entry<String, ConcurrentHashMap<Integer,
ConsumerOrderInfoManager.OrderInfo>> entry : table.entrySet()) {
+ for (Map.Entry<String, ConcurrentHashMap<Integer,
QueueLevelConsumerManager.OrderInfo>> entry : table.entrySet()) {
String topicAtGroup = entry.getKey();
- ConcurrentHashMap<Integer/*queueId*/,
ConsumerOrderInfoManager.OrderInfo> qs = entry.getValue();
- String[] arrays = ConsumerOrderInfoManager.decodeKey(topicAtGroup);
+ ConcurrentHashMap<Integer/*queueId*/,
QueueLevelConsumerManager.OrderInfo> qs = entry.getValue();
+ String[] arrays =
QueueLevelConsumerManager.decodeKey(topicAtGroup);
if (arrays.length != 2) {
continue;
}
String topic = arrays[0];
String group = arrays[1];
- for (Map.Entry<Integer, ConsumerOrderInfoManager.OrderInfo>
qsEntry : qs.entrySet()) {
+ for (Map.Entry<Integer, QueueLevelConsumerManager.OrderInfo>
qsEntry : qs.entrySet()) {
Long lockFreeTimestamp =
qsEntry.getValue().getLockFreeTimestamp();
if (lockFreeTimestamp == null || lockFreeTimestamp <=
System.currentTimeMillis()) {
continue;
@@ -72,7 +74,7 @@ public class ConsumerOrderInfoLockManager {
}
}
- public void updateLockFreeTimestamp(String topic, String group, int
queueId, ConsumerOrderInfoManager.OrderInfo orderInfo) {
+ public void updateLockFreeTimestamp(String topic, String group, int
queueId, QueueLevelConsumerManager.OrderInfo orderInfo) {
this.updateLockFreeTimestamp(topic, group, queueId,
orderInfo.getLockFreeTimestamp());
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 493dbaa9a6..864f40d296 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.util.BitSet;
import java.nio.charset.StandardCharsets;
+import java.util.BitSet;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index e8e2a90995..f104e76a52 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -19,13 +19,13 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -37,11 +37,11 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
-import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
import
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.store.PutMessageStatus;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6e0d235f00..9967955656 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -44,7 +44,6 @@ import org.apache.rocketmq.broker.longpolling.PollingHeader;
import org.apache.rocketmq.broker.longpolling.PollingResult;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.broker.longpolling.PopRequest;
-
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.pop.PopConsumerContext;
import org.apache.rocketmq.common.BrokerConfig;
@@ -806,7 +805,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(),
isRetry, topic,
requestHeader.getConsumerGroup(),
queueId, popTime,
requestHeader.getInvisibleTime(), result.getMessageQueueOffset(),
- orderCountInfo);
+ orderCountInfo, result);
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic, queueId,
finalOffset);
} else {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 9c23a8625e..db5f60fb17 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
+import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
@@ -233,7 +233,7 @@ public class PopConsumerServiceTest {
// fifo block
PopConsumerContext context = new PopConsumerContext(
clientHost, System.currentTimeMillis(), 20000, groupId, false,
ConsumeInitMode.MIN, attemptId);
- consumerService.setFifoBlocked(context, groupId, topicId, queueId,
Collections.singletonList(100L));
+ consumerService.setFifoBlocked(context, groupId, topicId, queueId,
Collections.singletonList(100L), resetGetMessageResult);
Mockito.when(brokerController.getConsumerOrderInfoManager()
.checkBlock(anyString(), anyString(), anyString(), anyInt(),
anyLong())).thenReturn(true);
Assert.assertTrue(consumerService.isFifoBlocked(context, groupId,
topicId, queueId));
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerLockFreeNotifyTest.java
similarity index 95%
rename from
broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
rename to
broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerLockFreeNotifyTest.java
index 1fdf454d5e..d3c0df987c 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerLockFreeNotifyTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,7 +43,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
private static final int QUEUE_ID_0 = 0;
private long popTime;
- private ConsumerOrderInfoManager consumerOrderInfoManager;
+ private QueueLevelConsumerManager consumerOrderInfoManager;
private AtomicBoolean notified;
private final BrokerConfig brokerConfig = new BrokerConfig();
@@ -61,7 +61,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
return null;
}).when(popMessageProcessor).notifyLongPollingRequestIfNeed(anyString(),
anyString(), anyInt());
- consumerOrderInfoManager = new
ConsumerOrderInfoManager(brokerController);
+ consumerOrderInfoManager = new
QueueLevelConsumerManager(brokerController);
popTime = System.currentTimeMillis();
}
@@ -158,7 +158,7 @@ public class ConsumerOrderInfoManagerLockFreeNotifyTest {
@Test
public void testRecover() {
- ConsumerOrderInfoManager savedConsumerOrderInfoManager = new
ConsumerOrderInfoManager();
+ QueueLevelConsumerManager savedConsumerOrderInfoManager = new
QueueLevelConsumerManager();
savedConsumerOrderInfoManager.update(
null,
false,
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerTest.java
similarity index 96%
rename from
broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
rename to
broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerTest.java
index 4414eda54e..7ab3c921ed 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/pop/orderly/ConsumerOrderInfoManagerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.pop.orderly;
import java.time.Duration;
import java.util.Map;
@@ -51,11 +51,11 @@ public class ConsumerOrderInfoManagerTest {
private static final int QUEUE_ID_1 = 1;
private long popTime;
- private ConsumerOrderInfoManager consumerOrderInfoManager;
+ private QueueLevelConsumerManager consumerOrderInfoManager;
@Before
public void before() {
- consumerOrderInfoManager = new ConsumerOrderInfoManager();
+ consumerOrderInfoManager = new QueueLevelConsumerManager();
popTime = System.currentTimeMillis();
}
@@ -387,7 +387,7 @@ public class ConsumerOrderInfoManagerTest {
TopicConfig topicConfig = new TopicConfig(TOPIC);
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);
- ConsumerOrderInfoManager consumerOrderInfoManager = new
ConsumerOrderInfoManager(brokerController);
+ QueueLevelConsumerManager consumerOrderInfoManager = new
QueueLevelConsumerManager(brokerController);
{
consumerOrderInfoManager.update(null, false,
@@ -444,7 +444,7 @@ public class ConsumerOrderInfoManagerTest {
consumerOrderInfoManager.autoClean();
assertEquals(1, consumerOrderInfoManager.getTable().size());
- for (ConcurrentHashMap<Integer,
ConsumerOrderInfoManager.OrderInfo> orderInfoMap :
consumerOrderInfoManager.getTable().values()) {
+ for (ConcurrentHashMap<Integer,
QueueLevelConsumerManager.OrderInfo> orderInfoMap :
consumerOrderInfoManager.getTable().values()) {
assertEquals(1, orderInfoMap.size());
assertNotNull(orderInfoMap.get(QUEUE_ID_0));
break;
@@ -453,13 +453,13 @@ public class ConsumerOrderInfoManagerTest {
}
private void assertEncodeAndDecode() {
- ConsumerOrderInfoManager.OrderInfo prevOrderInfo =
consumerOrderInfoManager.getTable().values().stream().findFirst()
+ QueueLevelConsumerManager.OrderInfo prevOrderInfo =
consumerOrderInfoManager.getTable().values().stream().findFirst()
.get().get(QUEUE_ID_0);
String dataEncoded = consumerOrderInfoManager.encode();
consumerOrderInfoManager.decode(dataEncoded);
- ConsumerOrderInfoManager.OrderInfo newOrderInfo =
consumerOrderInfoManager.getTable().values().stream().findFirst()
+ QueueLevelConsumerManager.OrderInfo newOrderInfo =
consumerOrderInfoManager.getTable().values().stream().findFirst()
.get().get(QUEUE_ID_0);
assertNotSame(prevOrderInfo, newOrderInfo);
@@ -482,7 +482,7 @@ public class ConsumerOrderInfoManagerTest {
1,
Lists.newArrayList(2L, 3L, 4L),
new StringBuilder());
- ConsumerOrderInfoManager.OrderInfo orderInfo =
consumerOrderInfoManager.getTable().values().stream().findFirst()
+ QueueLevelConsumerManager.OrderInfo orderInfo =
consumerOrderInfoManager.getTable().values().stream().findFirst()
.get().get(QUEUE_ID_0);
orderInfo.setInvisibleTime(null);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/OrderedConsumptionLevel.java
b/common/src/main/java/org/apache/rocketmq/common/OrderedConsumptionLevel.java
new file mode 100644
index 0000000000..bdf972abda
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/OrderedConsumptionLevel.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+/** Ordered consumption level; every constant is bound to a fixed integer code. */
+public enum OrderedConsumptionLevel {
+    QUEUE(0),
+    SHARDING_KEY(1);
+
+    private final int value;
+
+    OrderedConsumptionLevel(int value) {
+        this.value = value;
+    }
+
+    /** Returns the integer code bound to this constant. */
+    public int getValue() {
+        return value;
+    }
+
+    /** Decodes an integer code: SHARDING_KEY for its own code (1), QUEUE for every other input. */
+    public static OrderedConsumptionLevel valueOf(int value) {
+        return value == SHARDING_KEY.value ? SHARDING_KEY : QUEUE;
+    }
+}