This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2fa7513a2b [ISSUE #8764] Implement consume lag estimation in cq rocksdb store (#8800)
2fa7513a2b is described below
commit 2fa7513a2bcdddd1583b7e293b2f06bd350691e0
Author: LetLetMe <[email protected]>
AuthorDate: Tue Oct 15 14:14:31 2024 +0800
[ISSUE #8764] Implement consume lag estimation in cq rocksdb store (#8800)
---
.../apache/rocketmq/store/RocksDBMessageStore.java | 6 --
.../rocketmq/store/queue/RocksDBConsumeQueue.java | 41 +++++++++-
.../rocketmq/store/queue/ConsumeQueueTest.java | 92 +++++++++++++++++++++-
3 files changed, 128 insertions(+), 11 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 90df7aed59..21f8d45c9d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -168,12 +168,6 @@ public class RocksDBMessageStore extends DefaultMessageStore {
}
}
- @Override
- public long estimateMessageCount(String topic, int queueId, long from,
long to, MessageFilter filter) {
- // todo
- return 0;
- }
-
@Override
public void initMetrics(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier) {
DefaultStoreMetricsManager.init(meter, attributesBuilderSupplier,
this);
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 2363c2896e..83ba7bebad 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -222,10 +222,47 @@ public class RocksDBConsumeQueue implements ConsumeQueueInterface {
@Override
public long estimateMessageCount(long from, long to, MessageFilter filter) {
-        // todo
-        return 0;
+        // Estimates how many entries in [from, to) pass the filter: probe a bounded prefix of
+        // the range, measure the match ratio there and extrapolate it to the whole range.
+        // Returns -1 when from >= to or when no unit can be read at 'from'.
+        if (from >= to) {
+            return -1;
+        }
+        if (getCqUnitAndStoreTime(from) == null) {
+            return -1;
+        }
+
+        // Clamp the upper bound using a single read of the max offset.
+        long end = Math.min(to, getMaxOffsetInQueue());
+        long range = end - from;
+
+        int maxSampleSize = messageStore.getMessageStoreConfig().getMaxConsumeQueueScan();
+        int sampleLimit = range > maxSampleSize ? maxSampleSize : (int) range;
+        int matchThreshold = messageStore.getMessageStoreConfig().getSampleCountThreshold();
+
+        int sampled = 0;
+        int matched = 0;
+        for (int i = 0; i < sampleLimit; i++) {
+            // Count every probed offset, so that after an early exit the denominator is the
+            // number of entries actually inspected (i + 1) rather than the loop index.
+            sampled++;
+            Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(from + i);
+            if (pair == null) {
+                continue;
+            }
+            CqUnit cqUnit = pair.getObject1();
+            if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
+                matched++;
+                // Enough matches collected: stop early and extrapolate from what was seen.
+                if (matched > matchThreshold) {
+                    break;
+                }
+            }
+        }
+        // Divide in floating point, otherwise the ratio would be truncated to 0.
+        return sampled == 0 ? 0 : (long) (range * (matched / (double) sampled));
}
+
@Override
public long getMinOffsetInQueue() {
return this.messageStore.getMinOffsetInQueue(this.topic, this.queueId);
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index c3c8be52dd..bf3b1eeca8 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Assert;
@@ -84,7 +86,26 @@ public class ConsumeQueueTest extends QueueTestBase {
return master;
}
- protected void putMsg(DefaultMessageStore messageStore) throws Exception {
+    /**
+     * Builds a RocksDBMessageStore for the tests, asserts that load() succeeds,
+     * starts the store and returns it.
+     */
+    protected RocksDBMessageStore genRocksdbMessageStore() throws Exception {
+        MessageStoreConfig storeSettings =
+            buildStoreConfig(COMMIT_LOG_FILE_SIZE, CQ_FILE_SIZE, true, CQ_EXT_FILE_SIZE);
+        BrokerConfig brokerSettings = new BrokerConfig();
+        BrokerStatsManager statsManager = new BrokerStatsManager(brokerSettings);
+
+        RocksDBMessageStore store = new RocksDBMessageStore(
+            storeSettings,
+            statsManager,
+            // Callback lambda with an empty body.
+            (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
+            },
+            brokerSettings,
+            new ConcurrentHashMap<>());
+
+        boolean loaded = store.load();
+        assertThat(loaded).isTrue();
+
+        store.start();
+        return store;
+    }
+
+ protected void putMsg(MessageStore messageStore) {
int totalMsgs = 200;
for (int i = 0; i < totalMsgs; i++) {
MessageExtBrokerInner message = buildMessage();
@@ -184,9 +205,33 @@ public class ConsumeQueueTest extends QueueTestBase {
@Test
public void testEstimateMessageCountInEmptyConsumeQueue() {
-        DefaultMessageStore master = null;
+        // Runs the shared empty-queue scenario against the store built by gen().
+        try {
+            DefaultMessageStore messageStore = gen();
+            doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
+        } catch (Exception e) {
+            // Fail with the original exception attached as the cause.
+            throw new AssertionError("estimateMessageCount test on an empty consume queue failed", e);
+        }
+    }
+
+    @Test
+    public void testEstimateRocksdbMessageCountInEmptyConsumeQueue() {
+        // Nothing is run when notExecuted() says this case must not execute.
+        if (notExecuted()) {
+            return;
+        }
+        // Runs the shared empty-queue scenario against the store built by genRocksdbMessageStore().
+        try {
+            RocksDBMessageStore messageStore = genRocksdbMessageStore();
+            doTestEstimateMessageCountInEmptyConsumeQueue(messageStore);
+        } catch (Exception e) {
+            // Fail with the original exception attached as the cause.
+            throw new AssertionError("estimateMessageCount test on an empty RocksDB consume queue failed", e);
+        }
+    }
+
+ public void doTestEstimateMessageCountInEmptyConsumeQueue(MessageStore
master) {
try {
- master = gen();
ConsumeQueueInterface consumeQueue =
master.findConsumeQueue(TOPIC, QUEUE_ID);
MessageFilter filter = new MessageFilter() {
@Override
@@ -219,16 +264,34 @@ public class ConsumeQueueTest extends QueueTestBase {
}
}
+    @Test
+    public void testEstimateRocksdbMessageCount() {
+        // Nothing is run when notExecuted() says this case must not execute.
+        if (notExecuted()) {
+            return;
+        }
+        // Runs the shared estimation scenario against the store built by genRocksdbMessageStore().
+        try {
+            RocksDBMessageStore messageStore = genRocksdbMessageStore();
+            doTestEstimateMessageCount(messageStore);
+        } catch (Exception e) {
+            // Fail with the original exception attached as the cause.
+            throw new AssertionError("estimateMessageCount test on the RocksDB store failed", e);
+        }
+    }
+
@Test
public void testEstimateMessageCount() {
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
+            // Shared scenario, also used by the RocksDB-backed variant of this test.
+            doTestEstimateMessageCount(messageStore);
} catch (Exception e) {
-            e.printStackTrace();
-            assertThat(Boolean.FALSE).isTrue();
+            // Fail with the original exception attached as the cause.
+            throw new AssertionError("estimateMessageCount test on the default store failed", e);
}
+    }
+ public void doTestEstimateMessageCount(MessageStore messageStore) {
try {
try {
putMsg(messageStore);
@@ -265,15 +328,34 @@ public class ConsumeQueueTest extends QueueTestBase {
}
}
+    @Test
+    public void testEstimateRocksdbMessageCountSample() {
+        // Nothing is run when notExecuted() says this case must not execute.
+        if (notExecuted()) {
+            return;
+        }
+        // Runs the shared sampling scenario against the store built by genRocksdbMessageStore().
+        try {
+            RocksDBMessageStore messageStore = genRocksdbMessageStore();
+            doTestEstimateMessageCountSample(messageStore);
+        } catch (Exception e) {
+            // Fail with the original exception attached as the cause.
+            throw new AssertionError("estimateMessageCount sampling test on the RocksDB store failed", e);
+        }
+    }
+
@Test
public void testEstimateMessageCountSample() {
DefaultMessageStore messageStore = null;
try {
messageStore = gen();
+            // Shared scenario, also used by the RocksDB-backed variant of this test.
+            doTestEstimateMessageCountSample(messageStore);
} catch (Exception e) {
-            e.printStackTrace();
-            assertThat(Boolean.FALSE).isTrue();
+            // Fail with the original exception attached as the cause.
+            throw new AssertionError("estimateMessageCount sampling test on the default store failed", e);
}
+    }
+
+ public void doTestEstimateMessageCountSample(MessageStore messageStore) {
try {
try {
@@ -303,4 +385,8 @@ public class ConsumeQueueTest extends QueueTestBase {
UtilAll.deleteFile(new File(STORE_PATH));
}
}
+
+    /**
+     * Tells the RocksDB-backed test cases whether they should return without running;
+     * the answer is whatever MixAll.isMac() reports.
+     */
+    private boolean notExecuted() {
+        final boolean onMac = MixAll.isMac();
+        return onMac;
+    }
}