This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 49e23e1e0b fix: atomic flush incorrect use and clean up code (#8830)
49e23e1e0b is described below

commit 49e23e1e0b8bc37e4497de97202a343956de3968
Author: Zhanhui Li <[email protected]>
AuthorDate: Fri Oct 18 14:30:38 2024 +0800

    fix: atomic flush incorrect use and clean up code (#8830)
    
    Signed-off-by: Li Zhanhui <[email protected]>
---
 .../common/config/AbstractRocksDBStorage.java      | 199 +++++++++++++++------
 .../rocketmq/common/config/ConfigHelper.java       | 121 +++++++++++++
 .../common/config/ConfigRocksDBStorage.java        | 166 ++---------------
 .../store/queue/RocksDBConsumeQueueStore.java      |   8 +-
 .../store/rocksdb/ConsumeQueueRocksDBStorage.java  |  41 ++---
 .../store/rocksdb/RocksDBOptionsFactory.java       |  10 +-
 6 files changed, 299 insertions(+), 246 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 13522889bb..42ddbdc728 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -17,18 +17,10 @@
 package org.apache.rocketmq.common.config;
 
 import com.google.common.collect.Maps;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.charset.StandardCharsets;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -37,7 +29,9 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactRangeOptions;
 import org.rocksdb.CompactionOptions;
+import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.Priority;
@@ -49,14 +43,31 @@ import org.rocksdb.Status;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
-import static org.rocksdb.RocksDB.NOT_FOUND;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractRocksDBStorage {
     protected static final Logger LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
 
+    /**
+     * Direct Jemalloc allocator
+     */
+    public static final PooledByteBufAllocator POOLED_ALLOCATOR = new 
PooledByteBufAllocator(true);
+
+    public static final byte CTRL_0 = '\u0000';
+    public static final byte CTRL_1 = '\u0001';
+    public static final byte CTRL_2 = '\u0002';
+
     private static final String SPACE = " | ";
 
-    protected String dbPath;
+    protected final String dbPath;
     protected boolean readOnly;
     protected RocksDB db;
     protected DBOptions options;
@@ -71,7 +82,8 @@ public abstract class AbstractRocksDBStorage {
     protected CompactRangeOptions compactRangeOptions;
 
     protected ColumnFamilyHandle defaultCFHandle;
-    protected final List<ColumnFamilyOptions> cfOptions = new ArrayList();
+    protected final List<ColumnFamilyOptions> cfOptions = new ArrayList<>();
+    protected final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
 
     protected volatile boolean loaded;
     private volatile boolean closed;
@@ -79,15 +91,76 @@ public abstract class AbstractRocksDBStorage {
     private final Semaphore reloadPermit = new Semaphore(1);
     private final ScheduledExecutorService reloadScheduler = 
ThreadUtils.newScheduledThreadPool(1, new 
ThreadFactoryImpl("RocksDBStorageReloadService_"));
     private final ThreadPoolExecutor manualCompactionThread = 
(ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
-            1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue(1),
-            new ThreadFactoryImpl("RocksDBManualCompactionService_"),
-            new ThreadPoolExecutor.DiscardOldestPolicy());
+        1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<>(1),
+        new ThreadFactoryImpl("RocksDBManualCompactionService_"),
+        new ThreadPoolExecutor.DiscardOldestPolicy());
 
     static {
         RocksDB.loadLibrary();
     }
 
+    public AbstractRocksDBStorage(String dbPath) {
+        this.dbPath = dbPath;
+    }
+
+    protected void initOptions() {
+        initWriteOptions();
+        initAbleWalWriteOptions();
+        initReadOptions();
+        initTotalOrderReadOptions();
+        initCompactRangeOptions();
+        initCompactionOptions();
+    }
+
+    /**
+     * Write options for <a 
href="https://github.com/facebook/rocksdb/wiki/Atomic-flush";>Atomic Flush</a>
+     */
+    protected void initWriteOptions() {
+        this.writeOptions = new WriteOptions();
+        this.writeOptions.setSync(false);
+        this.writeOptions.setDisableWAL(true);
+        this.writeOptions.setNoSlowdown(true);
+    }
+
+    protected void initAbleWalWriteOptions() {
+        this.ableWalWriteOptions = new WriteOptions();
+        this.ableWalWriteOptions.setSync(false);
+        this.ableWalWriteOptions.setDisableWAL(false);
+        this.ableWalWriteOptions.setNoSlowdown(true);
+    }
+
+    protected void initReadOptions() {
+        this.readOptions = new ReadOptions();
+        this.readOptions.setPrefixSameAsStart(true);
+        this.readOptions.setTotalOrderSeek(false);
+        this.readOptions.setTailing(false);
+    }
+
+    protected void initTotalOrderReadOptions() {
+        this.totalOrderReadOptions = new ReadOptions();
+        this.totalOrderReadOptions.setPrefixSameAsStart(false);
+        this.totalOrderReadOptions.setTotalOrderSeek(true);
+        this.totalOrderReadOptions.setTailing(false);
+    }
+
+    protected void initCompactRangeOptions() {
+        this.compactRangeOptions = new CompactRangeOptions();
+        
this.compactRangeOptions.setBottommostLevelCompaction(CompactRangeOptions.BottommostLevelCompaction.kForce);
+        this.compactRangeOptions.setAllowWriteStall(true);
+        this.compactRangeOptions.setExclusiveManualCompaction(false);
+        this.compactRangeOptions.setChangeLevel(true);
+        this.compactRangeOptions.setTargetLevel(-1);
+        this.compactRangeOptions.setMaxSubcompactions(4);
+    }
+
+    protected void initCompactionOptions() {
+        this.compactionOptions = new CompactionOptions();
+        this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION);
+        this.compactionOptions.setMaxSubcompactions(4);
+        this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L);
+    }
+
     public boolean hold() {
         if (!this.loaded || this.db == null || this.closed) {
             LOGGER.error("hold rocksdb Failed. {}", this.dbPath);
@@ -101,8 +174,8 @@ public abstract class AbstractRocksDBStorage {
     }
 
     protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions,
-                       final byte[] keyBytes, final int keyLen,
-                       final byte[] valueBytes, final int valueLen) throws 
RocksDBException {
+        final byte[] keyBytes, final int keyLen,
+        final byte[] valueBytes, final int valueLen) throws RocksDBException {
         if (!hold()) {
             throw new IllegalStateException("rocksDB:" + this + " is not 
ready");
         }
@@ -118,7 +191,7 @@ public abstract class AbstractRocksDBStorage {
     }
 
     protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions,
-                       final ByteBuffer keyBB, final ByteBuffer valueBB) 
throws RocksDBException {
+        final ByteBuffer keyBB, final ByteBuffer valueBB) throws 
RocksDBException {
         if (!hold()) {
             throw new IllegalStateException("rocksDB:" + this + " is not 
ready");
         }
@@ -159,13 +232,13 @@ public abstract class AbstractRocksDBStorage {
         }
     }
 
-    protected boolean get(ColumnFamilyHandle cfHandle, ReadOptions readOptions,
-                          final ByteBuffer keyBB, final ByteBuffer valueBB) 
throws RocksDBException {
+    protected int get(ColumnFamilyHandle cfHandle, ReadOptions readOptions, 
final ByteBuffer keyBB,
+        final ByteBuffer valueBB) throws RocksDBException {
         if (!hold()) {
             throw new IllegalStateException("rocksDB:" + this + " is not 
ready");
         }
         try {
-            return this.db.get(cfHandle, readOptions, keyBB, valueBB) != 
NOT_FOUND;
+            return this.db.get(cfHandle, readOptions, keyBB, valueBB);
         } catch (RocksDBException e) {
             LOGGER.error("get Failed. {}, {}", this.dbPath, getStatusError(e));
             throw e;
@@ -175,8 +248,8 @@ public abstract class AbstractRocksDBStorage {
     }
 
     protected List<byte[]> multiGet(final ReadOptions readOptions,
-                                    final List<ColumnFamilyHandle> 
columnFamilyHandleList,
-                                    final List<byte[]> keys) throws 
RocksDBException {
+        final List<ColumnFamilyHandle> columnFamilyHandleList,
+        final List<byte[]> keys) throws RocksDBException {
         if (!hold()) {
             throw new IllegalStateException("rocksDB:" + this + " is not 
ready");
         }
@@ -190,7 +263,8 @@ public abstract class AbstractRocksDBStorage {
         }
     }
 
-    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions 
writeOptions, byte[] keyBytes) throws RocksDBException {
+    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions 
writeOptions,
+        byte[] keyBytes) throws RocksDBException {
         if (!hold()) {
             throw new IllegalStateException("rocksDB:" + this + " is not 
ready");
         }
@@ -204,7 +278,8 @@ public abstract class AbstractRocksDBStorage {
         }
     }
 
-    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions 
writeOptions, ByteBuffer keyBB) throws RocksDBException {
+    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions 
writeOptions, ByteBuffer keyBB)
+        throws RocksDBException {
         if (!hold()) {
             throw new IllegalStateException("rocksDB:" + this + " is not 
ready");
         }
@@ -218,8 +293,8 @@ public abstract class AbstractRocksDBStorage {
         }
     }
 
-    protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions 
writeOptions,
-                               final byte[] startKey, final byte[] endKey) 
throws RocksDBException {
+    protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions 
writeOptions, final byte[] startKey,
+        final byte[] endKey) throws RocksDBException {
         if (!hold()) {
             throw new IllegalStateException("rocksDB:" + this + " is not 
ready");
         }
@@ -262,16 +337,17 @@ public abstract class AbstractRocksDBStorage {
         });
     }
 
-    protected void open(final List<ColumnFamilyDescriptor> cfDescriptors,
-                        final List<ColumnFamilyHandle> cfHandles) throws 
RocksDBException {
+    protected void open(final List<ColumnFamilyDescriptor> cfDescriptors) 
throws RocksDBException {
+        this.cfHandles.clear();
         if (this.readOnly) {
             this.db = RocksDB.openReadOnly(this.options, this.dbPath, 
cfDescriptors, cfHandles);
         } else {
             this.db = RocksDB.open(this.options, this.dbPath, cfDescriptors, 
cfHandles);
         }
-        this.db.getEnv().setBackgroundThreads(8, Priority.HIGH);
-        this.db.getEnv().setBackgroundThreads(8, Priority.LOW);
-
+        assert cfDescriptors.size() == cfHandles.size();
+        try (Env env = this.db.getEnv()) {
+            env.setBackgroundThreads(8, Priority.LOW);
+        }
         if (this.db == null) {
             throw new RocksDBException("open rocksdb null");
         }
@@ -293,6 +369,9 @@ public abstract class AbstractRocksDBStorage {
         }
     }
 
+    /**
+     * Close column family handles except the default column family
+     */
     protected abstract void preShutdown();
 
     public synchronized boolean shutdown() {
@@ -310,11 +389,12 @@ public abstract class AbstractRocksDBStorage {
             }
             this.db.cancelAllBackgroundWork(true);
             this.db.pauseBackgroundWork();
-            //The close order is matter.
+            //The close order matters.
             //1. close column family handles
             preShutdown();
 
             this.defaultCFHandle.close();
+
             //2. close column family options.
             for (final ColumnFamilyOptions opt : this.cfOptions) {
                 opt.close();
@@ -332,9 +412,6 @@ public abstract class AbstractRocksDBStorage {
             if (this.totalOrderReadOptions != null) {
                 this.totalOrderReadOptions.close();
             }
-            if (this.options != null) {
-                this.options.close();
-            }
             //4. close db.
             if (db != null && !this.readOnly) {
                 this.db.syncWal();
@@ -342,6 +419,10 @@ public abstract class AbstractRocksDBStorage {
             if (db != null) {
                 this.db.closeE();
             }
+            // Close DBOptions after RocksDB instance is closed.
+            if (this.options != null) {
+                this.options.close();
+            }
             //5. help gc.
             this.cfOptions.clear();
             this.db = null;
@@ -360,21 +441,33 @@ public abstract class AbstractRocksDBStorage {
         return true;
     }
 
-    public void flush(final FlushOptions flushOptions) {
+    public void flush(final FlushOptions flushOptions) throws RocksDBException 
{
+        flush(flushOptions, this.cfHandles);
+    }
+
+    public void flush(final FlushOptions flushOptions, 
List<ColumnFamilyHandle> columnFamilyHandles) throws RocksDBException {
         if (!this.loaded || this.readOnly || closed) {
             return;
         }
 
         try {
             if (db != null) {
-                this.db.flush(flushOptions);
+                // For atomic-flush, we have to explicitly specify column 
family handles
+                // See https://github.com/rust-rocksdb/rust-rocksdb/pull/793
+                // and 
https://github.com/facebook/rocksdb/blob/8ad4c7efc48d301f5e85467105d7019a49984dc8/include/rocksdb/db.h#L1667
+                this.db.flush(flushOptions, columnFamilyHandles);
             }
         } catch (RocksDBException e) {
             scheduleReloadRocksdb(e);
             LOGGER.error("flush Failed. {}, {}", this.dbPath, 
getStatusError(e));
+            throw e;
         }
     }
 
+    public void flushWAL() throws RocksDBException {
+        this.db.flushWal(true);
+    }
+
     public Statistics getStatistics() {
         return this.options.statistics();
     }
@@ -441,10 +534,6 @@ public abstract class AbstractRocksDBStorage {
         LOGGER.info("reload rocksdb OK. {}", this.dbPath);
     }
 
-    public void flushWAL() throws RocksDBException {
-        this.db.flushWal(true);
-    }
-
     private String getStatusError(RocksDBException e) {
         if (e == null || e.getStatus() == null) {
             return "null";
@@ -477,13 +566,13 @@ public abstract class AbstractRocksDBStorage {
             Map<Integer, StringBuilder> map = Maps.newHashMap();
             for (LiveFileMetaData metaData : liveFileMetaDataList) {
                 StringBuilder sb = map.computeIfAbsent(metaData.level(), k -> 
new StringBuilder(256));
-                sb.append(new String(metaData.columnFamilyName(), 
DataConverter.CHARSET_UTF8)).append(SPACE).
-                        append(metaData.fileName()).append(SPACE).
-                        append("s: ").append(metaData.size()).append(SPACE).
-                        append("a: 
").append(metaData.numEntries()).append(SPACE).
-                        append("r: 
").append(metaData.numReadsSampled()).append(SPACE).
-                        append("d: 
").append(metaData.numDeletions()).append(SPACE).
-                        append(metaData.beingCompacted()).append("\n");
+                sb.append(new String(metaData.columnFamilyName(), 
StandardCharsets.UTF_8)).append(SPACE).
+                    append(metaData.fileName()).append(SPACE).
+                    append("s: ").append(metaData.size()).append(SPACE).
+                    append("a: ").append(metaData.numEntries()).append(SPACE).
+                    append("r: 
").append(metaData.numReadsSampled()).append(SPACE).
+                    append("d: 
").append(metaData.numDeletions()).append(SPACE).
+                    append(metaData.beingCompacted()).append("\n");
             }
 
             map.forEach((key, value) -> logger.info("level: {}\n{}", key, 
value.toString()));
@@ -492,11 +581,9 @@ public abstract class AbstractRocksDBStorage {
             String indexesAndFilterBlockMemUsage = 
this.db.getProperty("rocksdb.estimate-table-readers-mem");
             String memTableMemUsage = 
this.db.getProperty("rocksdb.cur-size-all-mem-tables");
             String blocksPinnedByIteratorMemUsage = 
this.db.getProperty("rocksdb.block-cache-pinned-usage");
-            logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, 
memtable: {}, blocksPinnedByIterator: {}",
-                    blockCacheMemUsage, indexesAndFilterBlockMemUsage, 
memTableMemUsage, blocksPinnedByIteratorMemUsage);
-        } catch (Exception e) {
-            logger.error("statRocksdb Failed. {}", this.dbPath, e);
-            throw new RuntimeException(e);
+            logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, 
MemTable: {}, blocksPinnedByIterator: {}",
+                blockCacheMemUsage, indexesAndFilterBlockMemUsage, 
memTableMemUsage, blocksPinnedByIteratorMemUsage);
+        } catch (Exception ignored) {
         }
     }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
new file mode 100644
index 0000000000..95d5119cfc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.config;
+
+import java.io.File;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.UtilAll;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.DataBlockIndexType;
+import org.rocksdb.IndexType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
+import org.rocksdb.RateLimiter;
+import org.rocksdb.SkipListMemTableConfig;
+import org.rocksdb.Statistics;
+import org.rocksdb.StatsLevel;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WALRecoveryMode;
+import org.rocksdb.util.SizeUnit;
+
+public class ConfigHelper {
+    public static ColumnFamilyOptions createConfigOptions() {
+        BlockBasedTableConfig blockBasedTableConfig = new 
BlockBasedTableConfig().
+            setFormatVersion(5).
+            setIndexType(IndexType.kBinarySearch).
+            setDataBlockIndexType(DataBlockIndexType.kDataBlockBinarySearch).
+            setBlockSize(32 * SizeUnit.KB).
+            setFilterPolicy(new BloomFilter(16, false)).
+            // Indicating if we'd put index/filter blocks to the block cache.
+                setCacheIndexAndFilterBlocks(false).
+            setCacheIndexAndFilterBlocksWithHighPriority(true).
+            setPinL0FilterAndIndexBlocksInCache(false).
+            setPinTopLevelIndexAndFilter(true).
+            setBlockCache(new LRUCache(4 * SizeUnit.MB, 8, false)).
+            setWholeKeyFiltering(true);
+
+        ColumnFamilyOptions options = new ColumnFamilyOptions();
+        return options.setMaxWriteBufferNumber(2).
+            // MemTable size, MemTable(cache) -> immutable MemTable(cache) -> 
SST(disk)
+                setWriteBufferSize(8 * SizeUnit.MB).
+            setMinWriteBufferNumberToMerge(1).
+            setTableFormatConfig(blockBasedTableConfig).
+            setMemTableConfig(new SkipListMemTableConfig()).
+            setCompressionType(CompressionType.NO_COMPRESSION).
+            setNumLevels(7).
+            setCompactionStyle(CompactionStyle.LEVEL).
+            setLevel0FileNumCompactionTrigger(4).
+            setLevel0SlowdownWritesTrigger(8).
+            setLevel0StopWritesTrigger(12).
+            // The target file size for compaction.
+                setTargetFileSizeBase(64 * SizeUnit.MB).
+            setTargetFileSizeMultiplier(2).
+            // The upper-bound of the total size of L1 files in bytes
+                setMaxBytesForLevelBase(256 * SizeUnit.MB).
+            setMaxBytesForLevelMultiplier(2).
+            setMergeOperator(new StringAppendOperator()).
+            setInplaceUpdateSupport(true);
+    }
+
+    public static DBOptions createConfigDBOptions() {
+        //Turn based on 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
+        // and 
http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java
+        DBOptions options = new DBOptions();
+        Statistics statistics = new Statistics();
+        statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
+        return options.
+            setDbLogDir(getDBLogDir()).
+            setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
+            setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords).
+            setManualWalFlush(true).
+            setMaxTotalWalSize(500 * SizeUnit.MB).
+            setWalSizeLimitMB(0).
+            setWalTtlSeconds(0).
+            setCreateIfMissing(true).
+            setCreateMissingColumnFamilies(true).
+            setMaxOpenFiles(-1).
+            setMaxLogFileSize(SizeUnit.GB).
+            setKeepLogFileNum(5).
+            setMaxManifestFileSize(SizeUnit.GB).
+            setAllowConcurrentMemtableWrite(false).
+            setStatistics(statistics).
+            setStatsDumpPeriodSec(600).
+            setAtomicFlush(true).
+            setMaxBackgroundJobs(32).
+            setMaxSubcompactions(4).
+            setParanoidChecks(true).
+            setDelayedWriteRate(16 * SizeUnit.MB).
+            setRateLimiter(new RateLimiter(100 * SizeUnit.MB)).
+            setUseDirectIoForFlushAndCompaction(true).
+            setUseDirectReads(true);
+    }
+
+    public static String getDBLogDir() {
+        String rootPath = System.getProperty("user.home");
+        if (StringUtils.isEmpty(rootPath)) {
+            return "";
+        }
+        rootPath = rootPath + File.separator + "logs";
+        UtilAll.ensureDirOK(rootPath);
+        return rootPath + File.separator + "rocketmqlogs" + File.separator;
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index f657d9cf2d..36da6834ff 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -16,101 +16,43 @@
  */
 package org.apache.rocketmq.common.config;
 
-import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.UtilAll;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.BloomFilter;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactRangeOptions;
-import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction;
-import org.rocksdb.CompactionOptions;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.DBOptions;
-import org.rocksdb.DataBlockIndexType;
-import org.rocksdb.IndexType;
-import org.rocksdb.InfoLogLevel;
-import org.rocksdb.LRUCache;
-import org.rocksdb.RateLimiter;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
-import org.rocksdb.SkipListMemTableConfig;
-import org.rocksdb.Statistics;
-import org.rocksdb.StatsLevel;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WALRecoveryMode;
 import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
-import org.rocksdb.util.SizeUnit;
 
 public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
+    public static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = 
"kvDataVersion".getBytes(StandardCharsets.UTF_8);
+    public static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = 
"forbidden".getBytes(StandardCharsets.UTF_8);
 
-    private static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = 
"kvDataVersion".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] KV_DATA_VERSION_KEY = 
"kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
     protected ColumnFamilyHandle kvDataVersionFamilyHandle;
-
-    private static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = 
"forbidden".getBytes(StandardCharsets.UTF_8);
     protected ColumnFamilyHandle forbiddenFamilyHandle;
 
-
+    public static final byte[] KV_DATA_VERSION_KEY = 
"kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
 
     public ConfigRocksDBStorage(final String dbPath) {
-        super();
-        this.dbPath = dbPath;
+        super(dbPath);
         this.readOnly = false;
     }
 
     public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
-        super();
-        this.dbPath = dbPath;
+        super(dbPath);
         this.readOnly = readOnly;
     }
 
-    private void initOptions() {
-        this.options = createConfigDBOptions();
-
-        this.writeOptions = new WriteOptions();
-        this.writeOptions.setSync(false);
-        this.writeOptions.setDisableWAL(true);
-        this.writeOptions.setNoSlowdown(true);
-
-        this.ableWalWriteOptions = new WriteOptions();
-        this.ableWalWriteOptions.setSync(false);
-        this.ableWalWriteOptions.setDisableWAL(false);
-        this.ableWalWriteOptions.setNoSlowdown(true);
-
-        this.readOptions = new ReadOptions();
-        this.readOptions.setPrefixSameAsStart(true);
-        this.readOptions.setTotalOrderSeek(false);
-        this.readOptions.setTailing(false);
-
-        this.totalOrderReadOptions = new ReadOptions();
-        this.totalOrderReadOptions.setPrefixSameAsStart(false);
-        this.totalOrderReadOptions.setTotalOrderSeek(false);
-        this.totalOrderReadOptions.setTailing(false);
-
-        this.compactRangeOptions = new CompactRangeOptions();
-        
this.compactRangeOptions.setBottommostLevelCompaction(BottommostLevelCompaction.kForce);
-        this.compactRangeOptions.setAllowWriteStall(true);
-        this.compactRangeOptions.setExclusiveManualCompaction(false);
-        this.compactRangeOptions.setChangeLevel(true);
-        this.compactRangeOptions.setTargetLevel(-1);
-        this.compactRangeOptions.setMaxSubcompactions(4);
-
-        this.compactionOptions = new CompactionOptions();
-        this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION);
-        this.compactionOptions.setMaxSubcompactions(4);
-        this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L);
+    protected void initOptions() {
+        this.options = ConfigHelper.createConfigDBOptions();
+        super.initOptions();
     }
 
     @Override
@@ -120,15 +62,14 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
 
             initOptions();
 
-            final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList();
+            final List<ColumnFamilyDescriptor> cfDescriptors = new 
ArrayList<>();
 
-            ColumnFamilyOptions defaultOptions = createConfigOptions();
+            ColumnFamilyOptions defaultOptions = 
ConfigHelper.createConfigOptions();
             this.cfOptions.add(defaultOptions);
             cfDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
             cfDescriptors.add(new 
ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
             cfDescriptors.add(new 
ColumnFamilyDescriptor(FORBIDDEN_COLUMN_FAMILY_NAME, defaultOptions));
-            final List<ColumnFamilyHandle> cfHandles = new ArrayList();
-            open(cfDescriptors, cfHandles);
+            open(cfDescriptors);
 
             this.defaultCFHandle = cfHandles.get(0);
             this.kvDataVersionFamilyHandle = cfHandles.get(1);
@@ -147,87 +88,6 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
         this.forbiddenFamilyHandle.close();
     }
 
-    private ColumnFamilyOptions createConfigOptions() {
-        BlockBasedTableConfig blockBasedTableConfig = new 
BlockBasedTableConfig().
-            setFormatVersion(5).
-            setIndexType(IndexType.kBinarySearch).
-            setDataBlockIndexType(DataBlockIndexType.kDataBlockBinarySearch).
-            setBlockSize(32 * SizeUnit.KB).
-            setFilterPolicy(new BloomFilter(16, false)).
-            // Indicating if we'd put index/filter blocks to the block cache.
-            setCacheIndexAndFilterBlocks(false).
-            setCacheIndexAndFilterBlocksWithHighPriority(true).
-            setPinL0FilterAndIndexBlocksInCache(false).
-            setPinTopLevelIndexAndFilter(true).
-            setBlockCache(new LRUCache(4 * SizeUnit.MB, 8, false)).
-            setWholeKeyFiltering(true);
-
-        ColumnFamilyOptions options = new ColumnFamilyOptions();
-        return options.setMaxWriteBufferNumber(2).
-            // MemTable size, memtable(cache) -> immutable memtable(cache) -> 
sst(disk)
-            setWriteBufferSize(8 * SizeUnit.MB).
-            setMinWriteBufferNumberToMerge(1).
-            setTableFormatConfig(blockBasedTableConfig).
-            setMemTableConfig(new SkipListMemTableConfig()).
-            setCompressionType(CompressionType.NO_COMPRESSION).
-            setNumLevels(7).
-            setCompactionStyle(CompactionStyle.LEVEL).
-            setLevel0FileNumCompactionTrigger(4).
-            setLevel0SlowdownWritesTrigger(8).
-            setLevel0StopWritesTrigger(12).
-            // The target file size for compaction.
-            setTargetFileSizeBase(64 * SizeUnit.MB).
-            setTargetFileSizeMultiplier(2).
-            // The upper-bound of the total size of L1 files in bytes
-            setMaxBytesForLevelBase(256 * SizeUnit.MB).
-            setMaxBytesForLevelMultiplier(2).
-            setMergeOperator(new StringAppendOperator()).
-            setInplaceUpdateSupport(true);
-    }
-
-    private DBOptions createConfigDBOptions() {
-        //Turn based on 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
-        // and 
http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java
-        DBOptions options = new DBOptions();
-        Statistics statistics = new Statistics();
-        statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
-        return options.
-            setDbLogDir(getDBLogDir()).
-            setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
-            setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords).
-            setManualWalFlush(true).
-            setMaxTotalWalSize(500 * SizeUnit.MB).
-            setWalSizeLimitMB(0).
-            setWalTtlSeconds(0).
-            setCreateIfMissing(true).
-            setCreateMissingColumnFamilies(true).
-            setMaxOpenFiles(-1).
-            setMaxLogFileSize(1 * SizeUnit.GB).
-            setKeepLogFileNum(5).
-            setMaxManifestFileSize(1 * SizeUnit.GB).
-            setAllowConcurrentMemtableWrite(false).
-            setStatistics(statistics).
-            setStatsDumpPeriodSec(600).
-            setAtomicFlush(true).
-            setMaxBackgroundJobs(32).
-            setMaxSubcompactions(4).
-            setParanoidChecks(true).
-            setDelayedWriteRate(16 * SizeUnit.MB).
-            setRateLimiter(new RateLimiter(100 * SizeUnit.MB)).
-            setUseDirectIoForFlushAndCompaction(true).
-            setUseDirectReads(true);
-    }
-
-    public static String getDBLogDir() {
-        String rootPath = System.getProperty("user.home");
-        if (StringUtils.isEmpty(rootPath)) {
-            return "";
-        }
-        rootPath = rootPath + File.separator + "logs";
-        UtilAll.ensureDirOK(rootPath);
-        return rootPath + File.separator + "rocketmqlogs" + File.separator;
-    }
-
     public void put(final byte[] keyBytes, final int keyLen, final byte[] 
valueBytes) throws Exception {
         put(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes, keyLen, 
valueBytes, valueBytes.length);
     }
@@ -281,10 +141,6 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
         return this.db.newIterator(this.forbiddenFamilyHandle, 
this.totalOrderReadOptions);
     }
 
-    public void rangeDelete(final byte[] startKey, final byte[] endKey) throws 
RocksDBException {
-        rangeDelete(this.defaultCFHandle, this.writeOptions, startKey, endKey);
-    }
-
     public RocksIterator iterator(ReadOptions readOptions) {
         return this.db.newIterator(this.defaultCFHandle, readOptions);
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index c889ae7ca8..17b845d817 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -81,15 +81,15 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         super(messageStore);
 
         this.storePath = 
StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
-        this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, 
storePath, 4);
+        this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, 
storePath);
         this.rocksDBConsumeQueueTable = new 
RocksDBConsumeQueueTable(rocksDBStorage, messageStore);
         this.rocksDBConsumeQueueOffsetTable = new 
RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, 
messageStore);
 
         this.writeBatch = new WriteBatch();
         this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
-        this.bufferDRList = new ArrayList(batchSize);
-        this.cqBBPairList = new ArrayList(batchSize);
-        this.offsetBBPairList = new ArrayList(batchSize);
+        this.bufferDRList = new ArrayList<>(batchSize);
+        this.cqBBPairList = new ArrayList<>(batchSize);
+        this.offsetBBPairList = new ArrayList<>(batchSize);
         for (int i = 0; i < batchSize; i++) {
             
this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
             
this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index 362684560c..b343a5b4b5 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -16,53 +16,45 @@
  */
 package org.apache.rocketmq.store.rocksdb;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
-import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.store.MessageStore;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactRangeOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
 
 public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
+
+    public static final byte[] OFFSET_COLUMN_FAMILY = 
"offset".getBytes(StandardCharsets.UTF_8);
+
     private final MessageStore messageStore;
     private volatile ColumnFamilyHandle offsetCFHandle;
 
-    public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final 
String dbPath, final int prefixLen) {
+    public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final 
String dbPath) {
+        super(dbPath);
         this.messageStore = messageStore;
-        this.dbPath = dbPath;
         this.readOnly = false;
     }
 
-    private void initOptions() {
+    protected void initOptions() {
         this.options = RocksDBOptionsFactory.createDBOptions();
+        super.initOptions();
+    }
 
-        this.writeOptions = new WriteOptions();
-        this.writeOptions.setSync(false);
-        this.writeOptions.setDisableWAL(true);
-        this.writeOptions.setNoSlowdown(true);
-
+    @Override
+    protected void initTotalOrderReadOptions() {
         this.totalOrderReadOptions = new ReadOptions();
         this.totalOrderReadOptions.setPrefixSameAsStart(false);
         this.totalOrderReadOptions.setTotalOrderSeek(false);
-
-        this.compactRangeOptions = new CompactRangeOptions();
-        
this.compactRangeOptions.setBottommostLevelCompaction(CompactRangeOptions.BottommostLevelCompaction.kForce);
-        this.compactRangeOptions.setAllowWriteStall(true);
-        this.compactRangeOptions.setExclusiveManualCompaction(false);
-        this.compactRangeOptions.setChangeLevel(true);
-        this.compactRangeOptions.setTargetLevel(-1);
-        this.compactRangeOptions.setMaxSubcompactions(4);
     }
 
     @Override
@@ -72,7 +64,7 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
 
             initOptions();
 
-            final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList();
+            final List<ColumnFamilyDescriptor> cfDescriptors = new 
ArrayList<>();
 
             ColumnFamilyOptions cqCfOptions = 
RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
             this.cfOptions.add(cqCfOptions);
@@ -80,11 +72,8 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
 
             ColumnFamilyOptions offsetCfOptions = 
RocksDBOptionsFactory.createOffsetCFOptions();
             this.cfOptions.add(offsetCfOptions);
-            cfDescriptors.add(new 
ColumnFamilyDescriptor("offset".getBytes(DataConverter.CHARSET_UTF8), 
offsetCfOptions));
-
-            final List<ColumnFamilyHandle> cfHandles = new ArrayList();
-            open(cfDescriptors, cfHandles);
-
+            cfDescriptors.add(new ColumnFamilyDescriptor(OFFSET_COLUMN_FAMILY, 
offsetCfOptions));
+            open(cfDescriptors);
             this.defaultCFHandle = cfHandles.get(0);
             this.offsetCFHandle = cfHandles.get(1);
         } catch (final Exception e) {
@@ -130,4 +119,4 @@ public class ConsumeQueueRocksDBStorage extends 
AbstractRocksDBStorage {
     public ColumnFamilyHandle getOffsetCFHandle() {
         return this.offsetCFHandle;
     }
-}
\ No newline at end of file
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index a3a99d3346..c7d5041bd8 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.store.rocksdb;
 
-import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
+import org.apache.rocketmq.common.config.ConfigHelper;
 import org.apache.rocketmq.store.MessageStore;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
@@ -71,7 +71,7 @@ public class RocksDBOptionsFactory {
                 setTableFormatConfig(blockBasedTableConfig).
                 setMemTableConfig(new SkipListMemTableConfig()).
                 setCompressionType(CompressionType.LZ4_COMPRESSION).
-                setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION).
+                setBottommostCompressionType(CompressionType.LZ4_COMPRESSION).
                 setNumLevels(7).
                 setCompactionStyle(CompactionStyle.UNIVERSAL).
                 setCompactionOptionsUniversal(compactionOption).
@@ -134,7 +134,7 @@ public class RocksDBOptionsFactory {
         Statistics statistics = new Statistics();
         statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
         return options.
-                setDbLogDir(ConfigRocksDBStorage.getDBLogDir()).
+                setDbLogDir(ConfigHelper.getDBLogDir()).
                 setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
                 setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
                 setManualWalFlush(true).
@@ -144,9 +144,9 @@ public class RocksDBOptionsFactory {
                 setCreateIfMissing(true).
                 setCreateMissingColumnFamilies(true).
                 setMaxOpenFiles(-1).
-                setMaxLogFileSize(1 * SizeUnit.GB).
+                setMaxLogFileSize(SizeUnit.GB).
                 setKeepLogFileNum(5).
-                setMaxManifestFileSize(1 * SizeUnit.GB).
+                setMaxManifestFileSize(SizeUnit.GB).
                 setAllowConcurrentMemtableWrite(false).
                 setStatistics(statistics).
                 setAtomicFlush(true).


Reply via email to