This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 03ffc45a8 [ISSUE #5364] Support changeInvisibleTime for pop orderly
(#5367)
03ffc45a8 is described below
commit 03ffc45a8b93844b692b7a834cf28b922015de3a
Author: lk <[email protected]>
AuthorDate: Mon Oct 24 10:44:06 2022 +0800
[ISSUE #5364] Support changeInvisibleTime for pop orderly (#5367)
---
.../broker/offset/ConsumerOrderInfoManager.java | 466 ++++++++++++-------
.../broker/processor/AckMessageProcessor.java | 3 +-
.../processor/ChangeInvisibleTimeProcessor.java | 33 ++
.../broker/processor/PopMessageProcessor.java | 18 +-
.../offset/ConsumerOrderInfoManagerTest.java | 500 +++++++++++++++++++++
.../rocketmq/client/impl/MQClientAPIImpl.java | 16 +-
.../common/protocol/header/ExtraInfoUtil.java | 33 +-
.../common/protocol/header/ExtraInfoUtilTest.java | 46 ++
.../rocketmq/test/client/rmq/RMQPopClient.java | 163 +++++++
.../rocketmq/test/factory/ConsumerFactory.java | 8 +
.../org/apache/rocketmq/test/base/BaseConf.java | 3 +
.../rocketmq/test/client/consumer/pop/BasePop.java | 42 ++
.../test/client/consumer/pop/BasePopOrderly.java | 89 ++++
.../pop/ChangeInvisibleTimeMidMsgOrderlyIT.java | 102 +++++
14 files changed, 1359 insertions(+), 163 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 68c767fd4..894a6c373 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -17,16 +17,21 @@
package org.apache.rocketmq.broker.offset;
import com.alibaba.fastjson.annotation.JSONField;
+import com.google.common.base.MoreObjects;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -57,16 +62,29 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
this.table = table;
}
+ protected static String buildKey(String topic, String group) {
+ return topic + TOPIC_GROUP_SEPARATOR + group;
+ }
+
+ protected static String[] decodeKey(String key) {
+ return key.split(TOPIC_GROUP_SEPARATOR);
+ }
+
/**
- * not thread safe.
+ * update the message list received
*
- * @param topic
- * @param group
- * @param queueId
- * @param msgOffsetList
+ * @param isRetry is retry topic or not
+ * @param topic topic
+ * @param group group
+ * @param queueId queue id of message
+ * @param popTime the time of pop message
+ * @param invisibleTime invisible time
+ * @param msgQueueOffsetList the queue offsets of messages
+ * @param orderInfoBuilder will append order info to this builder
*/
- public int update(String topic, String group, int queueId, List<Long>
msgOffsetList) {
- String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ public void update(boolean isRetry, String topic, String group, int
queueId, long popTime, long invisibleTime,
+ List<Long> msgQueueOffsetList, StringBuilder orderInfoBuilder) {
+ String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
if (qs == null) {
qs = new ConcurrentHashMap<>(16);
@@ -78,33 +96,42 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
OrderInfo orderInfo = qs.get(queueId);
- // start is same.
- List<Long> simple = OrderInfo.simpleO(msgOffsetList);
- if (orderInfo != null &&
simple.get(0).equals(orderInfo.getOffsetList().get(0))) {
- if (simple.equals(orderInfo.getOffsetList())) {
- orderInfo.setConsumedCount(orderInfo.getConsumedCount() + 1);
- } else {
- // reset, because msgs are changed.
- orderInfo.setConsumedCount(0);
- }
- orderInfo.setLastConsumeTimestamp(System.currentTimeMillis());
- orderInfo.setOffsetList(simple);
- orderInfo.setCommitOffsetBit(0);
+ if (orderInfo != null) {
+ OrderInfo newOrderInfo = new OrderInfo(popTime, invisibleTime,
msgQueueOffsetList, System.currentTimeMillis(), 0);
+ newOrderInfo.mergeOffsetConsumedCount(orderInfo.offsetList,
orderInfo.offsetConsumedCount);
+
+ orderInfo = newOrderInfo;
} else {
- orderInfo = new OrderInfo();
- orderInfo.setOffsetList(simple);
- orderInfo.setLastConsumeTimestamp(System.currentTimeMillis());
- orderInfo.setConsumedCount(0);
- orderInfo.setCommitOffsetBit(0);
+ orderInfo = new OrderInfo(popTime, invisibleTime,
msgQueueOffsetList, System.currentTimeMillis(), 0);
+ }
+ qs.put(queueId, orderInfo);
+
+ Map<Long, Integer> offsetConsumedCount = orderInfo.offsetConsumedCount;
+ int minConsumedTimes = Integer.MAX_VALUE;
+ if (offsetConsumedCount != null) {
+ Set<Long> offsetSet = offsetConsumedCount.keySet();
+ for (Long offset : offsetSet) {
+ Integer consumedTimes =
offsetConsumedCount.getOrDefault(offset, 0);
+ ExtraInfoUtil.buildQueueOffsetOrderCountInfo(orderInfoBuilder,
isRetry, queueId, offset, consumedTimes);
+ minConsumedTimes = Math.min(minConsumedTimes, consumedTimes);
+ }
- qs.put(queueId, orderInfo);
+ if (offsetConsumedCount.size() != orderInfo.offsetList.size()) {
+ // offsetConsumedCount only saves messages whose consumed count
is greater than 0
+ // if the sizes are not equal, it means there are some new messages
+ minConsumedTimes = 0;
+ }
+ } else {
+ minConsumedTimes = 0;
}
- return orderInfo.getConsumedCount();
+ // for compatibility
+ // the old pop sdk use queueId to get consumedTimes from orderCountInfo
+ ExtraInfoUtil.buildQueueIdOrderCountInfo(orderInfoBuilder, isRetry,
queueId, minConsumedTimes);
}
public boolean checkBlock(String topic, String group, int queueId, long
invisibleTime) {
- String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
if (qs == null) {
qs = new ConcurrentHashMap<>(16);
@@ -119,84 +146,98 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
if (orderInfo == null) {
return false;
}
-
- boolean isBlock = System.currentTimeMillis() -
orderInfo.getLastConsumeTimestamp() < invisibleTime;
-
- return isBlock && !orderInfo.isDone();
+ return orderInfo.needBlock(invisibleTime);
}
/**
- * @param topic
- * @param group
- * @param queueId
- * @param offset
+ * mark the message as consumed and return the consumer offset
+ *
+ * @param topic topic
+ * @param group group
+ * @param queueId queue id of message
+ * @param queueOffset queue offset of message
* @return -1 : illegal, -2 : no need commit, >= 0 : commit
*/
- public long commitAndNext(String topic, String group, int queueId, long
offset) {
- String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ public long commitAndNext(String topic, String group, int queueId, long
queueOffset, long popTime) {
+ String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
if (qs == null) {
- return offset + 1;
+ return queueOffset + 1;
}
OrderInfo orderInfo = qs.get(queueId);
if (orderInfo == null) {
- log.warn("OrderInfo is null, {}, {}, {}", key, offset, orderInfo);
- return offset + 1;
+ log.warn("OrderInfo is null, {}, {}, {}", key, queueOffset,
orderInfo);
+ return queueOffset + 1;
}
- List<Long> offsetList = orderInfo.getOffsetList();
- if (offsetList == null || offsetList.isEmpty()) {
- log.warn("OrderInfo is empty, {}, {}, {}", key, offset, orderInfo);
+ List<Long> o = orderInfo.offsetList;
+ if (o == null || o.isEmpty()) {
+ log.warn("OrderInfo is empty, {}, {}, {}", key, queueOffset,
orderInfo);
return -1;
}
- Long first = offsetList.get(0);
- int i = 0, size = offsetList.size();
+
+ if (popTime != orderInfo.popTime) {
+ log.warn("popTime is not equal to orderInfo saved. key: {},
offset: {}, orderInfo: {}, popTime: {}", key, queueOffset, orderInfo, popTime);
+ return -2;
+ }
+
+ Long first = o.get(0);
+ int i = 0, size = o.size();
for (; i < size; i++) {
long temp;
if (i == 0) {
temp = first;
} else {
- temp = first + offsetList.get(i);
+ temp = first + o.get(i);
}
- if (offset == temp) {
+ if (queueOffset == temp) {
break;
}
}
// not found
if (i >= size) {
- log.warn("OrderInfo not found commit offset, {}, {}, {}", key,
offset, orderInfo);
+ log.warn("OrderInfo not found commit offset, {}, {}, {}", key,
queueOffset, orderInfo);
return -1;
}
//set bit
- orderInfo.setCommitOffsetBit(orderInfo.getCommitOffsetBit() | (1L <<
i));
- if (orderInfo.isDone()) {
- if (size == 1) {
- return offsetList.get(0) + 1;
- } else {
- return offsetList.get(size - 1) + first + 1;
- }
- }
- return -2;
+ orderInfo.setCommitOffsetBit(orderInfo.commitOffsetBit | (1L << i));
+ long nextOffset = orderInfo.getNextOffset();
+
+ return nextOffset;
}
- public OrderInfo get(String topic, String group, int queueId) {
- String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ /**
+ * update next visible time of this message
+ *
+ * @param topic topic
+ * @param group group
+ * @param queueId queue id of message
+ * @param queueOffset queue offset of message
+ * @param nextVisibleTime next visible time
+ */
+ public void updateNextVisibleTime(String topic, String group, int queueId,
long queueOffset, long popTime, long nextVisibleTime) {
+ String key = buildKey(topic, group);
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs = table.get(key);
if (qs == null) {
- return null;
+ log.warn("orderInfo of queueId is null. key: {}, queueOffset: {},
queueId: {}", key, queueOffset, queueId);
+ return;
+ }
+ OrderInfo orderInfo = qs.get(queueId);
+ if (orderInfo == null) {
+ log.warn("orderInfo is null, key: {}, queueOffset: {}, queueId:
{}", key, queueOffset, queueId);
+ return;
+ }
+ if (popTime != orderInfo.popTime) {
+ log.warn("popTime is not equal to orderInfo saved. key: {},
queueOffset: {}, orderInfo: {}, popTime: {}", key, queueOffset, orderInfo,
popTime);
+ return;
}
- return qs.get(queueId);
- }
-
- public int getConsumeCount(String topic, String group, int queueId) {
- OrderInfo orderInfo = get(topic, group, queueId);
- return orderInfo == null ? 0 : orderInfo.getConsumedCount();
+ orderInfo.updateOffsetNextVisibleTime(queueOffset, nextVisibleTime);
}
- private void autoClean() {
+ protected void autoClean() {
if (brokerController == null) {
return;
}
@@ -207,7 +248,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
iterator.next();
String topicAtGroup = entry.getKey();
ConcurrentHashMap<Integer/*queueId*/, OrderInfo> qs =
entry.getValue();
- String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+ String[] arrays = decodeKey(topicAtGroup);
if (arrays.length != 2) {
continue;
}
@@ -246,7 +287,6 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
if (System.currentTimeMillis() -
qsEntry.getValue().getLastConsumeTimestamp() > CLEAN_SPAN_FROM_LAST) {
qsIterator.remove();
log.info("Not consume long time, Clean order info, {}:{},
{}", topicAtGroup, entry.getValue(), topicConfig);
- continue;
}
}
}
@@ -279,58 +319,58 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
@Override
public String encode(boolean prettyFormat) {
this.autoClean();
-
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("{\n").append("\t\"table\":{");
- Iterator<Map.Entry<String/* topic@group*/,
ConcurrentHashMap<Integer/*queueId*/, OrderInfo>>> iterator =
- this.table.entrySet().iterator();
- int count1 = 0;
- while (iterator.hasNext()) {
- Map.Entry<String/* topic@group*/,
ConcurrentHashMap<Integer/*queueId*/, OrderInfo>> entry =
- iterator.next();
- if (count1 > 0) {
- stringBuilder.append(",");
- }
-
stringBuilder.append("\n\t\t\"").append(entry.getKey()).append("\":{");
- Iterator<Map.Entry<Integer/*queueId*/, OrderInfo>> qsIterator =
entry.getValue().entrySet().iterator();
- int count2 = 0;
- while (qsIterator.hasNext()) {
- Map.Entry<Integer/*queueId*/, OrderInfo> qsEntry =
qsIterator.next();
- if (count2 > 0) {
- stringBuilder.append(",");
- }
-
stringBuilder.append("\n\t\t\t").append(qsEntry.getKey()).append(":")
- .append(qsEntry.getValue().encode());
- count2++;
- }
- stringBuilder.append("\n\t\t}");
- count1++;
- }
- stringBuilder.append("\n\t}").append("\n}");
- return stringBuilder.toString();
+ return RemotingSerializable.toJson(this, prettyFormat);
}
public static class OrderInfo {
+ private long popTime;
+ /**
+ * the invisibleTime when pop message
+ */
+ @JSONField(name = "i")
+ private Long invisibleTime;
/**
* offset
+ * offsetList[0] is the queue offset of message
+ * offsetList[i] (i > 0) is the distance between current message and
offsetList[0]
*/
+ @JSONField(name = "o")
private List<Long> offsetList;
/**
- * consumed count
+ * next visible timestamp for message
+ * key: message queue offset
+ */
+ @JSONField(name = "ot")
+ private Map<Long, Long> offsetNextVisibleTime;
+ /**
+ * message consumed count for offset
+ * key: message queue offset
*/
- private int consumedCount;
+ @JSONField(name = "oc")
+ private Map<Long, Integer> offsetConsumedCount;
/**
* last consume timestamp
*/
+ @JSONField(name = "l")
private long lastConsumeTimestamp;
/**
* commit offset bit
*/
+ @JSONField(name = "cm")
private long commitOffsetBit;
public OrderInfo() {
}
+ public OrderInfo(long popTime, long invisibleTime, List<Long>
queueOffsetList, long lastConsumeTimestamp,
+ long commitOffsetBit) {
+ this.popTime = popTime;
+ this.invisibleTime = invisibleTime;
+ this.offsetList = buildOffsetList(queueOffsetList);
+ this.lastConsumeTimestamp = lastConsumeTimestamp;
+ this.commitOffsetBit = commitOffsetBit;
+ }
+
public List<Long> getOffsetList() {
return offsetList;
}
@@ -339,28 +379,6 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
this.offsetList = offsetList;
}
- public static List<Long> simpleO(List<Long> offsetList) {
- List<Long> simple = new ArrayList<>();
- if (offsetList.size() == 1) {
- simple.addAll(offsetList);
- return simple;
- }
- Long first = offsetList.get(0);
- simple.add(first);
- for (int i = 1; i < offsetList.size(); i++) {
- simple.add(offsetList.get(i) - first);
- }
- return simple;
- }
-
- public int getConsumedCount() {
- return consumedCount;
- }
-
- public void setConsumedCount(int consumedCount) {
- this.consumedCount = consumedCount;
- }
-
public long getLastConsumeTimestamp() {
return lastConsumeTimestamp;
}
@@ -377,50 +395,198 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
this.commitOffsetBit = commitOffsetBit;
}
+ public long getPopTime() {
+ return popTime;
+ }
+
+ public void setPopTime(long popTime) {
+ this.popTime = popTime;
+ }
+
+ public Long getInvisibleTime() {
+ return invisibleTime;
+ }
+
+ public void setInvisibleTime(Long invisibleTime) {
+ this.invisibleTime = invisibleTime;
+ }
+
+ public Map<Long, Long> getOffsetNextVisibleTime() {
+ return offsetNextVisibleTime;
+ }
+
+ public void setOffsetNextVisibleTime(Map<Long, Long>
offsetNextVisibleTime) {
+ this.offsetNextVisibleTime = offsetNextVisibleTime;
+ }
+
+ public Map<Long, Integer> getOffsetConsumedCount() {
+ return offsetConsumedCount;
+ }
+
+ public void setOffsetConsumedCount(Map<Long, Integer>
offsetConsumedCount) {
+ this.offsetConsumedCount = offsetConsumedCount;
+ }
+
+ public static List<Long> buildOffsetList(List<Long> queueOffsetList) {
+ List<Long> simple = new ArrayList<>();
+ if (queueOffsetList.size() == 1) {
+ simple.addAll(queueOffsetList);
+ return simple;
+ }
+ Long first = queueOffsetList.get(0);
+ simple.add(first);
+ for (int i = 1; i < queueOffsetList.size(); i++) {
+ simple.add(queueOffsetList.get(i) - first);
+ }
+ return simple;
+ }
+
+ @JSONField(serialize = false, deserialize = false)
+ public boolean needBlock(long currentInvisibleTime) {
+ if (offsetList == null || offsetList.isEmpty()) {
+ return false;
+ }
+ int num = offsetList.size();
+ int i = 0;
+ if (this.invisibleTime == null || this.invisibleTime <= 0) {
+ this.invisibleTime = currentInvisibleTime;
+ }
+ long currentTime = System.currentTimeMillis();
+ for (; i < num; i++) {
+ if (isNotAck(i)) {
+ long nextVisibleTime = popTime + invisibleTime;
+ if (offsetNextVisibleTime != null) {
+ Long time =
offsetNextVisibleTime.get(this.getQueueOffset(i));
+ if (time != null) {
+ nextVisibleTime = time;
+ }
+ }
+ if (currentTime < nextVisibleTime) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
@JSONField(serialize = false, deserialize = false)
- public boolean isDone() {
+ public Long getLockFreeTimestamp() {
if (offsetList == null || offsetList.isEmpty()) {
- return true;
+ return null;
}
int num = offsetList.size();
- for (byte i = 0; i < num; i++) {
- if ((commitOffsetBit & (1L << i)) == 0) {
- return false;
+ int i = 0;
+ long currentTime = System.currentTimeMillis();
+ for (; i < num; i++) {
+ if (isNotAck(i)) {
+ if (invisibleTime == null || invisibleTime <= 0) {
+ return null;
+ }
+ long nextVisibleTime = popTime + invisibleTime;
+ if (offsetNextVisibleTime != null) {
+ Long time =
offsetNextVisibleTime.get(this.getQueueOffset(i));
+ if (time != null) {
+ nextVisibleTime = time;
+ }
+ }
+ if (currentTime < nextVisibleTime) {
+ return nextVisibleTime;
+ }
}
}
- return true;
+ return currentTime;
+ }
+
+ @JSONField(serialize = false, deserialize = false)
+ public void updateOffsetNextVisibleTime(long queueOffset, long
nextVisibleTime) {
+ if (this.offsetNextVisibleTime == null) {
+ this.offsetNextVisibleTime = new HashMap<>();
+ }
+ this.offsetNextVisibleTime.put(queueOffset, nextVisibleTime);
}
@JSONField(serialize = false, deserialize = false)
- public String encode() {
- StringBuilder sb = new StringBuilder();
- sb.append("{").append("\"c\":").append(getConsumedCount());
- sb.append(",").append("\"cm\":").append(getCommitOffsetBit());
- sb.append(",").append("\"l\":").append(getLastConsumeTimestamp());
- sb.append(",").append("\"o\":[");
- if (getOffsetList() != null) {
- for (int i = 0; i < getOffsetList().size(); i++) {
- sb.append(getOffsetList().get(i));
- if (i < getOffsetList().size() - 1) {
- sb.append(",");
+ public long getNextOffset() {
+ if (offsetList == null || offsetList.isEmpty()) {
+ return -2;
+ }
+ int num = offsetList.size();
+ int i = 0;
+ for (; i < num; i++) {
+ if (isNotAck(i)) {
+ break;
+ }
+ }
+ if (i == num) {
+ // all ack
+ return getQueueOffset(num - 1) + 1;
+ }
+ return getQueueOffset(i);
+ }
+
+ /**
+ * convert the offset at the index of offsetList to queue offset
+ *
+ * @param offsetIndex the index of offsetList
+ * @return queue offset of message
+ */
+ @JSONField(serialize = false, deserialize = false)
+ public long getQueueOffset(int offsetIndex) {
+ return getQueueOffset(this.offsetList, offsetIndex);
+ }
+
+ protected static long getQueueOffset(List<Long> offsetList, int
offsetIndex) {
+ if (offsetIndex == 0) {
+ return offsetList.get(0);
+ }
+ return offsetList.get(0) + offsetList.get(offsetIndex);
+ }
+
+ @JSONField(serialize = false, deserialize = false)
+ public boolean isNotAck(int offsetIndex) {
+ return (commitOffsetBit & (1L << offsetIndex)) == 0;
+ }
+
+ /**
+ * calculate message consumed count of each message, and put nonzero
value into offsetConsumedCount
+ *
+ * @param prevOffsetConsumedCount the consumed count per queue offset of the previous order info
+ */
+ @JSONField(serialize = false, deserialize = false)
+ public void mergeOffsetConsumedCount(List<Long> preOffsetList,
Map<Long, Integer> prevOffsetConsumedCount) {
+ Map<Long, Integer> offsetConsumedCount = new HashMap<>();
+ if (prevOffsetConsumedCount == null) {
+ prevOffsetConsumedCount = new HashMap<>();
+ }
+ Set<Long> preQueueOffsetSet = new HashSet<>();
+ for (int i = 0; i < preOffsetList.size(); i++) {
+ preQueueOffsetSet.add(getQueueOffset(preOffsetList, i));
+ }
+ for (int i = 0; i < offsetList.size(); i++) {
+ long queueOffset = this.getQueueOffset(i);
+ if (preQueueOffsetSet.contains(queueOffset)) {
+ int count = 1;
+ Integer preCount =
prevOffsetConsumedCount.get(queueOffset);
+ if (preCount != null) {
+ count = preCount + 1;
}
+ offsetConsumedCount.put(queueOffset, count);
}
}
- sb.append("]").append("}");
- return sb.toString();
+ this.offsetConsumedCount = offsetConsumedCount;
}
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("OrderInfo");
- sb.append("@").append(this.hashCode());
- sb.append("{offsetList=").append(offsetList);
- sb.append(", consumedCount=").append(consumedCount);
- sb.append(", lastConsumeTimestamp=").append(lastConsumeTimestamp);
- sb.append(", commitOffsetBit=").append(commitOffsetBit);
- sb.append(", isDone=").append(isDone());
- sb.append('}');
- return sb.toString();
+ return MoreObjects.toStringHelper(this)
+ .add("popTime", popTime)
+ .add("invisibleTime", invisibleTime)
+ .add("offsetList", offsetList)
+ .add("offsetNextVisibleTime", offsetNextVisibleTime)
+ .add("offsetConsumedCount", offsetConsumedCount)
+ .add("lastConsumeTimestamp", lastConsumeTimestamp)
+ .add("commitOffsetBit", commitOffsetBit)
+ .toString();
}
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 8bd3c613c..9493deab8 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -164,7 +164,8 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
}
long nextOffset =
brokerController.getConsumerOrderInfoManager().commitAndNext(
requestHeader.getTopic(), requestHeader.getConsumerGroup(),
- requestHeader.getQueueId(), requestHeader.getOffset());
+ requestHeader.getQueueId(), requestHeader.getOffset(),
+ ExtraInfoUtil.getPopTime(extraInfo));
if (nextOffset > -1) {
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 76c1b908e..b1092db23 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -96,6 +96,10 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
+ if (ExtraInfoUtil.isOrder(extraInfo)) {
+ return processChangeInvisibleTimeForOrder(requestHeader,
extraInfo, response, responseHeader);
+ }
+
// add new ck
long now = System.currentTimeMillis();
PutMessageResult ckResult = appendCheckPoint(requestHeader,
ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(),
requestHeader.getOffset(), now, ExtraInfoUtil.getBrokerName(extraInfo));
@@ -123,6 +127,35 @@ public class ChangeInvisibleTimeProcessor implements
NettyRequestProcessor {
return response;
}
+ protected RemotingCommand
processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader
requestHeader, String[] extraInfo, RemotingCommand response,
ChangeInvisibleTimeResponseHeader responseHeader) {
+ long popTime = ExtraInfoUtil.getPopTime(extraInfo);
+ long oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(), requestHeader.getQueueId());
+ if (requestHeader.getOffset() < oldOffset) {
+ return response;
+ }
+ while
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId())) {
+ }
+ try {
+ oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(), requestHeader.getQueueId());
+ if (requestHeader.getOffset() < oldOffset) {
+ return response;
+ }
+
+ long nextVisibleTime = System.currentTimeMillis() +
requestHeader.getInvisibleTime();
+
this.brokerController.getConsumerOrderInfoManager().updateNextVisibleTime(
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(),
requestHeader.getQueueId(), requestHeader.getOffset(), popTime,
nextVisibleTime);
+
+ responseHeader.setInvisibleTime(nextVisibleTime - popTime);
+ responseHeader.setPopTime(popTime);
+ responseHeader.setReviveQid(ExtraInfoUtil.getReviveQid(extraInfo));
+ } finally {
+
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId());
+ }
+ return response;
+ }
+
private void ackOrigin(final ChangeInvisibleTimeRequestHeader
requestHeader, String[] extraInfo) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
AckMsg ackMsg = new AckMsg();
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0d2c5f9b5..df85fc7e9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -522,12 +522,12 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
getMessageTmpResult.getBufferTotalSize());
if (isOrder) {
- int count =
brokerController.getConsumerOrderInfoManager().update(topic,
+
this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic,
requestHeader.getConsumerGroup(),
- queueId, getMessageTmpResult.getMessageQueueOffset());
+ queueId, popTime, requestHeader.getInvisibleTime(),
getMessageTmpResult.getMessageQueueOffset(),
+ orderCountInfo);
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
requestHeader.getConsumerGroup(), topic, queueId,
offset);
- ExtraInfoUtil.buildOrderCountInfo(orderCountInfo, isRetry,
queueId, count);
} else {
appendCheckPoint(requestHeader, topic, reviveQid, queueId,
offset, getMessageTmpResult, popTime,
this.brokerController.getBrokerConfig().getBrokerName());
}
@@ -901,6 +901,14 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
public class QueueLockManager extends ServiceThread {
private ConcurrentHashMap<String, TimedLock> expiredLocalCache = new
ConcurrentHashMap<>(100000);
+ public String buildLockKey(String topic, String consumerGroup, int
queueId) {
+ return topic + PopAckConstants.SPLIT + consumerGroup +
PopAckConstants.SPLIT + queueId;
+ }
+
+ public boolean tryLock(String topic, String consumerGroup, int
queueId) {
+ return tryLock(buildLockKey(topic, consumerGroup, queueId));
+ }
+
public boolean tryLock(String key) {
TimedLock timedLock = expiredLocalCache.get(key);
@@ -946,6 +954,10 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return total;
}
+ public void unLock(String topic, String consumerGroup, int queueId) {
+ unLock(buildLockKey(topic, consumerGroup, queueId));
+ }
+
public void unLock(String key) {
TimedLock timedLock = expiredLocalCache.get(key);
if (timedLock != null) {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
new file mode 100644
index 000000000..b7bb075a4
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.offset;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.assertj.core.util.Lists;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumerOrderInfoManagerTest {
+
+ private static final String TOPIC = "topic";
+ private static final String GROUP = "group";
+ private static final int QUEUE_ID_0 = 0;
+ private static final int QUEUE_ID_1 = 1;
+
+ private long popTime;
+ private ConsumerOrderInfoManager consumerOrderInfoManager;
+
+ /** Gives every test a fresh manager and a pop timestamp taken at set-up time. */
+ @Before
+ public void before() {
+     this.consumerOrderInfoManager = new ConsumerOrderInfoManager();
+     this.popTime = System.currentTimeMillis();
+ }
+
+ /**
+  * Pops offset 1 and checks two acknowledgements: one carrying a pop time that
+  * differs from the recorded one is answered with -2 and leaves the queue
+  * blocked, one carrying the recorded pop time is answered with 2 and leaves
+  * the queue unblocked.
+  */
+ @Test
+ public void testCommitAndNext() {
+     consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+         Lists.newArrayList(1L), new StringBuilder());
+     assertEncodeAndDecode();
+
+     // acknowledgement with a pop time other than the recorded one
+     assertEquals(-2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 1L, popTime - 10));
+     assertEncodeAndDecode();
+     assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, TimeUnit.SECONDS.toMillis(3)));
+
+     // acknowledgement with the recorded pop time
+     assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 1L, popTime));
+     assertEncodeAndDecode();
+     assertFalse(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, TimeUnit.SECONDS.toMillis(3)));
+ }
+
+ /**
+  * Feeds successive pops of queue 0 through {@code update} and checks the order
+  * count info each call writes: the number of parsed entries, the count stored
+  * under the queue-level key and the counts stored under per-offset keys.
+  */
+ @Test
+ public void testConsumedCount() {
+     final String queueKey = ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, QUEUE_ID_0);
+
+     {
+         // consume three new messages
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(1L, 2L, 3L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(1, counts.size());
+         assertEquals(0, counts.get(queueKey).intValue());
+     }
+
+     {
+         // reconsume the same three messages
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(1L, 2L, 3L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(4, counts.size());
+         assertEquals(1, counts.get(queueKey).intValue());
+         for (int offset = 1; offset <= 3; offset++) {
+             String offsetKey = ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, offset);
+             assertEquals(1, counts.get(offsetKey).intValue());
+         }
+     }
+
+     {
+         // reconsume the last two messages
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(2L, 3L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(3, counts.size());
+         assertEquals(2, counts.get(queueKey).intValue());
+         for (int offset = 2; offset <= 3; offset++) {
+             String offsetKey = ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, offset);
+             assertEquals(2, counts.get(offsetKey).intValue());
+         }
+     }
+
+     {
+         // consume one new message together with the last reconsumed one
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(3L, 4L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(2, counts.size());
+         assertEquals(0, counts.get(queueKey).intValue());
+         assertEquals(3, counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 3)).intValue());
+     }
+
+     {
+         // consume two new messages
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(5L, 6L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(1, counts.size());
+         assertEquals(0, counts.get(queueKey).intValue());
+     }
+ }
+
+ /**
+  * Same idea as the single-queue count test, but each round pops from queue 0
+  * and queue 1 into one shared builder and checks the entries written for both
+  * queues.
+  */
+ @Test
+ public void testConsumedCountForMultiQueue() {
+     final String queue0Key = ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, QUEUE_ID_0);
+     final String queue1Key = ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, QUEUE_ID_1);
+
+     {
+         // consume two new messages, one per queue
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(0L), countInfo);
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_1, popTime, 3000,
+             Lists.newArrayList(0L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(2, counts.size());
+         assertEquals(0, counts.get(queue0Key).intValue());
+         assertEquals(0, counts.get(queue1Key).intValue());
+     }
+     {
+         // reconsume both messages
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(0L), countInfo);
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_1, popTime, 3000,
+             Lists.newArrayList(0L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(4, counts.size());
+         assertEquals(1, counts.get(queue0Key).intValue());
+         assertEquals(1, counts.get(queue1Key).intValue());
+         assertEquals(1, counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 0L)).intValue());
+         assertEquals(1, counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_1, 0L)).intValue());
+     }
+     {
+         // reconsume again, with one additional new message on queue 0
+         StringBuilder countInfo = new StringBuilder();
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 3000,
+             Lists.newArrayList(0L, 1L), countInfo);
+         consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_1, popTime, 3000,
+             Lists.newArrayList(0L), countInfo);
+         assertEncodeAndDecode();
+         Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+         assertEquals(4, counts.size());
+         assertEquals(0, counts.get(queue0Key).intValue());
+         assertEquals(2, counts.get(queue1Key).intValue());
+         assertEquals(2, counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 0L)).intValue());
+         assertNull(counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 1L)));
+         assertEquals(2, counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_1, 0L)).intValue());
+     }
+ }
+
+ /**
+  * Checks that pushing out the next visible time of a single offset keeps the
+  * queue blocked until that time has passed or the offset is acknowledged.
+  *
+  * <p>Round one pops offsets 1-3, delays offset 2, acknowledges 1 and 3 and
+  * then waits for the queue to become unblocked. Round two pops offsets 2-4,
+  * delays offset 2 again, acknowledges 3 and 4 (queue still blocked) and
+  * finally acknowledges 2, after which the next offset is 5 and the queue is
+  * no longer blocked.
+  */
+ @Test
+ public void testUpdateNextVisibleTime() {
+     // milliseconds: added to System.currentTimeMillis() below
+     long invisibleTime = 3000;
+
+     StringBuilder orderInfoBuilder = new StringBuilder();
+     consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(1L, 2L, 3L), orderInfoBuilder);
+
+     consumerOrderInfoManager.updateNextVisibleTime(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime,
+         System.currentTimeMillis() + invisibleTime);
+     assertEncodeAndDecode();
+
+     assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 1L, popTime));
+     assertEncodeAndDecode();
+     assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 3L, popTime));
+     assertEncodeAndDecode();
+
+     // invisibleTime is in milliseconds, so the bound is "invisible time plus one
+     // second"; Duration.ofSeconds(invisibleTime + 1) would wait up to ~50 minutes.
+     await().atMost(Duration.ofMillis(invisibleTime).plusSeconds(1)).until(() ->
+         !consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+
+     orderInfoBuilder = new StringBuilder();
+     consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(2L, 3L, 4L), orderInfoBuilder);
+
+     consumerOrderInfoManager.updateNextVisibleTime(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime,
+         System.currentTimeMillis() + invisibleTime);
+     assertEncodeAndDecode();
+
+     assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 3L, popTime));
+     assertEncodeAndDecode();
+     assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 4L, popTime));
+     assertEncodeAndDecode();
+     assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+
+     assertEquals(5L, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime));
+     assertEncodeAndDecode();
+     assertFalse(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime));
+ }
+
+ /**
+  * Runs {@code autoClean} against a manager backed by a mocked broker that
+  * knows only {@code TOPIC} and {@code GROUP}. Entries recorded for an unknown
+  * topic, for an unknown group, and for queue 0 while the topic reports zero
+  * read queues are expected to disappear from the table; an entry for queue 0
+  * with eight read queues is expected to stay.
+  */
+ @Test
+ public void testAutoCleanAndEncode() {
+     BrokerConfig brokerConfig = new BrokerConfig();
+     BrokerController controller = mock(BrokerController.class);
+     TopicConfigManager topicConfigs = mock(TopicConfigManager.class);
+     when(controller.getBrokerConfig()).thenReturn(brokerConfig);
+     when(controller.getTopicConfigManager()).thenReturn(topicConfigs);
+
+     SubscriptionGroupManager subscriptionGroups = mock(SubscriptionGroupManager.class);
+     when(controller.getSubscriptionGroupManager()).thenReturn(subscriptionGroups);
+     ConcurrentMap<String, SubscriptionGroupConfig> groupTable = new ConcurrentHashMap<>();
+     groupTable.put(GROUP, new SubscriptionGroupConfig());
+     when(subscriptionGroups.getSubscriptionGroupTable()).thenReturn(groupTable);
+
+     TopicConfig topicConfig = new TopicConfig(TOPIC);
+     when(topicConfigs.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);
+
+     // local instance wired to the mocked broker; the shared field is not used here
+     ConsumerOrderInfoManager manager = new ConsumerOrderInfoManager(controller);
+
+     // neither the topic nor the group is known to the mocked broker
+     manager.update(false, "errTopic", "errGroup", QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(2L, 3L, 4L), new StringBuilder());
+     manager.autoClean();
+     assertEquals(0, manager.getTable().size());
+
+     // known topic, unknown group
+     manager.update(false, TOPIC, "errGroup", QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(2L, 3L, 4L), new StringBuilder());
+     manager.autoClean();
+     assertEquals(0, manager.getTable().size());
+
+     // known topic and group, but the topic reports no read queues
+     topicConfig.setReadQueueNums(0);
+     manager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(2L, 3L, 4L), new StringBuilder());
+     await().atMost(Duration.ofSeconds(1)).until(() -> {
+         manager.autoClean();
+         return manager.getTable().size() == 0;
+     });
+
+     // known topic and group, queue 0 within the eight read queues
+     topicConfig.setReadQueueNums(8);
+     manager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(2L, 3L, 4L), new StringBuilder());
+     manager.autoClean();
+     assertEquals(1, manager.getTable().size());
+     for (ConcurrentHashMap<Integer, ConsumerOrderInfoManager.OrderInfo> queueTable : manager.getTable().values()) {
+         // only the first (and, per the size check above, only) entry is inspected
+         assertEquals(1, queueTable.size());
+         assertNotNull(queueTable.get(QUEUE_ID_0));
+         break;
+     }
+ }
+
+ /**
+  * Round-trips the shared manager through {@code encode} / {@code decode} and
+  * asserts that the queue-0 order info found in the first table entry is a
+  * different object afterwards but carries the same field values.
+  */
+ private void assertEncodeAndDecode() {
+     ConsumerOrderInfoManager.OrderInfo expected =
+         consumerOrderInfoManager.getTable().values().stream().findFirst().get().get(QUEUE_ID_0);
+
+     String snapshot = consumerOrderInfoManager.encode();
+     consumerOrderInfoManager.decode(snapshot);
+
+     ConsumerOrderInfoManager.OrderInfo actual =
+         consumerOrderInfoManager.getTable().values().stream().findFirst().get().get(QUEUE_ID_0);
+
+     // decode is expected to have replaced the instance, not reused it
+     assertNotSame(expected, actual);
+     assertEquals(expected.getPopTime(), actual.getPopTime());
+     assertEquals(expected.getInvisibleTime(), actual.getInvisibleTime());
+     assertEquals(expected.getOffsetList(), actual.getOffsetList());
+     assertEquals(expected.getOffsetConsumedCount(), actual.getOffsetConsumedCount());
+     assertEquals(expected.getOffsetNextVisibleTime(), actual.getOffsetNextVisibleTime());
+     assertEquals(expected.getLastConsumeTimestamp(), actual.getLastConsumeTimestamp());
+     assertEquals(expected.getCommitOffsetBit(), actual.getCommitOffsetBit());
+ }
+
+ /**
+  * Simulates order info that lacks the invisible time, consumed-count and
+  * next-visible-time fields by nulling them before an encode / decode round
+  * trip, then checks that the queue is reported as blocked and that a later
+  * pop still produces the expected order count info.
+  */
+ @Test
+ public void testLoadFromOldVersionOrderInfoData() {
+     consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(2L, 3L, 4L), new StringBuilder());
+     ConsumerOrderInfoManager.OrderInfo stored =
+         consumerOrderInfoManager.getTable().values().stream().findFirst().get().get(QUEUE_ID_0);
+
+     // drop the fields this test treats as absent from older data
+     stored.setInvisibleTime(null);
+     stored.setOffsetConsumedCount(null);
+     stored.setOffsetNextVisibleTime(null);
+
+     String snapshot = consumerOrderInfoManager.encode();
+     consumerOrderInfoManager.decode(snapshot);
+     assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, 3000));
+
+     StringBuilder countInfo = new StringBuilder();
+     consumerOrderInfoManager.update(false, TOPIC, GROUP, QUEUE_ID_0, popTime, 1,
+         Lists.newArrayList(3L, 4L, 5L), countInfo);
+     assertEncodeAndDecode();
+
+     Map<String, Integer> counts = ExtraInfoUtil.parseOrderCountInfo(countInfo.toString());
+     assertEquals(3, counts.size());
+     assertEquals(0, counts.get(ExtraInfoUtil.getStartOffsetInfoMapKey(TOPIC, QUEUE_ID_0)).intValue());
+     assertEquals(1, counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 3)).intValue());
+     assertEquals(1, counts.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 4)).intValue());
+ }
+}
\ No newline at end of file
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 5f393cb57..854fb73a6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1101,19 +1101,23 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
}
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) +
MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
} else {
- String key =
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
messageExt.getQueueId());
- int index =
sortMap.get(key).indexOf(messageExt.getQueueOffset());
- Long msgQueueOffset =
msgOffsetInfo.get(key).get(index);
+ String queueIdKey =
ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(),
messageExt.getQueueId());
+ String queueOffsetKey =
ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(),
messageExt.getQueueId(), messageExt.getQueueOffset());
+ int index =
sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
+ Long msgQueueOffset =
msgOffsetInfo.get(queueIdKey).get(index);
if (msgQueueOffset != messageExt.getQueueOffset()) {
log.warn("Queue offset[%d] of msg is strange, not
equal to the stored in msg, %s", msgQueueOffset, messageExt);
}
messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
-
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(key).longValue(),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
- responseHeader.getReviveQid(),
messageExt.getTopic(), brokerName, messageExt.getQueueId(),
msgQueueOffset.longValue())
+
ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey),
responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
+ responseHeader.getReviveQid(),
messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset)
);
if (((PopMessageRequestHeader)
requestHeader).isOrder() && orderCountInfo != null) {
- Integer count = orderCountInfo.get(key);
+ Integer count = orderCountInfo.get(queueOffsetKey);
+ if (count == null) {
+ count = orderCountInfo.get(queueIdKey);
+ }
if (count != null && count > 0) {
messageExt.setReconsumeTimes(count);
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
index 9a777208a..442060456 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtil.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.common.message.MessageConst;
public class ExtraInfoUtil {
private static final String NORMAL_TOPIC = "0";
private static final String RETRY_TOPIC = "1";
+ private static final String QUEUE_OFFSET = "qo";
public static String[] split(String extraInfo) {
if (extraInfo == null) {
@@ -131,7 +132,7 @@ public class ExtraInfoUtil {
.append(MessageConst.KEY_SEPARATOR).append(startOffset);
}
- public static void buildOrderCountInfo(StringBuilder stringBuilder,
boolean retry, int queueId, int orderCount) {
+ public static void buildQueueIdOrderCountInfo(StringBuilder stringBuilder,
boolean retry, int queueId, int orderCount) {
if (stringBuilder == null) {
stringBuilder = new StringBuilder(64);
}
@@ -145,6 +146,20 @@ public class ExtraInfoUtil {
.append(MessageConst.KEY_SEPARATOR).append(orderCount);
}
+ public static void buildQueueOffsetOrderCountInfo(StringBuilder
stringBuilder, boolean retry, long queueId, long queueOffset, int orderCount) {
+ if (stringBuilder == null) {
+ stringBuilder = new StringBuilder(64);
+ }
+
+ if (stringBuilder.length() > 0) {
+ stringBuilder.append(";");
+ }
+
+ stringBuilder.append(retry ? RETRY_TOPIC : NORMAL_TOPIC)
+
.append(MessageConst.KEY_SEPARATOR).append(getQueueOffsetKeyValueKey(queueId,
queueOffset))
+ .append(MessageConst.KEY_SEPARATOR).append(orderCount);
+ }
+
public static void buildMsgOffsetInfo(StringBuilder stringBuilder, boolean
retry, int queueId, List<Long> msgOffsets) {
if (stringBuilder == null) {
stringBuilder = new StringBuilder(64);
@@ -252,7 +267,19 @@ public class ExtraInfoUtil {
return startOffsetMap;
}
- public static String getStartOffsetInfoMapKey(String topic, int queueId) {
- return (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ?
RETRY_TOPIC : NORMAL_TOPIC) + "@" + queueId;
+ /**
+  * Builds a map key of the form {@code <topicType>@<key>}, where the topic type
+  * is the retry marker for topics starting with
+  * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} and the normal marker otherwise.
+  */
+ public static String getStartOffsetInfoMapKey(String topic, long key) {
+     final String topicType = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC;
+     return topicType + "@" + key;
+ }
+
+ /**
+  * Builds the queue/offset part of a per-offset key: the {@code QUEUE_OFFSET}
+  * prefix, the queue id, a {@code %} and the queue offset.
+  */
+ public static String getQueueOffsetKeyValueKey(long queueId, long queueOffset) {
+     return new StringBuilder(QUEUE_OFFSET)
+         .append(queueId)
+         .append('%')
+         .append(queueOffset)
+         .toString();
+ }
+
+ /**
+  * Builds a per-offset map key of the form {@code <topicType>@<queueOffsetKey>},
+  * where the topic type is the retry marker for topics starting with
+  * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} and the normal marker otherwise.
+  */
+ public static String getQueueOffsetMapKey(String topic, long queueId, long queueOffset) {
+     final String topicType = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) ? RETRY_TOPIC : NORMAL_TOPIC;
+     final String offsetKey = getQueueOffsetKeyValueKey(queueId, queueOffset);
+     return topicType + "@" + offsetKey;
+ }
+
+ /**
+  * Reports whether the revive queue id carried in {@code extraInfo} equals
+  * {@code KeyBuilder.POP_ORDER_REVIVE_QUEUE}.
+  */
+ public static boolean isOrder(String[] extraInfo) {
+     final boolean orderly = KeyBuilder.POP_ORDER_REVIVE_QUEUE == ExtraInfoUtil.getReviveQid(extraInfo);
+     return orderly;
+ }
}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtilTest.java
b/common/src/test/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtilTest.java
new file mode 100644
index 000000000..2da78b6e4
--- /dev/null
+++
b/common/src/test/java/org/apache/rocketmq/common/protocol/header/ExtraInfoUtilTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import java.util.Map;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Round-trip test for the order count info helpers of {@code ExtraInfoUtil}. */
+public class ExtraInfoUtilTest {
+
+    /**
+     * Writes one queue-level and one per-offset count into the same builder and
+     * checks that parsing returns each count under its own map key.
+     */
+    @Test
+    public void testOrderCountInfo() {
+        final String topic = "TOPIC";
+        final int queueId = 0;
+        final long queueOffset = 1234;
+
+        final Integer queueIdCount = 1;
+        final Integer queueOffsetCount = 2;
+
+        StringBuilder encoded = new StringBuilder();
+        ExtraInfoUtil.buildQueueIdOrderCountInfo(encoded, false, queueId, queueIdCount);
+        ExtraInfoUtil.buildQueueOffsetOrderCountInfo(encoded, false, queueId, queueOffset, queueOffsetCount);
+
+        Map<String, Integer> parsed = ExtraInfoUtil.parseOrderCountInfo(encoded.toString());
+
+        String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(topic, queueId);
+        String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(topic, queueId, queueOffset);
+        assertEquals(queueIdCount, parsed.get(queueIdKey));
+        assertEquals(queueOffsetCount, parsed.get(queueOffsetKey));
+    }
+}
\ No newline at end of file
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
new file mode 100644
index 000000000..558acb804
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.AckCallback;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.PopCallback;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.test.clientinterface.MQConsumer;
+import org.apache.rocketmq.test.util.RandomUtil;
+
+/**
+ * Test client that issues pop, ack and change-invisible-time requests through
+ * {@link MQClientAPIImpl} and exposes each result as a {@link CompletableFuture}.
+ */
+public class RMQPopClient implements MQConsumer {
+    /** Timeout passed to the ack and change-invisible-time requests. */
+    private static final long DEFAULT_TIMEOUT = 3000;
+    /** Underlying client API; assigned by {@link #create(boolean)}. */
+    private MQClientAPIImpl mqClientAPI;
+
+    /** Creates the underlying client API with TLS disabled. */
+    @Override
+    public void create() {
+        create(false);
+    }
+
+    /**
+     * Creates the underlying client API with a random instance name.
+     *
+     * @param useTLS value handed to {@code NettyClientConfig.setUseTLS}
+     */
+    @Override
+    public void create(boolean useTLS) {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setInstanceName(RandomUtil.getStringByUUID());
+
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        nettyClientConfig.setUseTLS(useTLS);
+
+        this.mqClientAPI = new MQClientAPIImpl(
+            nettyClientConfig, new ClientRemotingProcessor(null), null, clientConfig);
+    }
+
+    /** Starts the underlying client API. */
+    @Override
+    public void start() {
+        this.mqClientAPI.start();
+    }
+
+    /** Shuts the underlying client API down. */
+    @Override
+    public void shutdown() {
+        this.mqClientAPI.shutdown();
+    }
+
+    /**
+     * Sends a pop request for the given queue.
+     *
+     * <p>When {@code poll} is set, {@code timeout} is also sent as the poll time,
+     * the current time is sent as the born time, and the request itself is given
+     * ten extra seconds on top of {@code timeout}.
+     *
+     * @return a future completed with the pop result, or completed exceptionally
+     *         when the callback reports an error or sending throws
+     */
+    public CompletableFuture<PopResult> popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime,
+        int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order,
+        String expressionType, String expression) {
+        PopMessageRequestHeader header = new PopMessageRequestHeader();
+        header.setConsumerGroup(consumerGroup);
+        header.setTopic(mq.getTopic());
+        header.setQueueId(mq.getQueueId());
+        header.setMaxMsgNums(maxNums);
+        header.setInvisibleTime(invisibleTime);
+        header.setInitMode(initMode);
+        header.setExpType(expressionType);
+        header.setExp(expression);
+        header.setOrder(order);
+
+        long requestTimeout = timeout;
+        if (poll) {
+            header.setPollTime(timeout);
+            header.setBornTime(System.currentTimeMillis());
+            requestTimeout = timeout + 10 * 1000;
+        }
+
+        CompletableFuture<PopResult> result = new CompletableFuture<>();
+        try {
+            this.mqClientAPI.popMessageAsync(mq.getBrokerName(), brokerAddr, header, requestTimeout,
+                new PopCallback() {
+                    @Override
+                    public void onSuccess(PopResult popResult) {
+                        result.complete(popResult);
+                    }
+
+                    @Override
+                    public void onException(Throwable e) {
+                        result.completeExceptionally(e);
+                    }
+                });
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+        }
+        return result;
+    }
+
+    /**
+     * Acknowledges the message described by {@code extraInfo}; topic, queue id
+     * and offset of the request are taken from the parsed extra info.
+     *
+     * @return a future completed with the ack result, or completed exceptionally
+     *         when the callback reports an error or sending throws
+     */
+    public CompletableFuture<AckResult> ackMessageAsync(String brokerAddr, String topic, String consumerGroup,
+        String extraInfo) {
+        String[] extraInfoParts = ExtraInfoUtil.split(extraInfo);
+        AckMessageRequestHeader header = new AckMessageRequestHeader();
+        header.setTopic(ExtraInfoUtil.getRealTopic(extraInfoParts, topic, consumerGroup));
+        header.setQueueId(ExtraInfoUtil.getQueueId(extraInfoParts));
+        header.setOffset(ExtraInfoUtil.getQueueOffset(extraInfoParts));
+        header.setConsumerGroup(consumerGroup);
+        header.setExtraInfo(extraInfo);
+
+        CompletableFuture<AckResult> result = new CompletableFuture<>();
+        try {
+            this.mqClientAPI.ackMessageAsync(brokerAddr, DEFAULT_TIMEOUT, completing(result), header);
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+        }
+        return result;
+    }
+
+    /**
+     * Changes the invisible time of the message described by {@code extraInfo};
+     * topic, queue id and offset of the request are taken from the parsed extra
+     * info.
+     *
+     * @return a future completed with the result, or completed exceptionally
+     *         when the callback reports an error or sending throws
+     */
+    public CompletableFuture<AckResult> changeInvisibleTimeAsync(String brokerAddr, String brokerName, String topic,
+        String consumerGroup, String extraInfo, long invisibleTime) {
+        String[] extraInfoParts = ExtraInfoUtil.split(extraInfo);
+        ChangeInvisibleTimeRequestHeader header = new ChangeInvisibleTimeRequestHeader();
+        header.setTopic(ExtraInfoUtil.getRealTopic(extraInfoParts, topic, consumerGroup));
+        header.setQueueId(ExtraInfoUtil.getQueueId(extraInfoParts));
+        header.setOffset(ExtraInfoUtil.getQueueOffset(extraInfoParts));
+        header.setConsumerGroup(consumerGroup);
+        header.setExtraInfo(extraInfo);
+        header.setInvisibleTime(invisibleTime);
+
+        CompletableFuture<AckResult> result = new CompletableFuture<>();
+        try {
+            this.mqClientAPI.changeInvisibleTimeAsync(brokerName, brokerAddr, header, DEFAULT_TIMEOUT,
+                completing(result));
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+        }
+        return result;
+    }
+
+    /** Returns a callback that forwards its outcome, success or failure, to {@code target}. */
+    private static AckCallback completing(final CompletableFuture<AckResult> target) {
+        return new AckCallback() {
+            @Override
+            public void onSuccess(AckResult ackResult) {
+                target.complete(ackResult);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                target.completeExceptionally(e);
+            }
+        };
+    }
+}
diff --git
a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index d530db98b..27f5dcbdd 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -22,6 +22,7 @@ import
org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.listener.AbstractListener;
@@ -73,6 +74,13 @@ public class ConsumerFactory {
return consumer;
}
+ /**
+  * Returns a new {@link RMQPopClient} on which {@code create()} and
+  * {@code start()} have already been called.
+  */
+ public static RMQPopClient getRMQPopClient() {
+     final RMQPopClient popClient = new RMQPopClient();
+     popClient.create();
+     popClient.start();
+     return popClient;
+ }
+
public static DefaultMQPullConsumer getRMQPullConsumer(String nsAddr,
String consumerGroup) throws Exception {
DefaultMQPullConsumer defaultMQPullConsumer = new
DefaultMQPullConsumer(consumerGroup);
defaultMQPullConsumer.setInstanceName(UUID.randomUUID().toString());
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 035a8be68..079064c96 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.clientinterface.MQConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdminTestUtils;
@@ -317,6 +318,8 @@ public class BaseConf {
((MQPullConsumer) mqClient).shutdown();
} else if (mqClient instanceof MQPushConsumer) {
((MQPushConsumer) mqClient).shutdown();
+ } else if (mqClient instanceof MQConsumer) {
+ ((MQConsumer) mqClient).shutdown();
}
}));
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePop.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePop.java
new file mode 100644
index 000000000..29ff90261
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePop.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+
+public class BasePop extends BaseConf {
+
+ public RMQPopClient getRMQPopClient() {
+ RMQPopClient client = ConsumerFactory.getRMQPopClient();
+ mqClients.add(client);
+ return client;
+ }
+
+ protected static class MsgRcv {
+ public final long rcvTime;
+ public final MessageExt messageExt;
+
+ public MsgRcv(long rcvTime, MessageExt messageExt) {
+ this.rcvTime = rcvTime;
+ this.messageExt = messageExt;
+ }
+ }
+}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
new file mode 100644
index 000000000..1ef40b281
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+
+import static org.junit.Assert.assertEquals;
+
+@Ignore
+public class BasePopOrderly extends BasePop {
+ protected static final long POP_TIMEOUT = 500;
+ protected String topic;
+ protected String group;
+ protected RMQNormalProducer producer = null;
+ protected RMQPopClient client = null;
+ protected String brokerAddr;
+ protected MessageQueue messageQueue;
+ protected final Map<String, List<MsgRcv>> msgRecv = new
ConcurrentHashMap<>();
+ protected final List<String> msgRecvSequence = new
CopyOnWriteArrayList<>();
+
+ @Before
+ public void setUp() {
+ brokerAddr = brokerController1.getBrokerAddr();
+ topic = MQRandomUtils.getRandomTopic();
+ group = initConsumerGroup();
+ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 1,
CQType.SimpleCQ, TopicMessageType.FIFO);
+ producer = getProducer(NAMESRV_ADDR, topic);
+ client = getRMQPopClient();
+ messageQueue = new MessageQueue(topic, BROKER1_NAME, -1);
+ }
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ protected void assertMsgRecv(int seqId, int expectNum) {
+ String msgId = msgRecvSequence.get(seqId);
+ List<MsgRcv> msgRcvList = msgRecv.get(msgId);
+ assertEquals(expectNum, msgRcvList.size());
+ assertConsumeTimes(msgRcvList);
+ }
+
+ protected void assertConsumeTimes(List<MsgRcv> msgRcvList) {
+ for (int i = 0; i < msgRcvList.size(); i++) {
+ assertEquals(i, msgRcvList.get(i).messageExt.getReconsumeTimes());
+ }
+ }
+
+ protected void onRecvNewMessage(MessageExt messageExt) {
+ msgRecvSequence.add(messageExt.getMsgId());
+ msgRecv.compute(messageExt.getMsgId(), (k, msgRcvList) -> {
+ if (msgRcvList == null) {
+ msgRcvList = new CopyOnWriteArrayList<>();
+ }
+ msgRcvList.add(new MsgRcv(System.currentTimeMillis(), messageExt));
+ return msgRcvList;
+ });
+ }
+}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/ChangeInvisibleTimeMidMsgOrderlyIT.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/ChangeInvisibleTimeMidMsgOrderlyIT.java
new file mode 100644
index 000000000..ea1ac1e82
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/ChangeInvisibleTimeMidMsgOrderlyIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class ChangeInvisibleTimeMidMsgOrderlyIT extends BasePopOrderly {
+ /**
+ * send four messages (msg1, msg2, msg3, msg4) and the max message num of
pop request is three
+ * <p>
+ * ack msg1 and msg3, changeInvisibleTime msg2
+ * <p>
+ * expect the sequence of message received is: msg1, msg2, msg3, msg2,
msg3, msg4
+ */
+ @Test
+ public void test() {
+ producer.send(4);
+
+ await().atMost(Duration.ofSeconds(5)).until(() -> {
+ changeInvisibleTimeMidMessage().get();
+ return msgRecvSequence.size() == 6;
+ });
+
+ assertMsgRecv(0, 1);
+ assertMsgRecv(1, 2);
+ assertMsgRecv(2, 2);
+ assertMsgRecv(5, 1);
+
+ assertEquals(msgRecvSequence.get(1), msgRecvSequence.get(3));
+ assertEquals(msgRecvSequence.get(2), msgRecvSequence.get(4));
+ }
+
+ private CompletableFuture<Void> changeInvisibleTimeMidMessage() {
+ CompletableFuture<PopResult> future = client.popMessageAsync(
+ brokerAddr, messageQueue, 5000, 3, group, POP_TIMEOUT, true,
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*");
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ future.whenComplete((popResult, throwable) -> {
+ if (throwable != null) {
+ resultFuture.completeExceptionally(throwable);
+ return;
+ }
+ if (popResult.getMsgFoundList() == null ||
popResult.getMsgFoundList().isEmpty()) {
+ resultFuture.complete(null);
+ return;
+ }
+ try {
+ for (MessageExt messageExt : popResult.getMsgFoundList()) {
+ onRecvNewMessage(messageExt);
+ if (msgRecv.size() != 2) {
+ try {
+ client.ackMessageAsync(brokerAddr, topic, group,
messageExt.getProperty(MessageConst.PROPERTY_POP_CK)).get();
+ } catch (Exception e) {
+ resultFuture.completeExceptionally(e);
+ return;
+ }
+ } else {
+ try {
+ TimeUnit.MILLISECONDS.sleep(1);
+ client.changeInvisibleTimeAsync(
+ brokerAddr, BROKER1_NAME, topic, group,
+
messageExt.getProperty(MessageConst.PROPERTY_POP_CK), 3000).get();
+ } catch (Exception e) {
+ resultFuture.completeExceptionally(e);
+ return;
+ }
+ }
+ }
+ resultFuture.complete(null);
+ } catch (Throwable t) {
+ resultFuture.completeExceptionally(t);
+ }
+ });
+ return resultFuture;
+ }
+}