This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 49e23e1e0b fix: atomic flush incorrect use and clean up code (#8830)
49e23e1e0b is described below
commit 49e23e1e0b8bc37e4497de97202a343956de3968
Author: Zhanhui Li <[email protected]>
AuthorDate: Fri Oct 18 14:30:38 2024 +0800
fix: atomic flush incorrect use and clean up code (#8830)
Signed-off-by: Li Zhanhui <[email protected]>
---
.../common/config/AbstractRocksDBStorage.java | 199 +++++++++++++++------
.../rocketmq/common/config/ConfigHelper.java | 121 +++++++++++++
.../common/config/ConfigRocksDBStorage.java | 166 ++---------------
.../store/queue/RocksDBConsumeQueueStore.java | 8 +-
.../store/rocksdb/ConsumeQueueRocksDBStorage.java | 41 ++---
.../store/rocksdb/RocksDBOptionsFactory.java | 10 +-
6 files changed, 299 insertions(+), 246 deletions(-)
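
The behavioural part of the change is in AbstractRocksDBStorage#flush: this storage opens RocksDB with DBOptions#setAtomicFlush(true), and per the RocksDB docs referenced in the patch, the plain flush(FlushOptions) overload only flushes the default column family, so the column family handles must be passed explicitly for the flush to cover, and be atomic across, all families. The patch therefore keeps the handle list on the base class and forwards it. Below is a minimal stand-alone sketch of the corrected call pattern; the path /tmp/atomic-flush-demo and the "offset" family are illustrative and not taken from the patch.

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.FlushOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class AtomicFlushSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (DBOptions dbOptions = new DBOptions()
                     .setCreateIfMissing(true)
                     .setCreateMissingColumnFamilies(true)
                     .setAtomicFlush(true)) {                  // flush all column families atomically
                List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                    new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
                    new ColumnFamilyDescriptor("offset".getBytes(StandardCharsets.UTF_8)));
                List<ColumnFamilyHandle> handles = new ArrayList<>();
                try (RocksDB db = RocksDB.open(dbOptions, "/tmp/atomic-flush-demo", descriptors, handles)) {
                    db.put(handles.get(1), "key".getBytes(StandardCharsets.UTF_8),
                        "value".getBytes(StandardCharsets.UTF_8));
                    try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
                        // With atomic flush enabled, pass the handles explicitly;
                        // flush(flushOptions) alone flushes only the default column family.
                        db.flush(flushOptions, handles);
                    }
                    for (ColumnFamilyHandle handle : handles) {
                        handle.close();
                    }
                }
            }
        }
    }
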
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 13522889bb..42ddbdc728 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -17,18 +17,10 @@
package org.apache.rocketmq.common.config;
import com.google.common.collect.Maps;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -37,7 +29,9 @@ import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactionOptions;
+import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
import org.rocksdb.FlushOptions;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.Priority;
@@ -49,14 +43,31 @@ import org.rocksdb.Status;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
-import static org.rocksdb.RocksDB.NOT_FOUND;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public abstract class AbstractRocksDBStorage {
    protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
+ /**
+ * Direct Jemalloc allocator
+ */
+    public static final PooledByteBufAllocator POOLED_ALLOCATOR = new PooledByteBufAllocator(true);
+
+ public static final byte CTRL_0 = '\u0000';
+ public static final byte CTRL_1 = '\u0001';
+ public static final byte CTRL_2 = '\u0002';
+
private static final String SPACE = " | ";
- protected String dbPath;
+ protected final String dbPath;
protected boolean readOnly;
protected RocksDB db;
protected DBOptions options;
@@ -71,7 +82,8 @@ public abstract class AbstractRocksDBStorage {
protected CompactRangeOptions compactRangeOptions;
protected ColumnFamilyHandle defaultCFHandle;
- protected final List<ColumnFamilyOptions> cfOptions = new ArrayList();
+ protected final List<ColumnFamilyOptions> cfOptions = new ArrayList<>();
+ protected final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
protected volatile boolean loaded;
private volatile boolean closed;
@@ -79,15 +91,76 @@ public abstract class AbstractRocksDBStorage {
private final Semaphore reloadPermit = new Semaphore(1);
    private final ScheduledExecutorService reloadScheduler = ThreadUtils.newScheduledThreadPool(1, new ThreadFactoryImpl("RocksDBStorageReloadService_"));
    private final ThreadPoolExecutor manualCompactionThread = (ThreadPoolExecutor) ThreadUtils.newThreadPoolExecutor(
- 1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue(1),
- new ThreadFactoryImpl("RocksDBManualCompactionService_"),
- new ThreadPoolExecutor.DiscardOldestPolicy());
+ 1, 1, 1000 * 60, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1),
+ new ThreadFactoryImpl("RocksDBManualCompactionService_"),
+ new ThreadPoolExecutor.DiscardOldestPolicy());
static {
RocksDB.loadLibrary();
}
+ public AbstractRocksDBStorage(String dbPath) {
+ this.dbPath = dbPath;
+ }
+
+ protected void initOptions() {
+ initWriteOptions();
+ initAbleWalWriteOptions();
+ initReadOptions();
+ initTotalOrderReadOptions();
+ initCompactRangeOptions();
+ initCompactionOptions();
+ }
+
+ /**
+     * Write options for <a href="https://github.com/facebook/rocksdb/wiki/Atomic-flush">Atomic Flush</a>
+ */
+ protected void initWriteOptions() {
+ this.writeOptions = new WriteOptions();
+ this.writeOptions.setSync(false);
+ this.writeOptions.setDisableWAL(true);
+ this.writeOptions.setNoSlowdown(true);
+ }
+
+ protected void initAbleWalWriteOptions() {
+ this.ableWalWriteOptions = new WriteOptions();
+ this.ableWalWriteOptions.setSync(false);
+ this.ableWalWriteOptions.setDisableWAL(false);
+ this.ableWalWriteOptions.setNoSlowdown(true);
+ }
+
+ protected void initReadOptions() {
+ this.readOptions = new ReadOptions();
+ this.readOptions.setPrefixSameAsStart(true);
+ this.readOptions.setTotalOrderSeek(false);
+ this.readOptions.setTailing(false);
+ }
+
+ protected void initTotalOrderReadOptions() {
+ this.totalOrderReadOptions = new ReadOptions();
+ this.totalOrderReadOptions.setPrefixSameAsStart(false);
+ this.totalOrderReadOptions.setTotalOrderSeek(true);
+ this.totalOrderReadOptions.setTailing(false);
+ }
+
+ protected void initCompactRangeOptions() {
+ this.compactRangeOptions = new CompactRangeOptions();
+        this.compactRangeOptions.setBottommostLevelCompaction(CompactRangeOptions.BottommostLevelCompaction.kForce);
+ this.compactRangeOptions.setAllowWriteStall(true);
+ this.compactRangeOptions.setExclusiveManualCompaction(false);
+ this.compactRangeOptions.setChangeLevel(true);
+ this.compactRangeOptions.setTargetLevel(-1);
+ this.compactRangeOptions.setMaxSubcompactions(4);
+ }
+
+ protected void initCompactionOptions() {
+ this.compactionOptions = new CompactionOptions();
+ this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION);
+ this.compactionOptions.setMaxSubcompactions(4);
+ this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L);
+ }
+
public boolean hold() {
if (!this.loaded || this.db == null || this.closed) {
LOGGER.error("hold rocksdb Failed. {}", this.dbPath);
@@ -101,8 +174,8 @@ public abstract class AbstractRocksDBStorage {
}
protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions,
- final byte[] keyBytes, final int keyLen,
-        final byte[] valueBytes, final int valueLen) throws RocksDBException {
+ final byte[] keyBytes, final int keyLen,
+ final byte[] valueBytes, final int valueLen) throws RocksDBException {
if (!hold()) {
            throw new IllegalStateException("rocksDB:" + this + " is not ready");
}
@@ -118,7 +191,7 @@ public abstract class AbstractRocksDBStorage {
}
protected void put(ColumnFamilyHandle cfHandle, WriteOptions writeOptions,
-        final ByteBuffer keyBB, final ByteBuffer valueBB) throws RocksDBException {
+        final ByteBuffer keyBB, final ByteBuffer valueBB) throws RocksDBException {
if (!hold()) {
            throw new IllegalStateException("rocksDB:" + this + " is not ready");
}
@@ -159,13 +232,13 @@ public abstract class AbstractRocksDBStorage {
}
}
- protected boolean get(ColumnFamilyHandle cfHandle, ReadOptions readOptions,
-        final ByteBuffer keyBB, final ByteBuffer valueBB) throws RocksDBException {
+    protected int get(ColumnFamilyHandle cfHandle, ReadOptions readOptions, final ByteBuffer keyBB,
+ final ByteBuffer valueBB) throws RocksDBException {
if (!hold()) {
            throw new IllegalStateException("rocksDB:" + this + " is not ready");
}
try {
-            return this.db.get(cfHandle, readOptions, keyBB, valueBB) != NOT_FOUND;
+ return this.db.get(cfHandle, readOptions, keyBB, valueBB);
} catch (RocksDBException e) {
LOGGER.error("get Failed. {}, {}", this.dbPath, getStatusError(e));
throw e;
@@ -175,8 +248,8 @@ public abstract class AbstractRocksDBStorage {
}
protected List<byte[]> multiGet(final ReadOptions readOptions,
-        final List<ColumnFamilyHandle> columnFamilyHandleList,
-        final List<byte[]> keys) throws RocksDBException {
+ final List<ColumnFamilyHandle> columnFamilyHandleList,
+ final List<byte[]> keys) throws RocksDBException {
if (!hold()) {
            throw new IllegalStateException("rocksDB:" + this + " is not ready");
}
@@ -190,7 +263,8 @@ public abstract class AbstractRocksDBStorage {
}
}
-    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, byte[] keyBytes) throws RocksDBException {
+    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions,
+ byte[] keyBytes) throws RocksDBException {
if (!hold()) {
            throw new IllegalStateException("rocksDB:" + this + " is not ready");
}
@@ -204,7 +278,8 @@ public abstract class AbstractRocksDBStorage {
}
}
-    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, ByteBuffer keyBB) throws RocksDBException {
+    protected void delete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, ByteBuffer keyBB)
+ throws RocksDBException {
if (!hold()) {
            throw new IllegalStateException("rocksDB:" + this + " is not ready");
}
@@ -218,8 +293,8 @@ public abstract class AbstractRocksDBStorage {
}
}
-    protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions,
-        final byte[] startKey, final byte[] endKey) throws RocksDBException {
+    protected void rangeDelete(ColumnFamilyHandle cfHandle, WriteOptions writeOptions, final byte[] startKey,
+ final byte[] endKey) throws RocksDBException {
if (!hold()) {
            throw new IllegalStateException("rocksDB:" + this + " is not ready");
}
@@ -262,16 +337,17 @@ public abstract class AbstractRocksDBStorage {
});
}
- protected void open(final List<ColumnFamilyDescriptor> cfDescriptors,
-        final List<ColumnFamilyHandle> cfHandles) throws RocksDBException {
+    protected void open(final List<ColumnFamilyDescriptor> cfDescriptors) throws RocksDBException {
+ this.cfHandles.clear();
if (this.readOnly) {
            this.db = RocksDB.openReadOnly(this.options, this.dbPath, cfDescriptors, cfHandles);
} else {
            this.db = RocksDB.open(this.options, this.dbPath, cfDescriptors, cfHandles);
}
- this.db.getEnv().setBackgroundThreads(8, Priority.HIGH);
- this.db.getEnv().setBackgroundThreads(8, Priority.LOW);
-
+ assert cfDescriptors.size() == cfHandles.size();
+ try (Env env = this.db.getEnv()) {
+ env.setBackgroundThreads(8, Priority.LOW);
+ }
if (this.db == null) {
throw new RocksDBException("open rocksdb null");
}
@@ -293,6 +369,9 @@ public abstract class AbstractRocksDBStorage {
}
}
+ /**
+ * Close column family handles except the default column family
+ */
protected abstract void preShutdown();
public synchronized boolean shutdown() {
@@ -310,11 +389,12 @@ public abstract class AbstractRocksDBStorage {
}
this.db.cancelAllBackgroundWork(true);
this.db.pauseBackgroundWork();
- //The close order is matter.
+ //The close order matters.
//1. close column family handles
preShutdown();
this.defaultCFHandle.close();
+
//2. close column family options.
for (final ColumnFamilyOptions opt : this.cfOptions) {
opt.close();
@@ -332,9 +412,6 @@ public abstract class AbstractRocksDBStorage {
if (this.totalOrderReadOptions != null) {
this.totalOrderReadOptions.close();
}
- if (this.options != null) {
- this.options.close();
- }
//4. close db.
if (db != null && !this.readOnly) {
this.db.syncWal();
@@ -342,6 +419,10 @@ public abstract class AbstractRocksDBStorage {
if (db != null) {
this.db.closeE();
}
+ // Close DBOptions after RocksDB instance is closed.
+ if (this.options != null) {
+ this.options.close();
+ }
//5. help gc.
this.cfOptions.clear();
this.db = null;
@@ -360,21 +441,33 @@ public abstract class AbstractRocksDBStorage {
return true;
}
- public void flush(final FlushOptions flushOptions) {
+    public void flush(final FlushOptions flushOptions) throws RocksDBException {
+ flush(flushOptions, this.cfHandles);
+ }
+
+    public void flush(final FlushOptions flushOptions, List<ColumnFamilyHandle> columnFamilyHandles) throws RocksDBException {
if (!this.loaded || this.readOnly || closed) {
return;
}
try {
if (db != null) {
- this.db.flush(flushOptions);
+            // For atomic-flush, we have to explicitly specify column family handles
+            // See https://github.com/rust-rocksdb/rust-rocksdb/pull/793
+            // and https://github.com/facebook/rocksdb/blob/8ad4c7efc48d301f5e85467105d7019a49984dc8/include/rocksdb/db.h#L1667
+ this.db.flush(flushOptions, columnFamilyHandles);
}
} catch (RocksDBException e) {
scheduleReloadRocksdb(e);
LOGGER.error("flush Failed. {}, {}", this.dbPath,
getStatusError(e));
+ throw e;
}
}
+ public void flushWAL() throws RocksDBException {
+ this.db.flushWal(true);
+ }
+
public Statistics getStatistics() {
return this.options.statistics();
}
@@ -441,10 +534,6 @@ public abstract class AbstractRocksDBStorage {
LOGGER.info("reload rocksdb OK. {}", this.dbPath);
}
- public void flushWAL() throws RocksDBException {
- this.db.flushWal(true);
- }
-
private String getStatusError(RocksDBException e) {
if (e == null || e.getStatus() == null) {
return "null";
@@ -477,13 +566,13 @@ public abstract class AbstractRocksDBStorage {
Map<Integer, StringBuilder> map = Maps.newHashMap();
for (LiveFileMetaData metaData : liveFileMetaDataList) {
            StringBuilder sb = map.computeIfAbsent(metaData.level(), k -> new StringBuilder(256));
-            sb.append(new String(metaData.columnFamilyName(), DataConverter.CHARSET_UTF8)).append(SPACE).
-                append(metaData.fileName()).append(SPACE).
-                append("s: ").append(metaData.size()).append(SPACE).
-                append("a: ").append(metaData.numEntries()).append(SPACE).
-                append("r: ").append(metaData.numReadsSampled()).append(SPACE).
-                append("d: ").append(metaData.numDeletions()).append(SPACE).
-                append(metaData.beingCompacted()).append("\n");
+            sb.append(new String(metaData.columnFamilyName(), StandardCharsets.UTF_8)).append(SPACE).
+                append(metaData.fileName()).append(SPACE).
+                append("s: ").append(metaData.size()).append(SPACE).
+                append("a: ").append(metaData.numEntries()).append(SPACE).
+                append("r: ").append(metaData.numReadsSampled()).append(SPACE).
+                append("d: ").append(metaData.numDeletions()).append(SPACE).
+                append(metaData.beingCompacted()).append("\n");
}
        map.forEach((key, value) -> logger.info("level: {}\n{}", key, value.toString()));
@@ -492,11 +581,9 @@ public abstract class AbstractRocksDBStorage {
            String indexesAndFilterBlockMemUsage = this.db.getProperty("rocksdb.estimate-table-readers-mem");
            String memTableMemUsage = this.db.getProperty("rocksdb.cur-size-all-mem-tables");
            String blocksPinnedByIteratorMemUsage = this.db.getProperty("rocksdb.block-cache-pinned-usage");
- logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {},
memtable: {}, blocksPinnedByIterator: {}",
- blockCacheMemUsage, indexesAndFilterBlockMemUsage,
memTableMemUsage, blocksPinnedByIteratorMemUsage);
- } catch (Exception e) {
- logger.error("statRocksdb Failed. {}", this.dbPath, e);
- throw new RuntimeException(e);
+ logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {},
MemTable: {}, blocksPinnedByIterator: {}",
+ blockCacheMemUsage, indexesAndFilterBlockMemUsage,
memTableMemUsage, blocksPinnedByIteratorMemUsage);
+ } catch (Exception ignored) {
}
}
}
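
One behavioural detail in the hunks above: the ByteBuffer variant of get() now returns the raw int from RocksDB#get (the full value size, or RocksDB.NOT_FOUND) instead of collapsing it to a boolean, so callers can detect a missing key and a truncated read separately. A sketch of how a caller might consume that contract follows; the readValue helper and the 4 KiB buffer are illustrative, not part of the patch.

    // Illustrative helper only: shows the int-returning contract of RocksDB#get with ByteBuffers.
    static byte[] readValue(RocksDB db, ColumnFamilyHandle cfHandle, ReadOptions readOptions,
        byte[] key) throws RocksDBException {
        ByteBuffer keyBB = ByteBuffer.allocateDirect(key.length);
        keyBB.put(key).flip();
        ByteBuffer valueBB = ByteBuffer.allocateDirect(4096);
        int size = db.get(cfHandle, readOptions, keyBB, valueBB); // full value length, or NOT_FOUND
        if (size == RocksDB.NOT_FOUND) {
            return null;
        }
        // The buffer limit is adjusted to the bytes actually written; if size exceeds the
        // buffer capacity the value was truncated and a retry with a larger buffer is needed.
        byte[] value = new byte[valueBB.remaining()];
        valueBB.get(value);
        return value;
    }
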
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
new file mode 100644
index 0000000000..95d5119cfc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.config;
+
+import java.io.File;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.UtilAll;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.DataBlockIndexType;
+import org.rocksdb.IndexType;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
+import org.rocksdb.RateLimiter;
+import org.rocksdb.SkipListMemTableConfig;
+import org.rocksdb.Statistics;
+import org.rocksdb.StatsLevel;
+import org.rocksdb.StringAppendOperator;
+import org.rocksdb.WALRecoveryMode;
+import org.rocksdb.util.SizeUnit;
+
+public class ConfigHelper {
+ public static ColumnFamilyOptions createConfigOptions() {
+        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig().
+ setFormatVersion(5).
+ setIndexType(IndexType.kBinarySearch).
+ setDataBlockIndexType(DataBlockIndexType.kDataBlockBinarySearch).
+ setBlockSize(32 * SizeUnit.KB).
+ setFilterPolicy(new BloomFilter(16, false)).
+ // Indicating if we'd put index/filter blocks to the block cache.
+ setCacheIndexAndFilterBlocks(false).
+ setCacheIndexAndFilterBlocksWithHighPriority(true).
+ setPinL0FilterAndIndexBlocksInCache(false).
+ setPinTopLevelIndexAndFilter(true).
+ setBlockCache(new LRUCache(4 * SizeUnit.MB, 8, false)).
+ setWholeKeyFiltering(true);
+
+ ColumnFamilyOptions options = new ColumnFamilyOptions();
+ return options.setMaxWriteBufferNumber(2).
+            // MemTable size, MemTable(cache) -> immutable MemTable(cache) -> SST(disk)
+ setWriteBufferSize(8 * SizeUnit.MB).
+ setMinWriteBufferNumberToMerge(1).
+ setTableFormatConfig(blockBasedTableConfig).
+ setMemTableConfig(new SkipListMemTableConfig()).
+ setCompressionType(CompressionType.NO_COMPRESSION).
+ setNumLevels(7).
+ setCompactionStyle(CompactionStyle.LEVEL).
+ setLevel0FileNumCompactionTrigger(4).
+ setLevel0SlowdownWritesTrigger(8).
+ setLevel0StopWritesTrigger(12).
+ // The target file size for compaction.
+ setTargetFileSizeBase(64 * SizeUnit.MB).
+ setTargetFileSizeMultiplier(2).
+ // The upper-bound of the total size of L1 files in bytes
+ setMaxBytesForLevelBase(256 * SizeUnit.MB).
+ setMaxBytesForLevelMultiplier(2).
+ setMergeOperator(new StringAppendOperator()).
+ setInplaceUpdateSupport(true);
+ }
+
+ public static DBOptions createConfigDBOptions() {
+        //Turn based on https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
+        // and http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java
+ DBOptions options = new DBOptions();
+ Statistics statistics = new Statistics();
+ statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
+ return options.
+ setDbLogDir(getDBLogDir()).
+ setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
+ setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords).
+ setManualWalFlush(true).
+ setMaxTotalWalSize(500 * SizeUnit.MB).
+ setWalSizeLimitMB(0).
+ setWalTtlSeconds(0).
+ setCreateIfMissing(true).
+ setCreateMissingColumnFamilies(true).
+ setMaxOpenFiles(-1).
+ setMaxLogFileSize(SizeUnit.GB).
+ setKeepLogFileNum(5).
+ setMaxManifestFileSize(SizeUnit.GB).
+ setAllowConcurrentMemtableWrite(false).
+ setStatistics(statistics).
+ setStatsDumpPeriodSec(600).
+ setAtomicFlush(true).
+ setMaxBackgroundJobs(32).
+ setMaxSubcompactions(4).
+ setParanoidChecks(true).
+ setDelayedWriteRate(16 * SizeUnit.MB).
+ setRateLimiter(new RateLimiter(100 * SizeUnit.MB)).
+ setUseDirectIoForFlushAndCompaction(true).
+ setUseDirectReads(true);
+ }
+
+ public static String getDBLogDir() {
+ String rootPath = System.getProperty("user.home");
+ if (StringUtils.isEmpty(rootPath)) {
+ return "";
+ }
+ rootPath = rootPath + File.separator + "logs";
+ UtilAll.ensureDirOK(rootPath);
+ return rootPath + File.separator + "rocketmqlogs" + File.separator;
+ }
+}
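
The two factory methods above are extractions of the former private helpers in ConfigRocksDBStorage, now reusable outside that class (RocksDBOptionsFactory below already picks up getDBLogDir()). A rough sketch of how they could combine when opening a config store by hand; the path is illustrative and error handling is omitted.

    // Illustrative fragment: wiring the extracted ConfigHelper factories together.
    DBOptions dbOptions = ConfigHelper.createConfigDBOptions();
    ColumnFamilyOptions cfOptions = ConfigHelper.createConfigOptions();
    List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions),
        new ColumnFamilyDescriptor(ConfigRocksDBStorage.KV_DATA_VERSION_COLUMN_FAMILY_NAME, cfOptions),
        new ColumnFamilyDescriptor(ConfigRocksDBStorage.FORBIDDEN_COLUMN_FAMILY_NAME, cfOptions));
    List<ColumnFamilyHandle> handles = new ArrayList<>();
    RocksDB db = RocksDB.open(dbOptions, "/tmp/config-store-demo", descriptors, handles);
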
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index f657d9cf2d..36da6834ff 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -16,101 +16,43 @@
*/
package org.apache.rocketmq.common.config;
-import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.UtilAll;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactRangeOptions;
-import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction;
-import org.rocksdb.CompactionOptions;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.DBOptions;
-import org.rocksdb.DataBlockIndexType;
-import org.rocksdb.IndexType;
-import org.rocksdb.InfoLogLevel;
-import org.rocksdb.LRUCache;
-import org.rocksdb.RateLimiter;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
-import org.rocksdb.SkipListMemTableConfig;
-import org.rocksdb.Statistics;
-import org.rocksdb.StatsLevel;
-import org.rocksdb.StringAppendOperator;
-import org.rocksdb.WALRecoveryMode;
import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
-import org.rocksdb.util.SizeUnit;
public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
+    public static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(StandardCharsets.UTF_8);
+    public static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = "forbidden".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] KV_DATA_VERSION_COLUMN_FAMILY_NAME = "kvDataVersion".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
protected ColumnFamilyHandle kvDataVersionFamilyHandle;
-
-    private static final byte[] FORBIDDEN_COLUMN_FAMILY_NAME = "forbidden".getBytes(StandardCharsets.UTF_8);
protected ColumnFamilyHandle forbiddenFamilyHandle;
-
+    public static final byte[] KV_DATA_VERSION_KEY = "kvDataVersionKey".getBytes(StandardCharsets.UTF_8);
public ConfigRocksDBStorage(final String dbPath) {
- super();
- this.dbPath = dbPath;
+ super(dbPath);
this.readOnly = false;
}
public ConfigRocksDBStorage(final String dbPath, boolean readOnly) {
- super();
- this.dbPath = dbPath;
+ super(dbPath);
this.readOnly = readOnly;
}
- private void initOptions() {
- this.options = createConfigDBOptions();
-
- this.writeOptions = new WriteOptions();
- this.writeOptions.setSync(false);
- this.writeOptions.setDisableWAL(true);
- this.writeOptions.setNoSlowdown(true);
-
- this.ableWalWriteOptions = new WriteOptions();
- this.ableWalWriteOptions.setSync(false);
- this.ableWalWriteOptions.setDisableWAL(false);
- this.ableWalWriteOptions.setNoSlowdown(true);
-
- this.readOptions = new ReadOptions();
- this.readOptions.setPrefixSameAsStart(true);
- this.readOptions.setTotalOrderSeek(false);
- this.readOptions.setTailing(false);
-
- this.totalOrderReadOptions = new ReadOptions();
- this.totalOrderReadOptions.setPrefixSameAsStart(false);
- this.totalOrderReadOptions.setTotalOrderSeek(false);
- this.totalOrderReadOptions.setTailing(false);
-
- this.compactRangeOptions = new CompactRangeOptions();
-        this.compactRangeOptions.setBottommostLevelCompaction(BottommostLevelCompaction.kForce);
- this.compactRangeOptions.setAllowWriteStall(true);
- this.compactRangeOptions.setExclusiveManualCompaction(false);
- this.compactRangeOptions.setChangeLevel(true);
- this.compactRangeOptions.setTargetLevel(-1);
- this.compactRangeOptions.setMaxSubcompactions(4);
-
- this.compactionOptions = new CompactionOptions();
- this.compactionOptions.setCompression(CompressionType.LZ4_COMPRESSION);
- this.compactionOptions.setMaxSubcompactions(4);
- this.compactionOptions.setOutputFileSizeLimit(4 * 1024 * 1024 * 1024L);
+ protected void initOptions() {
+ this.options = ConfigHelper.createConfigDBOptions();
+ super.initOptions();
}
@Override
@@ -120,15 +62,14 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
initOptions();
- final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList();
+        final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
-        ColumnFamilyOptions defaultOptions = createConfigOptions();
+        ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigOptions();
this.cfOptions.add(defaultOptions);
        cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
        cfDescriptors.add(new ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
        cfDescriptors.add(new ColumnFamilyDescriptor(FORBIDDEN_COLUMN_FAMILY_NAME, defaultOptions));
- final List<ColumnFamilyHandle> cfHandles = new ArrayList();
- open(cfDescriptors, cfHandles);
+ open(cfDescriptors);
this.defaultCFHandle = cfHandles.get(0);
this.kvDataVersionFamilyHandle = cfHandles.get(1);
@@ -147,87 +88,6 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
this.forbiddenFamilyHandle.close();
}
- private ColumnFamilyOptions createConfigOptions() {
-        BlockBasedTableConfig blockBasedTableConfig = new BlockBasedTableConfig().
- setFormatVersion(5).
- setIndexType(IndexType.kBinarySearch).
- setDataBlockIndexType(DataBlockIndexType.kDataBlockBinarySearch).
- setBlockSize(32 * SizeUnit.KB).
- setFilterPolicy(new BloomFilter(16, false)).
- // Indicating if we'd put index/filter blocks to the block cache.
- setCacheIndexAndFilterBlocks(false).
- setCacheIndexAndFilterBlocksWithHighPriority(true).
- setPinL0FilterAndIndexBlocksInCache(false).
- setPinTopLevelIndexAndFilter(true).
- setBlockCache(new LRUCache(4 * SizeUnit.MB, 8, false)).
- setWholeKeyFiltering(true);
-
- ColumnFamilyOptions options = new ColumnFamilyOptions();
- return options.setMaxWriteBufferNumber(2).
-            // MemTable size, memtable(cache) -> immutable memtable(cache) -> sst(disk)
- setWriteBufferSize(8 * SizeUnit.MB).
- setMinWriteBufferNumberToMerge(1).
- setTableFormatConfig(blockBasedTableConfig).
- setMemTableConfig(new SkipListMemTableConfig()).
- setCompressionType(CompressionType.NO_COMPRESSION).
- setNumLevels(7).
- setCompactionStyle(CompactionStyle.LEVEL).
- setLevel0FileNumCompactionTrigger(4).
- setLevel0SlowdownWritesTrigger(8).
- setLevel0StopWritesTrigger(12).
- // The target file size for compaction.
- setTargetFileSizeBase(64 * SizeUnit.MB).
- setTargetFileSizeMultiplier(2).
- // The upper-bound of the total size of L1 files in bytes
- setMaxBytesForLevelBase(256 * SizeUnit.MB).
- setMaxBytesForLevelMultiplier(2).
- setMergeOperator(new StringAppendOperator()).
- setInplaceUpdateSupport(true);
- }
-
- private DBOptions createConfigDBOptions() {
-        //Turn based on https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
-        // and http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java
- DBOptions options = new DBOptions();
- Statistics statistics = new Statistics();
- statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
- return options.
- setDbLogDir(getDBLogDir()).
- setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
- setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords).
- setManualWalFlush(true).
- setMaxTotalWalSize(500 * SizeUnit.MB).
- setWalSizeLimitMB(0).
- setWalTtlSeconds(0).
- setCreateIfMissing(true).
- setCreateMissingColumnFamilies(true).
- setMaxOpenFiles(-1).
- setMaxLogFileSize(1 * SizeUnit.GB).
- setKeepLogFileNum(5).
- setMaxManifestFileSize(1 * SizeUnit.GB).
- setAllowConcurrentMemtableWrite(false).
- setStatistics(statistics).
- setStatsDumpPeriodSec(600).
- setAtomicFlush(true).
- setMaxBackgroundJobs(32).
- setMaxSubcompactions(4).
- setParanoidChecks(true).
- setDelayedWriteRate(16 * SizeUnit.MB).
- setRateLimiter(new RateLimiter(100 * SizeUnit.MB)).
- setUseDirectIoForFlushAndCompaction(true).
- setUseDirectReads(true);
- }
-
- public static String getDBLogDir() {
- String rootPath = System.getProperty("user.home");
- if (StringUtils.isEmpty(rootPath)) {
- return "";
- }
- rootPath = rootPath + File.separator + "logs";
- UtilAll.ensureDirOK(rootPath);
- return rootPath + File.separator + "rocketmqlogs" + File.separator;
- }
-
    public void put(final byte[] keyBytes, final int keyLen, final byte[] valueBytes) throws Exception {
        put(this.defaultCFHandle, this.ableWalWriteOptions, keyBytes, keyLen, valueBytes, valueBytes.length);
}
@@ -281,10 +141,6 @@ public class ConfigRocksDBStorage extends AbstractRocksDBStorage {
        return this.db.newIterator(this.forbiddenFamilyHandle, this.totalOrderReadOptions);
}
-    public void rangeDelete(final byte[] startKey, final byte[] endKey) throws RocksDBException {
- rangeDelete(this.defaultCFHandle, this.writeOptions, startKey, endKey);
- }
-
public RocksIterator iterator(ReadOptions readOptions) {
return this.db.newIterator(this.defaultCFHandle, readOptions);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index c889ae7ca8..17b845d817 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -81,15 +81,15 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
super(messageStore);
        this.storePath = StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
-        this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, storePath, 4);
+        this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, storePath);
        this.rocksDBConsumeQueueTable = new RocksDBConsumeQueueTable(rocksDBStorage, messageStore);
        this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore);
this.writeBatch = new WriteBatch();
this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
- this.bufferDRList = new ArrayList(batchSize);
- this.cqBBPairList = new ArrayList(batchSize);
- this.offsetBBPairList = new ArrayList(batchSize);
+ this.bufferDRList = new ArrayList<>(batchSize);
+ this.cqBBPairList = new ArrayList<>(batchSize);
+ this.offsetBBPairList = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
index 362684560c..b343a5b4b5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/ConsumeQueueRocksDBStorage.java
@@ -16,53 +16,45 @@
*/
package org.apache.rocketmq.store.rocksdb;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
-import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.store.MessageStore;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactRangeOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
+
+    public static final byte[] OFFSET_COLUMN_FAMILY = "offset".getBytes(StandardCharsets.UTF_8);
+
private final MessageStore messageStore;
private volatile ColumnFamilyHandle offsetCFHandle;
-    public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final String dbPath, final int prefixLen) {
+    public ConsumeQueueRocksDBStorage(final MessageStore messageStore, final String dbPath) {
+ super(dbPath);
this.messageStore = messageStore;
- this.dbPath = dbPath;
this.readOnly = false;
}
- private void initOptions() {
+ protected void initOptions() {
this.options = RocksDBOptionsFactory.createDBOptions();
+ super.initOptions();
+ }
- this.writeOptions = new WriteOptions();
- this.writeOptions.setSync(false);
- this.writeOptions.setDisableWAL(true);
- this.writeOptions.setNoSlowdown(true);
-
+ @Override
+ protected void initTotalOrderReadOptions() {
this.totalOrderReadOptions = new ReadOptions();
this.totalOrderReadOptions.setPrefixSameAsStart(false);
this.totalOrderReadOptions.setTotalOrderSeek(false);
-
- this.compactRangeOptions = new CompactRangeOptions();
-        this.compactRangeOptions.setBottommostLevelCompaction(CompactRangeOptions.BottommostLevelCompaction.kForce);
- this.compactRangeOptions.setAllowWriteStall(true);
- this.compactRangeOptions.setExclusiveManualCompaction(false);
- this.compactRangeOptions.setChangeLevel(true);
- this.compactRangeOptions.setTargetLevel(-1);
- this.compactRangeOptions.setMaxSubcompactions(4);
}
@Override
@@ -72,7 +64,7 @@ public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
initOptions();
- final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList();
+        final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
            ColumnFamilyOptions cqCfOptions = RocksDBOptionsFactory.createCQCFOptions(this.messageStore);
this.cfOptions.add(cqCfOptions);
@@ -80,11 +72,8 @@ public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
            ColumnFamilyOptions offsetCfOptions = RocksDBOptionsFactory.createOffsetCFOptions();
this.cfOptions.add(offsetCfOptions);
-            cfDescriptors.add(new ColumnFamilyDescriptor("offset".getBytes(DataConverter.CHARSET_UTF8), offsetCfOptions));
-
- final List<ColumnFamilyHandle> cfHandles = new ArrayList();
- open(cfDescriptors, cfHandles);
-
+            cfDescriptors.add(new ColumnFamilyDescriptor(OFFSET_COLUMN_FAMILY, offsetCfOptions));
+ open(cfDescriptors);
this.defaultCFHandle = cfHandles.get(0);
this.offsetCFHandle = cfHandles.get(1);
} catch (final Exception e) {
@@ -130,4 +119,4 @@ public class ConsumeQueueRocksDBStorage extends AbstractRocksDBStorage {
public ColumnFamilyHandle getOffsetCFHandle() {
return this.offsetCFHandle;
}
-}
\ No newline at end of file
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index a3a99d3346..c7d5041bd8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.store.rocksdb;
-import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
+import org.apache.rocketmq.common.config.ConfigHelper;
import org.apache.rocketmq.store.MessageStore;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
@@ -71,7 +71,7 @@ public class RocksDBOptionsFactory {
setTableFormatConfig(blockBasedTableConfig).
setMemTableConfig(new SkipListMemTableConfig()).
setCompressionType(CompressionType.LZ4_COMPRESSION).
- setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION).
+ setBottommostCompressionType(CompressionType.LZ4_COMPRESSION).
setNumLevels(7).
setCompactionStyle(CompactionStyle.UNIVERSAL).
setCompactionOptionsUniversal(compactionOption).
@@ -134,7 +134,7 @@ public class RocksDBOptionsFactory {
Statistics statistics = new Statistics();
statistics.setStatsLevel(StatsLevel.EXCEPT_DETAILED_TIMERS);
return options.
- setDbLogDir(ConfigRocksDBStorage.getDBLogDir()).
+ setDbLogDir(ConfigHelper.getDBLogDir()).
setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
setManualWalFlush(true).
@@ -144,9 +144,9 @@ public class RocksDBOptionsFactory {
setCreateIfMissing(true).
setCreateMissingColumnFamilies(true).
setMaxOpenFiles(-1).
- setMaxLogFileSize(1 * SizeUnit.GB).
+ setMaxLogFileSize(SizeUnit.GB).
setKeepLogFileNum(5).
- setMaxManifestFileSize(1 * SizeUnit.GB).
+ setMaxManifestFileSize(SizeUnit.GB).
setAllowConcurrentMemtableWrite(false).
setStatistics(statistics).
setAtomicFlush(true).