This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d3de48c80 [ISSUE #5605]Introduce tag estimation for lag calculation
(#5606)
d3de48c80 is described below
commit d3de48c806a8b0c87b73fe7a6d49054892845487
Author: SSpirits <[email protected]>
AuthorDate: Wed Nov 30 14:52:59 2022 +0800
[ISSUE #5605]Introduce tag estimation for lag calculation (#5606)
* Introduce tag estimation for lag calculation
Co-authored-by: Li Zhanhui <[email protected]>
* fixed according to review comments
Co-authored-by: Li Zhanhui <[email protected]>
---
.../org/apache/rocketmq/common/BrokerConfig.java | 13 ++
.../org/apache/rocketmq/store/ConsumeQueue.java | 91 ++++++++-
.../apache/rocketmq/store/DefaultMessageStore.java | 31 +++
.../org/apache/rocketmq/store/MappedFileQueue.java | 25 ++-
.../org/apache/rocketmq/store/MessageStore.java | 11 ++
.../rocketmq/store/config/MessageStoreConfig.java | 27 ++-
.../rocketmq/store/queue/BatchConsumeQueue.java | 116 ++++++++++-
.../store/queue/ConsumeQueueInterface.java | 11 ++
.../store/queue/BatchConsumeMessageTest.java | 103 ++++++++++
.../rocketmq/store/queue/ConsumeQueueTest.java | 205 +++++++++++++++++++-
.../listener/rmq/concurrent/RMQBlockListener.java | 60 ++++++
.../rocketmq/test/util/MQAdminTestUtils.java | 25 ++-
.../org/apache/rocketmq/test/base/BaseConf.java | 6 +-
.../rocketmq/test/base/IntegrationTestBase.java | 12 ++
.../rocketmq/test/offset/LagCalculationIT.java | 212 +++++++++++++++++++++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 7 +
.../tools/admin/DefaultMQAdminExtImpl.java | 10 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 8 +-
18 files changed, 950 insertions(+), 23 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index b9c0975b0..8e78320f1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -385,6 +385,11 @@ public class BrokerConfig extends BrokerIdentity {
private boolean metricsInDelta = false;
+ /**
+ * Whether to estimate the accumulation (lag) when the subscription filters by tag and is not SUB_ALL.
+ */
+ private boolean estimateAccumulation = true;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1584,4 +1589,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setTransactionOpBatchInterval(int transactionOpBatchInterval) {
this.transactionOpBatchInterval = transactionOpBatchInterval;
}
+
+ public boolean isEstimateAccumulation() { // reads the estimateAccumulation switch of this config
+ return estimateAccumulation;
+ }
+
+ public void setEstimateAccumulation(boolean estimateAccumulation) { // overwrites the estimateAccumulation switch
+ this.estimateAccumulation = estimateAccumulation;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 5bbf773e4..3530b1c39 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
@@ -34,7 +34,6 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
@@ -45,6 +44,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
public static final int CQ_STORE_UNIT_SIZE = 20;
+ public static final int MSG_TAG_OFFSET_INDEX = 12;
private static final Logger LOG_ERROR =
LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private final MessageStore messageStore;
@@ -1001,4 +1001,91 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
mappedFileQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
}
+
+ @Override
+ public long estimateMessageCount(long from, long to, MessageFilter filter)
{
+ long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+ long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+ List<MappedFile> mappedFiles =
mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+ if (mappedFiles.isEmpty()) {
+ return -1;
+ }
+
+ boolean sample = false;
+ long match = 0;
+ long raw = 0;
+
+ for (MappedFile mappedFile : mappedFiles) {
+ int start = 0;
+ int len = mappedFile.getFileSize();
+
+ // calculate start and len for first segment and last segment to
reduce scanning
+ // first file segment
+ if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+ start = (int) (physicalOffsetFrom -
mappedFile.getFileFromOffset());
+ if (mappedFile.getFileFromOffset() + mappedFile.getFileSize()
>= physicalOffsetTo) {
+ // current mapped file covers search range completely.
+ len = (int) (physicalOffsetTo - physicalOffsetFrom);
+ } else {
+ len = mappedFile.getFileSize() - start;
+ }
+ }
+
+ // last file segment
+ if (0 == start && mappedFile.getFileFromOffset() +
mappedFile.getFileSize() > physicalOffsetTo) {
+ len = (int) (physicalOffsetTo -
mappedFile.getFileFromOffset());
+ }
+
+ // select partial data to scan
+ SelectMappedBufferResult slice =
mappedFile.selectMappedBuffer(start, len);
+ if (null != slice) {
+ try {
+ ByteBuffer buffer = slice.getByteBuffer();
+ int current = 0;
+ while (current < len) {
+ // skip physicalOffset and message length fields.
+ buffer.position(current + MSG_TAG_OFFSET_INDEX);
+ long tagCode = buffer.getLong();
+ ConsumeQueueExt.CqExtUnit ext = null;
+ if (isExtWriteEnable()) {
+ ext = consumeQueueExt.get(tagCode);
+ tagCode = ext.getTagsCode();
+ }
+ if (filter.isMatchedByConsumeQueue(tagCode, ext)) {
+ match++;
+ }
+ raw++;
+ current += CQ_STORE_UNIT_SIZE;
+
+ if (raw >=
messageStore.getMessageStoreConfig().getMaxConsumeQueueScan()) {
+ sample = true;
+ break;
+ }
+
+ if (match >
messageStore.getMessageStoreConfig().getSampleCountThreshold()) {
+ sample = true;
+ break;
+ }
+ }
+ } finally {
+ slice.release();
+ }
+ }
+ // we have scanned enough entries, now is the time to return an
educated guess.
+ if (sample) {
+ break;
+ }
+ }
+
+ long result = match;
+ if (sample) {
+ if (0 == raw) {
+ log.error("[BUG]. Raw should NOT be 0");
+ return 0;
+ }
+ result = (long) (match * (to - from) * 1.0 / raw);
+ }
+ log.debug("Result={}, raw={}, match={}, sample={}", result, raw,
match, sample);
+ return result;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 31c1a2cb4..9b0c38656 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2666,4 +2666,35 @@ public class DefaultMessageStore implements MessageStore
{
public boolean isShutdown() {
return shutdown;
}
+
+    /**
+     * Estimates how many messages of the given queue, within {@code [from, to)}, match the filter.
+     * <p>
+     * A negative {@code from} is treated as 0. An empty range yields 0, a {@code null} filter yields the plain
+     * range size, and an unknown queue yields 0. When {@code from} is below the queue's minimum offset the
+     * window is moved up to start at that minimum while keeping its size. If the consume queue reports
+     * {@code -1}, the size of the (possibly moved) window is returned instead.
+     */
+    @Override
+    public long estimateMessageCount(String topic, int queueId, long from, long to, MessageFilter filter) {
+        long lower = Math.max(from, 0);
+        long upper = to;
+
+        if (lower >= upper) {
+            return 0;
+        }
+        if (filter == null) {
+            return upper - lower;
+        }
+
+        ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+        if (consumeQueue == null) {
+            return 0;
+        }
+
+        // move the window up to the min offset in queue if it starts too low
+        long minOffset = consumeQueue.getMinOffsetInQueue();
+        if (lower < minOffset) {
+            long span = upper - lower;
+            lower = minOffset;
+            upper = minOffset + span;
+        }
+
+        long estimated = consumeQueue.estimateMessageCount(lower, upper, filter);
+        if (estimated == -1) {
+            return upper - lower;
+        }
+        return estimated;
+    }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 0fc28ac52..af300c337 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store;
+import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -26,8 +27,6 @@ import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;
-
-import com.google.common.collect.Lists;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -780,4 +779,26 @@ public class MappedFileQueue implements Swappable {
public String getStorePath() {
return storePath;
}
+
+    /**
+     * Collects the mapped files whose byte span overlaps {@code [from, to)}.
+     * <p>
+     * Files ending at or before {@code from} are skipped; iteration stops at the first remaining file that
+     * starts at or after {@code to}.
+     *
+     * @param from Lower physical offset of the range.
+     * @param to   Upper physical offset of the range.
+     * @return A new list of overlapping files, empty when there are none.
+     */
+    public List<MappedFile> range(final long from, final long to) {
+        List<MappedFile> overlapping = new ArrayList<>();
+
+        Object[] snapshot = copyMappedFiles(0);
+        if (snapshot == null) {
+            return overlapping;
+        }
+
+        for (int i = 0; i < snapshot.length; i++) {
+            MappedFile candidate = (MappedFile) snapshot[i];
+            boolean endsBeforeRange = candidate.getFileFromOffset() + candidate.getFileSize() <= from;
+            if (!endsBeforeRange) {
+                if (to <= candidate.getFileFromOffset()) {
+                    break;
+                }
+                overlapping.add(candidate);
+            }
+        }
+
+        return overlapping;
+    }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 95de57cb3..df07a735b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -826,4 +826,15 @@ public interface MessageStore {
*/
boolean isShutdown();
+ /**
+ * Estimate number of messages, within [from, to), which match given filter
+ *
+ * @param topic Topic name
+ * @param queueId Queue ID
+ * @param from Lower boundary of the range, inclusive.
+ * @param to Upper boundary of the range, exclusive.
+ * @param filter The message filter.
+ * @return Estimate number of messages matching given filter.
+ */
+ long estimateMessageCount(String topic, int queueId, long from, long to,
MessageFilter filter);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index c93d3eea5..91c80e940 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -16,12 +16,11 @@
*/
package org.apache.rocketmq.store.config;
+import java.io.File;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.queue.BatchConsumeQueue;
-import java.io.File;
-
public class MessageStoreConfig {
public static final String MULTI_PATH_SPLITTER =
System.getProperty("rocketmq.broker.multiPathSplitter", ",");
@@ -360,6 +359,16 @@ public class MessageStoreConfig {
private boolean asyncLearner = false;
+ /**
+ * Number of records to scan before starting to estimate.
+ */
+ private int maxConsumeQueueScan = 20_000;
+
+ /**
+ * Number of matched records before starting to estimate.
+ */
+ private int sampleCountThreshold = 5000;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -1565,5 +1574,19 @@ public class MessageStoreConfig {
this.timerMaxDelaySec = timerMaxDelaySec;
}
+ public int getMaxConsumeQueueScan() { // number of records to scan before starting to estimate
+ return maxConsumeQueueScan;
+ }
+
+ public void setMaxConsumeQueueScan(int maxConsumeQueueScan) { // stored as given; no range check is applied here
+ this.maxConsumeQueueScan = maxConsumeQueueScan;
+ }
+ public int getSampleCountThreshold() { // number of matched records before starting to estimate
+ return sampleCountThreshold;
+ }
+
+ public void setSampleCountThreshold(int sampleCountThreshold) { // stored as given; no range check is applied here
+ this.sampleCountThreshold = sampleCountThreshold;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index ed7d1bd56..8a307b957 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -17,35 +17,36 @@
package org.apache.rocketmq.store.queue;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MappedFileQueue;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.logfile.MappedFile;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.function.Function;
-
public class BatchConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
//position 8, size 4, tagscode 8, storetime 8, msgBaseOffset 8, batchSize
2, compactedOffset 4, reserved 4
public static final int CQ_STORE_UNIT_SIZE = 46;
+ public static final int MSG_TAG_OFFSET_INDEX = 12;
public static final int MSG_STORE_TIME_OFFSET_INDEX = 20;
public static final int MSG_BASE_OFFSET_INDEX = 28;
public static final int MSG_BATCH_SIZE_INDEX = 36;
@@ -1005,4 +1006,103 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCy
public MappedFileQueue getMappedFileQueue() {
return mappedFileQueue;
}
+
+ @Override
+ public long estimateMessageCount(long from, long to, MessageFilter filter)
{
+ // transfer message offset to physical offset
+ SelectMappedBufferResult firstMappedFileBuffer =
getBatchMsgIndexBuffer(from);
+ if (firstMappedFileBuffer == null) {
+ return -1;
+ }
+ long physicalOffsetFrom = firstMappedFileBuffer.getStartOffset();
+
+ SelectMappedBufferResult lastMappedFileBuffer =
getBatchMsgIndexBuffer(to);
+ if (lastMappedFileBuffer == null) {
+ return -1;
+ }
+ long physicalOffsetTo = lastMappedFileBuffer.getStartOffset();
+
+ List<MappedFile> mappedFiles =
mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+ if (mappedFiles.isEmpty()) {
+ return -1;
+ }
+
+ boolean sample = false;
+ long match = 0;
+ long matchCqUnitCount = 0;
+ long raw = 0;
+ long scanCqUnitCount = 0;
+
+ for (MappedFile mappedFile : mappedFiles) {
+ int start = 0;
+ int len = mappedFile.getFileSize();
+
+ // calculate start and len for first segment and last segment to
reduce scanning
+ // first file segment
+ if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+ start = (int) (physicalOffsetFrom -
mappedFile.getFileFromOffset());
+ if (mappedFile.getFileFromOffset() + mappedFile.getFileSize()
>= physicalOffsetTo) {
+ // current mapped file covers search range completely.
+ len = (int) (physicalOffsetTo - physicalOffsetFrom);
+ } else {
+ len = mappedFile.getFileSize() - start;
+ }
+ }
+
+ // last file segment
+ if (0 == start && mappedFile.getFileFromOffset() +
mappedFile.getFileSize() > physicalOffsetTo) {
+ len = (int) (physicalOffsetTo -
mappedFile.getFileFromOffset());
+ }
+
+ // select partial data to scan
+ SelectMappedBufferResult slice =
mappedFile.selectMappedBuffer(start, len);
+ if (null != slice) {
+ try {
+ ByteBuffer buffer = slice.getByteBuffer();
+ int current = 0;
+ while (current < len) {
+ // skip physicalOffset and message length fields.
+ buffer.position(current + MSG_TAG_OFFSET_INDEX);
+ long tagCode = buffer.getLong();
+ buffer.position(current + MSG_BATCH_SIZE_INDEX);
+ long batchSize = buffer.getShort();
+ if (filter.isMatchedByConsumeQueue(tagCode, null)) {
+ match += batchSize;
+ matchCqUnitCount++;
+ }
+ raw += batchSize;
+ scanCqUnitCount++;
+ current += CQ_STORE_UNIT_SIZE;
+
+ if (scanCqUnitCount >=
messageStore.getMessageStoreConfig().getMaxConsumeQueueScan()) {
+ sample = true;
+ break;
+ }
+
+ if (matchCqUnitCount >
messageStore.getMessageStoreConfig().getSampleCountThreshold()) {
+ sample = true;
+ break;
+ }
+ }
+ } finally {
+ slice.release();
+ }
+ }
+ // we have scanned enough entries, now is the time to return an
educated guess.
+ if (sample) {
+ break;
+ }
+ }
+
+ long result = match;
+ if (sample) {
+ if (0 == raw) {
+ log.error("[BUG]. Raw should NOT be 0");
+ return 0;
+ }
+ result = (long) (match * (to - from) * 1.0 / raw);
+ }
+ log.debug("Result={}, raw={}, match={}, sample={}", result, raw,
match, sample);
+ return result;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index f36dda094..76242a5e3 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.store.queue;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageFilter;
public interface ConsumeQueueInterface {
/**
@@ -145,4 +146,14 @@ public interface ConsumeQueueInterface {
* @param messageNum message number
*/
void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner,
MessageExtBrokerInner msg, short messageNum);
+
+ /**
+ * Estimate number of records matching given filter.
+ *
+ * @param from Lower boundary, inclusive.
+ * @param to Upper boundary, exclusive.
+ * @param filter Specified filter criteria
+ * @return Number of matching records (possibly extrapolated from a sample), or -1 when no stored data covers the range.
+ */
+ long estimateMessageCount(long from, long to, MessageFilter filter);
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 2485ec670..8e8fee278 100644
---
a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -18,9 +18,11 @@
package org.apache.rocketmq.store.queue;
import java.io.File;
+import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
@@ -34,8 +36,10 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
@@ -49,6 +53,8 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
public class BatchConsumeMessageTest extends QueueTestBase {
+ private static final int BATCH_NUM = 10;
+ private static final int TOTAL_MSGS = 200;
private MessageStore messageStore;
@Before
@@ -456,4 +462,101 @@ public class BatchConsumeMessageTest extends
QueueTestBase {
}
}
+ protected void putMsg(String topic) {
+ createTopic(topic, CQType.BatchCQ, messageStore);
+
+ for (int i = 0; i < TOTAL_MSGS; i++) {
+ MessageExtBrokerInner message = buildMessage(topic, BATCH_NUM * (i
% 2 + 1));
+ switch (i % 3) {
+ case 0:
+ message.setTags("TagA");
+ break;
+
+ case 1:
+ message.setTags("TagB");
+ break;
+ }
+ message.setTagsCode(message.getTags().hashCode());
+
message.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
+ PutMessageResult putMessageResult =
messageStore.putMessage(message);
+ Assert.assertEquals(PutMessageStatus.PUT_OK,
putMessageResult.getPutMessageStatus());
+ }
+
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+ }
+
+    /**
+     * A consume queue that holds no data must answer every estimation request with -1.
+     */
+    @Test
+    public void testEstimateMessageCountInEmptyConsumeQueue() {
+        final long tagACode = "TagA".hashCode();
+        MessageFilter tagAFilter = new MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return tagsCode == tagACode;
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                return false;
+            }
+        };
+
+        ConsumeQueueInterface emptyQueue = messageStore.findConsumeQueue(UUID.randomUUID().toString(), 0);
+        Assert.assertEquals(-1, emptyQueue.estimateMessageCount(0, 0, tagAFilter));
+
+        // test for illegal offset
+        Assert.assertEquals(-1, emptyQueue.estimateMessageCount(0, 100, tagAFilter));
+        Assert.assertEquals(-1, emptyQueue.estimateMessageCount(100, 1000, tagAFilter));
+    }
+
+    /**
+     * Counts TagA messages over the stored batches and checks that unresolvable ranges yield -1.
+     */
+    @Test
+    public void testEstimateMessageCount() {
+        final long tagACode = "TagA".hashCode();
+        MessageFilter tagAFilter = new MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return tagsCode == tagACode;
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                return false;
+            }
+        };
+
+        String topic = UUID.randomUUID().toString();
+        putMsg(topic);
+        ConsumeQueueInterface queue = messageStore.findConsumeQueue(topic, 0);
+
+        Assert.assertEquals(1000, queue.estimateMessageCount(0, 2999, tagAFilter));
+
+        // test for illegal offset
+        Assert.assertEquals(-1, queue.estimateMessageCount(0, Long.MAX_VALUE, tagAFilter));
+        Assert.assertEquals(-1, queue.estimateMessageCount(100000, 1000000, tagAFilter));
+        Assert.assertEquals(-1, queue.estimateMessageCount(100, 0, tagAFilter));
+    }
+
+    /**
+     * Lowers both sampling limits so the estimation has to extrapolate, then checks the extrapolated value.
+     */
+    @Test
+    public void testEstimateMessageCountSample() {
+        final long tagACode = "TagA".hashCode();
+        MessageFilter tagAFilter = new MessageFilter() {
+            @Override
+            public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                return tagsCode == tagACode;
+            }
+
+            @Override
+            public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                return false;
+            }
+        };
+
+        String topic = UUID.randomUUID().toString();
+        putMsg(topic);
+        messageStore.getMessageStoreConfig().setSampleCountThreshold(10);
+        messageStore.getMessageStoreConfig().setMaxConsumeQueueScan(20);
+
+        ConsumeQueueInterface queue = messageStore.findConsumeQueue(topic, 0);
+        Assert.assertEquals(300, queue.estimateMessageCount(1000, 2000, tagAFilter));
+    }
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index f2742015c..6a8bfc5bc 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -16,17 +16,98 @@
*/
package org.apache.rocketmq.store.queue;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
import org.junit.Test;
-import java.util.UUID;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.awaitility.Awaitility.await;
public class ConsumeQueueTest extends QueueTestBase {
+ private static final String TOPIC = "StoreTest";
+ private static final int QUEUE_ID = 0;
+ private static final String STORE_PATH = "." + File.separator +
"unit_test_store";
+ private static final int COMMIT_LOG_FILE_SIZE = 1024 * 8;
+ private static final int CQ_FILE_SIZE = 10 * 20;
+ private static final int CQ_EXT_FILE_SIZE = 10 *
(ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
+
+    /**
+     * Builds a store configuration rooted at {@code STORE_PATH} with the given file sizes, the message index
+     * switched off and the consume-queue ext switched according to {@code enableCqExt}.
+     */
+    public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
+        boolean enableCqExt, int cqExtFileSize) {
+        MessageStoreConfig config = new MessageStoreConfig();
+
+        // storage locations
+        config.setStorePathRootDir(STORE_PATH);
+        config.setStorePathCommitLog(STORE_PATH + File.separator + "commitlog");
+
+        // file sizes
+        config.setMappedFileSizeCommitLog(commitLogFileSize);
+        config.setMappedFileSizeConsumeQueue(cqFileSize);
+        config.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
+
+        // feature switches
+        config.setMessageIndexEnable(false);
+        config.setEnableConsumeQueueExt(enableCqExt);
+
+        return config;
+    }
+
+    /**
+     * Creates, loads and starts a {@link DefaultMessageStore} using the small test file sizes and an enabled
+     * consume-queue ext.
+     */
+    protected DefaultMessageStore gen() throws Exception {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        MessageStoreConfig storeConfig =
+            buildStoreConfig(COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE);
+        BrokerStatsManager statsManager = new BrokerStatsManager(brokerConfig);
+
+        DefaultMessageStore store = new DefaultMessageStore(storeConfig, statsManager,
+            (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
+            }, brokerConfig);
+
+        boolean loaded = store.load();
+        assertThat(loaded).isTrue();
+
+        store.start();
+        return store;
+    }
+
+ protected void putMsg(DefaultMessageStore messageStore) throws Exception {
+ int totalMsgs = 200;
+ for (int i = 0; i < totalMsgs; i++) {
+ MessageExtBrokerInner message = buildMessage();
+ message.setQueueId(0);
+ switch (i % 3) {
+ case 0:
+ message.setTags("TagA");
+ break;
+
+ case 1:
+ message.setTags("TagB");
+ break;
+
+ case 2:
+ message.setTags("TagC");
+ break;
+ }
+ message.setTagsCode(message.getTags().hashCode());
+
message.setPropertiesString(MessageDecoder.messageProperties2String(message.getProperties()));
+ messageStore.putMessage(message);
+ }
+ await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+ }
+
@Test
public void testIterator() throws Exception {
final int msgNum = 100;
@@ -99,4 +180,126 @@ public class ConsumeQueueTest extends QueueTestBase {
}
messageStore.getQueueStore().destroy(consumeQueue);
}
+
+    /**
+     * A consume queue that holds no data must answer every estimation request with -1.
+     */
+    @Test
+    public void testEstimateMessageCountInEmptyConsumeQueue() {
+        DefaultMessageStore master = null;
+        try {
+            master = gen();
+            ConsumeQueueInterface consumeQueue = master.findConsumeQueue(TOPIC, QUEUE_ID);
+            MessageFilter filter = new MessageFilter() {
+                @Override
+                public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                    return tagsCode == "TagA".hashCode();
+                }
+
+                @Override
+                public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                    return false;
+                }
+            };
+            long estimation = consumeQueue.estimateMessageCount(0, 0, filter);
+            Assert.assertEquals(-1, estimation);
+
+            // test for illegal offset
+            estimation = consumeQueue.estimateMessageCount(0, 100, filter);
+            Assert.assertEquals(-1, estimation);
+            estimation = consumeQueue.estimateMessageCount(100, 1000, filter);
+            Assert.assertEquals(-1, estimation);
+        } catch (Exception e) {
+            // Report the failure with its cause attached instead of printing it and asserting on a constant.
+            fail("Failed to estimate message count on an empty consume queue", e);
+        } finally {
+            if (master != null) {
+                master.shutdown();
+                master.destroy();
+            }
+            UtilAll.deleteFile(new File(STORE_PATH));
+        }
+    }
+
+    /**
+     * Counts TagA entries exactly (no sampling) and checks how out-of-range requests are answered.
+     */
+    @Test
+    public void testEstimateMessageCount() {
+        DefaultMessageStore messageStore = null;
+        try {
+            try {
+                messageStore = gen();
+            } catch (Exception e) {
+                // Report the failure with its cause attached instead of printing it and asserting on a constant.
+                fail("Failed to create message store", e);
+            }
+
+            try {
+                putMsg(messageStore);
+            } catch (Exception e) {
+                fail("Failed to put message", e);
+            }
+
+            ConsumeQueueInterface cq = messageStore.findConsumeQueue(TOPIC, QUEUE_ID);
+            MessageFilter filter = new MessageFilter() {
+                @Override
+                public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                    return tagsCode == "TagA".hashCode();
+                }
+
+                @Override
+                public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                    return false;
+                }
+            };
+            long estimation = cq.estimateMessageCount(0, 199, filter);
+            Assert.assertEquals(67, estimation);
+
+            // test for illegal offset
+            estimation = cq.estimateMessageCount(0, 1000, filter);
+            Assert.assertEquals(67, estimation);
+            estimation = cq.estimateMessageCount(1000, 10000, filter);
+            Assert.assertEquals(-1, estimation);
+            estimation = cq.estimateMessageCount(100, 0, filter);
+            Assert.assertEquals(-1, estimation);
+        } finally {
+            // The store may be missing when creation failed; the directory is cleaned up either way.
+            if (messageStore != null) {
+                messageStore.shutdown();
+                messageStore.destroy();
+            }
+            UtilAll.deleteFile(new File(STORE_PATH));
+        }
+    }
+
+    /**
+     * Lowers both sampling limits so the estimation has to extrapolate, then checks the extrapolated value.
+     */
+    @Test
+    public void testEstimateMessageCountSample() {
+        DefaultMessageStore messageStore = null;
+        try {
+            try {
+                messageStore = gen();
+            } catch (Exception e) {
+                // Report the failure with its cause attached instead of printing it and asserting on a constant.
+                fail("Failed to create message store", e);
+            }
+
+            try {
+                putMsg(messageStore);
+            } catch (Exception e) {
+                fail("Failed to put message", e);
+            }
+            messageStore.getMessageStoreConfig().setSampleCountThreshold(10);
+            messageStore.getMessageStoreConfig().setMaxConsumeQueueScan(20);
+            ConsumeQueueInterface cq = messageStore.findConsumeQueue(TOPIC, QUEUE_ID);
+            MessageFilter filter = new MessageFilter() {
+                @Override
+                public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                    return tagsCode == "TagA".hashCode();
+                }
+
+                @Override
+                public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                    return false;
+                }
+            };
+            long estimation = cq.estimateMessageCount(100, 150, filter);
+            Assert.assertEquals(15, estimation);
+        } finally {
+            // The store may be missing when creation failed; the directory is cleaned up either way.
+            if (messageStore != null) {
+                messageStore.shutdown();
+                messageStore.destroy();
+            }
+            UtilAll.deleteFile(new File(STORE_PATH));
+        }
+    }
}
diff --git
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQBlockListener.java
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQBlockListener.java
new file mode 100644
index 000000000..907612cce
--- /dev/null
+++
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQBlockListener.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.listener.rmq.concurrent;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * Concurrent listener that can hold the consuming thread after the batch has been passed to
+ * {@link RMQNormalListener#consumeMessage}.
+ *
+ * <p>While {@code block} is {@code true}, {@link #consumeMessage} sleeps in 100 ms steps and does not return.
+ *
+ * <p>NOTE(review): {@code inBlock} starts as {@code true} and is never reset to {@code false}, so
+ * {@link #isBlocked()} currently always returns {@code true}. Confirm whether that is intended before
+ * changing it, since callers may already rely on it.
+ */
+public class RMQBlockListener extends RMQNormalListener {
+    /** When {@code true}, {@link #consumeMessage} keeps waiting before it returns. */
+    private volatile boolean block = true;
+    /** Set to {@code true} on every wait iteration; see the class-level review note. */
+    private volatile boolean inBlock = true;
+
+    /** Creates a listener that blocks consumption. */
+    public RMQBlockListener() {
+        super();
+    }
+
+    /**
+     * Creates a listener with the given initial blocking mode.
+     *
+     * @param block whether {@link #consumeMessage} should wait before returning
+     */
+    public RMQBlockListener(boolean block) {
+        super();
+        this.block = block;
+    }
+
+    /**
+     * @return the current value of the {@code inBlock} flag
+     */
+    public boolean isBlocked() {
+        return inBlock;
+    }
+
+    /**
+     * Switches blocking on or off; a waiting {@link #consumeMessage} call observes the change on its next
+     * 100 ms poll.
+     *
+     * @param block whether {@link #consumeMessage} should wait before returning
+     */
+    public void setBlock(boolean block) {
+        this.block = block;
+    }
+
+    /**
+     * Delegates to the parent listener first, then waits for as long as {@code block} is {@code true}.
+     *
+     * @return the status produced by the parent listener
+     */
+    @Override
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+        ConsumeConcurrentlyStatus status = super.consumeMessage(msgs, context);
+
+        try {
+            while (block) {
+                inBlock = true;
+                Thread.sleep(100);
+            }
+        } catch (InterruptedException e) {
+            // Stop waiting, but keep the interruption visible to the thread that owns this call.
+            Thread.currentThread().interrupt();
+        }
+
+        return status;
+    }
+}
diff --git
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 129fe8f9a..554289d01 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -31,7 +31,10 @@ import
org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -41,8 +44,6 @@ import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
@@ -56,6 +57,18 @@ import static org.awaitility.Awaitility.await;
public class MQAdminTestUtils {
private static Logger log =
LoggerFactory.getLogger(MQAdminTestUtils.class);
+ private static DefaultMQAdminExt mqAdminExt;
+
+ public static void startAdmin(String nameSrvAddr) throws MQClientException
{
+ mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setNamesrvAddr(nameSrvAddr);
+ mqAdminExt.start();
+ }
+
+    /**
+     * Shuts down the shared admin client; does nothing when no admin client has been created.
+     */
+    public static void shutdownAdmin() {
+        if (mqAdminExt != null) {
+            mqAdminExt.shutdown();
+        }
+    }
+
public static boolean createTopic(String nameSrvAddr, String clusterName,
String topic,
int queueNum, Map<String, String>
attributes) {
int defaultWaitTime = 30;
@@ -298,4 +311,12 @@ public class MQAdminTestUtils {
cmd.execute(commandLine, options, null);
}
+    /**
+     * Asks the broker at {@code brokerAddr} for the consume stats of {@code group} on {@code topic}, using the
+     * shared admin client and a timeout of {@link Long#MAX_VALUE}.
+     *
+     * @param brokerAddr address of the broker to query
+     * @param topic topic whose stats are requested
+     * @param group consumer group whose stats are requested
+     * @return the consume stats, or {@code null} when the query fails for any reason
+     */
+    public static ConsumeStats examineConsumeStats(String brokerAddr, String topic, String group) {
+        try {
+            return mqAdminExt.examineConsumeStats(brokerAddr, group, topic, Long.MAX_VALUE);
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Keep the interruption visible to the caller's thread.
+                Thread.currentThread().interrupt();
+            }
+            // Best-effort lookup: still return null, but record why the query failed.
+            log.warn("Failed to examine consume stats, brokerAddr={}, topic={}, group={}",
+                brokerAddr, topic, group, e);
+            return null;
+        }
+    }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index d1b89e914..d27139195 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -35,11 +35,11 @@ import
org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -55,6 +55,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.junit.Assert;
+import static org.apache.rocketmq.test.base.IntegrationTestBase.initMQAdmin;
import static org.awaitility.Awaitility.await;
public class BaseConf {
@@ -109,6 +110,7 @@ public class BaseConf {
brokerControllerList = ImmutableList.of(brokerController1,
brokerController2, brokerController3);
brokerControllerMap = brokerControllerList.stream().collect(
Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(),
Function.identity()));
+ initMQAdmin(NAMESRV_ADDR);
}
public BaseConf() {
diff --git
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index a2b9b95ae..221793692 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.UtilAll;
@@ -84,6 +85,7 @@ public class IntegrationTestBase {
for (File file : TMPE_FILES) {
UtilAll.deleteFile(file);
}
+ MQAdminTestUtils.shutdownAdmin();
} catch (Exception e) {
logger.error("Shutdown error", e);
}
@@ -133,6 +135,8 @@ public class IntegrationTestBase {
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(nsAddr);
brokerConfig.setEnablePropertyFilter(true);
+ brokerConfig.setEnableCalcFilterBitMap(true);
+ storeConfig.setEnableConsumeQueueExt(true);
brokerConfig.setLoadBalancePollNameServerInterval(500);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
@@ -195,4 +199,12 @@ public class IntegrationTestBase {
UtilAll.deleteFile(file);
}
+ public static void initMQAdmin(String nsAddr) {
+ try {
+ MQAdminTestUtils.startAdmin(nsAddr);
+ } catch (MQClientException e) {
+ logger.info("MQAdmin start failed");
+ System.exit(1);
+ }
+ }
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
new file mode 100644
index 000000000..810118b3e
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/LagCalculationIT.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.offset;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.DefaultMessageFilter;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQBlockListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for consumer lag figures: the offsets reported through
+ * {@code MQAdminTestUtils.examineConsumeStats} and the filtered message-count estimation of the message store.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class LagCalculationIT extends BaseConf {
+    private static final Logger LOGGER = LoggerFactory.getLogger(LagCalculationIT.class);
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private String topic = null;
+    private RMQBlockListener blockListener = null;
+
+    /**
+     * Creates a fresh topic, switches every broker to short polling with accumulation estimation enabled, and
+     * starts a producer plus a non-blocking consumer subscribed to all tags.
+     */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        LOGGER.info("use topic: {};", topic);
+        for (BrokerController controller : brokerControllerList) {
+            controller.getBrokerConfig().setLongPollingEnable(false);
+            controller.getBrokerConfig().setShortPollingTimeMills(500);
+            controller.getBrokerConfig().setEstimateAccumulation(true);
+        }
+        producer = getProducer(NAMESRV_ADDR, topic);
+        blockListener = new RMQBlockListener(false);
+        consumer = getConsumer(NAMESRV_ADDR, topic, "*", blockListener);
+    }
+
+    @After
+    public void tearDown() {
+        shutdown();
+    }
+
+    /**
+     * Sums the lag over the given queues and checks that the offsets in the examined consume stats match the
+     * offsets read directly from each broker.
+     *
+     * @param mqs queues to inspect
+     * @return pair of (broker offset minus consumer offset, broker offset minus pull offset), each summed over
+     *     all queues
+     */
+    private Pair<Long, Long> getLag(List<MessageQueue> mqs) {
+        long lag = 0;
+        long pullLag = 0;
+        for (BrokerController controller : brokerControllerList) {
+            ConsumeStats consumeStats = MQAdminTestUtils.examineConsumeStats(
+                controller.getBrokerAddr(), topic, consumer.getConsumerGroup());
+            // The helper returns null when the query fails; fail with a readable message instead of an NPE.
+            org.junit.Assert.assertNotNull(
+                "No consume stats returned by broker " + controller.getBrokerAddr(), consumeStats);
+            Map<MessageQueue, OffsetWrapper> offsetTable = consumeStats.getOffsetTable();
+            for (MessageQueue mq : mqs) {
+                if (mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
+                    long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
+
+                    long consumerOffset = controller.getConsumerOffsetManager()
+                        .queryOffset(consumer.getConsumerGroup(), topic, mq.getQueueId());
+                    long pullOffset = controller.getConsumerOffsetManager()
+                        .queryPullOffset(consumer.getConsumerGroup(), topic, mq.getQueueId());
+                    OffsetWrapper offsetWrapper = offsetTable.get(mq);
+                    org.junit.Assert.assertNotNull("No offset entry for queue " + mq, offsetWrapper);
+                    assertEquals(brokerOffset, offsetWrapper.getBrokerOffset());
+                    assertEquals(consumerOffset, offsetWrapper.getConsumerOffset());
+                    assertEquals(pullOffset, offsetWrapper.getPullOffset());
+                    lag += brokerOffset - consumerOffset;
+                    pullLag += brokerOffset - pullOffset;
+                }
+            }
+        }
+        return new Pair<>(lag, pullLag);
+    }
+
+    /**
+     * Checks the lag in three phases: everything consumed, consumption blocked in the listener, and consumer
+     * shut down.
+     */
+    @Test
+    public void testCalculateLag() throws InterruptedException {
+        int msgSize = 10;
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
+
+        // Phase 1: everything is consumed, so both lags are expected to be zero.
+        producer.send(mqMsgs.getMsgsWithMQ());
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), CONSUME_TIME);
+        // wait for updating offset
+        Thread.sleep(5 * 1000);
+
+        Pair<Long, Long> pair = getLag(mqs);
+        assertEquals(0, (long) pair.getObject1());
+        assertEquals(0, (long) pair.getObject2());
+
+        // Phase 2: the listener blocks, so only the consume lag is expected to grow.
+        blockListener.setBlock(true);
+        consumer.clearMsg();
+        producer.clearMsg();
+        producer.send(mqMsgs.getMsgsWithMQ());
+        // wait for updating offset
+        Thread.sleep(5 * 1000);
+
+        pair = getLag(mqs);
+        assertEquals(producer.getAllMsgBody().size(), (long) pair.getObject1());
+        assertEquals(0, (long) pair.getObject2());
+
+        // Phase 3: the consumer is shut down, so both lags are expected to equal the number of new messages.
+        blockListener.setBlock(false);
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), CONSUME_TIME);
+        consumer.shutdown();
+        producer.clearMsg();
+        producer.send(mqMsgs.getMsgsWithMQ());
+        // wait for updating offset
+        Thread.sleep(5 * 1000);
+
+        pair = getLag(mqs);
+        assertEquals(producer.getAllMsgBody().size(), (long) pair.getObject1());
+        assertEquals(producer.getAllMsgBody().size(), (long) pair.getObject2());
+    }
+
+    /**
+     * Sends {@code repeat} rounds of untagged and tagged messages per queue and checks the store's estimated
+     * message count for a tag filter and for a SQL filter.
+     */
+    @Test
+    public void testEstimateLag() throws Exception {
+        int msgNoTagSize = 80;
+        int msgWithTagSize = 20;
+        int repeat = 2;
+        String tag = "TAG_FOR_TEST_ESTIMATE";
+        String sql = "TAGS = 'TAG_FOR_TEST_ESTIMATE' And value < " + repeat / 2;
+        MessageSelector selector = MessageSelector.bySql(sql);
+        RMQBlockListener sqlListener = new RMQBlockListener(true);
+        RMQSqlConsumer sqlConsumer = ConsumerFactory.getRMQSqlConsumer(
+            NAMESRV_ADDR, initConsumerGroup(), topic, selector, sqlListener);
+        RMQBlockListener tagListener = new RMQBlockListener(true);
+        RMQNormalConsumer tagConsumer = getConsumer(NAMESRV_ADDR, topic, tag, tagListener);
+        try {
+            // wait for building filter data
+            await().atMost(5, TimeUnit.SECONDS).until(() -> sqlListener.isBlocked() && tagListener.isBlocked());
+
+            List<MessageQueue> mqs = producer.getMessageQueue();
+            for (int i = 0; i < repeat; i++) {
+                MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgNoTagSize);
+                Map<MessageQueue, List<Object>> msgMap = mqMsgs.getMsgsWithMQ();
+                mqMsgs = new MessageQueueMsg(mqs, msgWithTagSize, tag);
+                Map<MessageQueue, List<Object>> msgWithTagMap = mqMsgs.getMsgsWithMQ();
+                int finalI = i;
+                msgMap.forEach((mq, msgList) -> {
+                    List<Object> msgWithTagList = msgWithTagMap.get(mq);
+                    // Tagged messages carry the round index so the SQL filter can select a single round.
+                    for (Object o : msgWithTagList) {
+                        ((Message) o).putUserProperty("value", String.valueOf(finalI));
+                    }
+                    msgList.addAll(msgWithTagList);
+                    Collections.shuffle(msgList);
+                });
+                producer.send(msgMap);
+            }
+
+            // test lag estimation for tag consumer
+            for (BrokerController controller : brokerControllerList) {
+                for (MessageQueue mq : mqs) {
+                    if (mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
+                        long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
+                        long estimateMessageCount = controller.getMessageStore()
+                            .estimateMessageCount(topic, mq.getQueueId(), 0, brokerOffset,
+                                new DefaultMessageFilter(FilterAPI.buildSubscriptionData(topic, tag)));
+                        assertEquals(repeat * msgWithTagSize, estimateMessageCount);
+                    }
+                }
+            }
+
+            // test lag estimation for sql consumer
+            for (BrokerController controller : brokerControllerList) {
+                for (MessageQueue mq : mqs) {
+                    if (mq.getBrokerName().equals(controller.getBrokerConfig().getBrokerName())) {
+                        long brokerOffset = controller.getMessageStore().getMaxOffsetInQueue(topic, mq.getQueueId());
+                        SubscriptionData subscriptionData = controller.getConsumerManager()
+                            .findSubscriptionData(sqlConsumer.getConsumerGroup(), topic);
+                        ConsumerFilterData consumerFilterData = controller.getConsumerFilterManager()
+                            .get(topic, sqlConsumer.getConsumerGroup());
+                        long estimateMessageCount = controller.getMessageStore()
+                            .estimateMessageCount(topic, mq.getQueueId(), 0, brokerOffset,
+                                new ExpressionMessageFilter(subscriptionData, consumerFilterData,
+                                    controller.getConsumerFilterManager()));
+                        assertEquals(repeat / 2 * msgWithTagSize, estimateMessageCount);
+                    }
+                }
+            }
+        } finally {
+            // Release both consumers even when an assertion above fails.
+            sqlConsumer.shutdown();
+            tagConsumer.shutdown();
+        }
+    }
+}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 9f15ccaff..7bc308036 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -296,6 +296,13 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
}
+    /**
+     * Fetches consume stats for a group and topic from one broker address by delegating to
+     * {@code defaultMQAdminExtImpl}.
+     */
+    @Override
+    public ConsumeStats examineConsumeStats(final String brokerAddr, final String consumerGroup,
+        final String topicName, final long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException {
+        final ConsumeStats consumeStats =
+            defaultMQAdminExtImpl.examineConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis);
+        return consumeStats;
+    }
+
@Override
public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String
consumerGroup, String topic) {
return
defaultMQAdminExtImpl.examineConsumeStatsConcurrent(consumerGroup, topic);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 5f3bcbd38..0460ed95b 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -62,6 +62,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -106,8 +108,6 @@ import
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapp
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.api.TrackType;
@@ -493,6 +493,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
return staticResult;
}
+    /**
+     * Fetches consume stats for a group and topic from one broker address through the client API of
+     * {@code mqClientInstance}.
+     */
+    @Override
+    public ConsumeStats examineConsumeStats(String brokerAddr, String consumerGroup, String topicName,
+        long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQBrokerException {
+        ConsumeStats consumeStats = mqClientInstance.getMQClientAPIImpl()
+            .getConsumeStats(brokerAddr, consumerGroup, topicName, timeoutMillis);
+        return consumeStats;
+    }
+
@Override
public AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(final
String consumerGroup, final String topic) {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index efe8c0342..ebf878f32 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -145,6 +145,10 @@ public interface MQAdminExt extends MQAdmin {
final String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
+ /**
+ * Examines the consume stats of a consumer group on a topic as reported by a single broker address.
+ *
+ * @param brokerAddr address of the broker to query
+ * @param consumerGroup consumer group whose stats are requested
+ * @param topicName topic whose stats are requested
+ * @param timeoutMillis timeout for the request, in milliseconds
+ * @return the consume stats for the given group and topic
+ */
+ ConsumeStats examineConsumeStats(final String brokerAddr, final String
consumerGroup, final String topicName,
+ final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException;
+
AdminToolResult<ConsumeStats> examineConsumeStatsConcurrent(String
consumerGroup, String topic);
ClusterInfo examineBrokerClusterInfo() throws InterruptedException,
MQBrokerException, RemotingTimeoutException,
@@ -462,9 +466,9 @@ public interface MQAdminExt extends MQAdmin {
/**
* clean controller broker meta data
- *
*/
void cleanControllerBrokerData(String controllerAddr, String clusterName,
String brokerName,
- String brokerAddr, boolean isCleanLivingBroker) throws
RemotingException, InterruptedException, MQBrokerException;
+ String brokerAddr,
+ boolean isCleanLivingBroker) throws RemotingException,
InterruptedException, MQBrokerException;
}