This is an automated email from the ASF dual-hosted git repository.

fuyou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1e0b08f56a fix: make ConsumeQueueStore bottom most compression type configurable (#8841)
1e0b08f56a is described below

commit 1e0b08f56ab628c9017536eba8509d8987ec6be5
Author: Zhanhui Li <[email protected]>
AuthorDate: Fri Oct 18 16:50:13 2024 +0800

    fix: make ConsumeQueueStore bottom most compression type configurable (#8841)
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 .../rocketmq/store/config/MessageStoreConfig.java  | 23 +++++++++++++++
 .../store/rocksdb/RocksDBOptionsFactory.java       |  5 +++-
 .../store/rocksdb/RocksDBOptionsFactoryTest.java   | 34 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 6853128438..5195868e0f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -430,6 +430,22 @@ public class MessageStoreConfig {
 
     private int batchWriteKvCqSize = 16;
 
+    /**
+     * If ConsumeQueueStore is RocksDB based, this option is to configure bottom-most tier compression type.
+     * The following values are valid:
+     * <ul>
+     *     <li>snappy</li>
+     *     <li>z</li>
+     *     <li>bzip2</li>
+     *     <li>lz4</li>
+     *     <li>lz4hc</li>
+     *     <li>xpress</li>
+     *     <li>zstd</li>
+     * </ul>
+     *
+     * LZ4 is the recommended one; note that the default value configured below is zstd.
+     */
+    private String bottomMostCompressionTypeForConsumeQueueStore = "zstd";
 
     public int getBatchWriteKvCqSize() {
         return batchWriteKvCqSize;
@@ -1885,4 +1901,11 @@ public class MessageStoreConfig {
         this.transferMetadataJsonToRocksdb = transferMetadataJsonToRocksdb;
     }
 
+    public String getBottomMostCompressionTypeForConsumeQueueStore() {
+        return bottomMostCompressionTypeForConsumeQueueStore;
+    }
+
+    public void setBottomMostCompressionTypeForConsumeQueueStore(String 
bottomMostCompressionTypeForConsumeQueueStore) {
+        this.bottomMostCompressionTypeForConsumeQueueStore = 
bottomMostCompressionTypeForConsumeQueueStore;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index c7d5041bd8..d373ba6249 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -65,13 +65,16 @@ public class RocksDBOptionsFactory {
                 setMaxMergeWidth(Integer.MAX_VALUE).
                 setStopStyle(CompactionStopStyle.CompactionStopStyleTotalSize).
                 setCompressionSizePercent(-1);
+        String bottomMostCompressionTypeOpt = 
messageStore.getMessageStoreConfig()
+            .getBottomMostCompressionTypeForConsumeQueueStore();
+        CompressionType bottomMostCompressionType = 
CompressionType.getCompressionType(bottomMostCompressionTypeOpt);
         return columnFamilyOptions.setMaxWriteBufferNumber(4).
                 setWriteBufferSize(128 * SizeUnit.MB).
                 setMinWriteBufferNumberToMerge(1).
                 setTableFormatConfig(blockBasedTableConfig).
                 setMemTableConfig(new SkipListMemTableConfig()).
                 setCompressionType(CompressionType.LZ4_COMPRESSION).
-                setBottommostCompressionType(CompressionType.LZ4_COMPRESSION).
+                setBottommostCompressionType(bottomMostCompressionType).
                 setNumLevels(7).
                 setCompactionStyle(CompactionStyle.UNIVERSAL).
                 setCompactionOptionsUniversal(compactionOption).
diff --git a/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
new file mode 100644
index 0000000000..1d7273968f
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactoryTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.rocksdb;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.CompressionType;
+
+public class RocksDBOptionsFactoryTest {
+
+    @Test
+    public void testBottomMostCompressionType() {
+        MessageStoreConfig config = new MessageStoreConfig();
+        Assert.assertEquals(CompressionType.ZSTD_COMPRESSION,
+            
CompressionType.getCompressionType(config.getBottomMostCompressionTypeForConsumeQueueStore()));
+        Assert.assertEquals(CompressionType.LZ4_COMPRESSION, 
CompressionType.getCompressionType("lz4"));
+    }
+}

Reply via email to