This is an automated email from the ASF dual-hosted git repository.

ShannonDing pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e12d755a77 [ISSUE #10344] Parameterize RocksDB CQ size amplification
e12d755a77 is described below

commit e12d755a7728ec140df7466f4b76882a2945f5ea
Author: rongtong <[email protected]>
AuthorDate: Mon Jun 1 10:47:24 2026 +0800

    [ISSUE #10344] Parameterize RocksDB CQ size amplification
---
 .../rocketmq/store/config/MessageStoreConfig.java  | 10 ++++++++
 .../store/rocksdb/RocksDBOptionsFactory.java       | 14 +++++------
 .../store/rocksdb/RocksDBOptionsFactoryTest.java   | 28 ++++++++++++++++++++++
 3 files changed, 45 insertions(+), 7 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 9f9f4ac3df..fadc957e9d 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -505,6 +505,8 @@ public class MessageStoreConfig {
 
     private String rocksdbCompressionType = 
CompressionType.LZ4_COMPRESSION.getLibraryName();
 
+    private int rocksdbMaxSizeAmplificationPercent = 25;
+
     private long popRocksdbBlockCacheSize = 256 * SizeUnit.MB;
 
     private long popRocksdbWriteBufferSize = 32 * SizeUnit.MB;
@@ -535,6 +537,14 @@ public class MessageStoreConfig {
         this.rocksdbCompressionType = compressionType;
     }
 
+    public int getRocksdbMaxSizeAmplificationPercent() {
+        return rocksdbMaxSizeAmplificationPercent;
+    }
+
+    public void setRocksdbMaxSizeAmplificationPercent(int 
rocksdbMaxSizeAmplificationPercent) {
+        this.rocksdbMaxSizeAmplificationPercent = 
rocksdbMaxSizeAmplificationPercent;
+    }
+
     public long getPopRocksdbBlockCacheSize() {
         return popRocksdbBlockCacheSize;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index 37eec67d35..b6d693219d 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store.rocksdb;
 
 import org.apache.rocketmq.common.config.ConfigHelper;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
 import org.rocksdb.ColumnFamilyOptions;
@@ -57,19 +58,18 @@ public class RocksDBOptionsFactory {
                 setBlockCache(new LRUCache(1024 * SizeUnit.MB, 8, false)).
                 setWholeKeyFiltering(true);
 
+        MessageStoreConfig messageStoreConfig = 
messageStore.getMessageStoreConfig();
         ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
-        CompactionOptionsUniversal compactionOption = new 
CompactionOptionsUniversal();
-        compactionOption.setSizeRatio(100).
-                setMaxSizeAmplificationPercent(25).
+        CompactionOptionsUniversal compactionOption = new 
CompactionOptionsUniversal().
+                setSizeRatio(100).
+                
setMaxSizeAmplificationPercent(messageStoreConfig.getRocksdbMaxSizeAmplificationPercent()).
                 setAllowTrivialMove(true).
                 setMinMergeWidth(2).
                 setMaxMergeWidth(Integer.MAX_VALUE).
                 setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize).
                 setCompressionSizePercent(-1);
-        String bottomMostCompressionTypeOpt = 
messageStore.getMessageStoreConfig()
-            .getBottomMostCompressionTypeForConsumeQueueStore();
-        String compressionTypeOpt = messageStore.getMessageStoreConfig()
-            .getRocksdbCompressionType();
+        String bottomMostCompressionTypeOpt = 
messageStoreConfig.getBottomMostCompressionTypeForConsumeQueueStore();
+        String compressionTypeOpt = 
messageStoreConfig.getRocksdbCompressionType();
         CompressionType bottomMostCompressionType = 
CompressionType.getCompressionType(bottomMostCompressionTypeOpt);
         CompressionType compressionType = 
CompressionType.getCompressionType(compressionTypeOpt);
         return columnFamilyOptions.setMaxWriteBufferNumber(4).
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
index 1d7273968f..ef285cd999 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
@@ -17,13 +17,26 @@
 
 package org.apache.rocketmq.store.rocksdb;
 
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionOptionsUniversal;
 import org.rocksdb.CompressionType;
+import org.rocksdb.RocksDB;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class RocksDBOptionsFactoryTest {
 
+    @BeforeClass
+    public static void loadRocksDB() {
+        RocksDB.loadLibrary();
+    }
+
     @Test
     public void testBottomMostCompressionType() {
         MessageStoreConfig config = new MessageStoreConfig();
@@ -31,4 +44,19 @@ public class RocksDBOptionsFactoryTest {
             
CompressionType.getCompressionType(config.getBottomMostCompressionTypeForConsumeQueueStore()));
         Assert.assertEquals(CompressionType.LZ4_COMPRESSION, 
CompressionType.getCompressionType("lz4"));
     }
+
+    @Test
+    public void 
testConsumeQueueUniversalCompactionMaxSizeAmplificationPercent() {
+        MessageStoreConfig config = new MessageStoreConfig();
+        config.setRocksdbMaxSizeAmplificationPercent(50);
+        MessageStore messageStore = mock(MessageStore.class);
+        when(messageStore.getMessageStoreConfig()).thenReturn(config);
+
+        ConsumeQueueCompactionFilterFactory compactionFilterFactory = new 
ConsumeQueueCompactionFilterFactory(() -> 0);
+        try (ColumnFamilyOptions options = 
RocksDBOptionsFactory.createCQCFOptions(messageStore, compactionFilterFactory);
+            CompactionOptionsUniversal compactionOptions = 
options.compactionOptionsUniversal()) {
+            Assert.assertEquals(50, 
compactionOptions.maxSizeAmplificationPercent());
+        }
+    }
+
 }

Reply via email to