This is an automated email from the ASF dual-hosted git repository.
ShannonDing pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e12d755a77 [ISSUE #10344] Parameterize RocksDB CQ size amplification
e12d755a77 is described below
commit e12d755a7728ec140df7466f4b76882a2945f5ea
Author: rongtong <[email protected]>
AuthorDate: Mon Jun 1 10:47:24 2026 +0800
[ISSUE #10344] Parameterize RocksDB CQ size amplification
---
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++
.../store/rocksdb/RocksDBOptionsFactory.java | 14 +++++------
.../store/rocksdb/RocksDBOptionsFactoryTest.java | 28 ++++++++++++++++++++++
3 files changed, 45 insertions(+), 7 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 9f9f4ac3df..fadc957e9d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -505,6 +505,8 @@ public class MessageStoreConfig {
private String rocksdbCompressionType = CompressionType.LZ4_COMPRESSION.getLibraryName();
+ private int rocksdbMaxSizeAmplificationPercent = 25;
+
private long popRocksdbBlockCacheSize = 256 * SizeUnit.MB;
private long popRocksdbWriteBufferSize = 32 * SizeUnit.MB;
@@ -535,6 +537,14 @@ public class MessageStoreConfig {
this.rocksdbCompressionType = compressionType;
}
+ public int getRocksdbMaxSizeAmplificationPercent() {
+ return rocksdbMaxSizeAmplificationPercent;
+ }
+
+ public void setRocksdbMaxSizeAmplificationPercent(int rocksdbMaxSizeAmplificationPercent) {
+ this.rocksdbMaxSizeAmplificationPercent = rocksdbMaxSizeAmplificationPercent;
+ }
+
public long getPopRocksdbBlockCacheSize() {
return popRocksdbBlockCacheSize;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index 37eec67d35..b6d693219d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store.rocksdb;
import org.apache.rocketmq.common.config.ConfigHelper;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
@@ -57,19 +58,18 @@ public class RocksDBOptionsFactory {
setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false)).
setWholeKeyFiltering(true);
+ MessageStoreConfig messageStoreConfig = messageStore.getMessageStoreConfig();
ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
- CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal();
- compactionOption.setSizeRatio(100).
- setMaxSizeAmplificationPercent(25).
+ CompactionOptionsUniversal compactionOption = new CompactionOptionsUniversal().
+ setSizeRatio(100).
+ setMaxSizeAmplificationPercent(messageStoreConfig.getRocksdbMaxSizeAmplificationPercent()).
setAllowTrivialMove(true).
setMinMergeWidth(2).
setMaxMergeWidth(Integer.MAX_VALUE).
setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize).
setCompressionSizePercent(-1);
- String bottomMostCompressionTypeOpt = messageStore.getMessageStoreConfig()
- .getBottomMostCompressionTypeForConsumeQueueStore();
- String compressionTypeOpt = messageStore.getMessageStoreConfig()
- .getRocksdbCompressionType();
+ String bottomMostCompressionTypeOpt = messageStoreConfig.getBottomMostCompressionTypeForConsumeQueueStore();
+ String compressionTypeOpt = messageStoreConfig.getRocksdbCompressionType();
CompressionType bottomMostCompressionType = CompressionType.getCompressionType(bottomMostCompressionTypeOpt);
CompressionType compressionType = CompressionType.getCompressionType(compressionTypeOpt);
return columnFamilyOptions.setMaxWriteBufferNumber(4).
diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
index 1d7273968f..ef285cd999 100644
--- a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
@@ -17,13 +17,26 @@
package org.apache.rocketmq.store.rocksdb;
+import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionOptionsUniversal;
import org.rocksdb.CompressionType;
+import org.rocksdb.RocksDB;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class RocksDBOptionsFactoryTest {
+ @BeforeClass
+ public static void loadRocksDB() {
+ RocksDB.loadLibrary();
+ }
+
@Test
public void testBottomMostCompressionType() {
MessageStoreConfig config = new MessageStoreConfig();
@@ -31,4 +44,19 @@ public class RocksDBOptionsFactoryTest {
CompressionType.getCompressionType(config.getBottomMostCompressionTypeForConsumeQueueStore()));
Assert.assertEquals(CompressionType.LZ4_COMPRESSION, CompressionType.getCompressionType("lz4"));
}
+
+ @Test
+ public void testConsumeQueueUniversalCompactionMaxSizeAmplificationPercent() {
+ MessageStoreConfig config = new MessageStoreConfig();
+ config.setRocksdbMaxSizeAmplificationPercent(50);
+ MessageStore messageStore = mock(MessageStore.class);
+ when(messageStore.getMessageStoreConfig()).thenReturn(config);
+
+ ConsumeQueueCompactionFilterFactory compactionFilterFactory = new ConsumeQueueCompactionFilterFactory(() -> 0);
+ try (ColumnFamilyOptions options = RocksDBOptionsFactory.createCQCFOptions(messageStore, compactionFilterFactory);
+ CompactionOptionsUniversal compactionOptions = options.compactionOptionsUniversal()) {
+ Assert.assertEquals(50, compactionOptions.maxSizeAmplificationPercent());
+ }
+ }
+
}