This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9a891f1d49 fix: multiple patches during long running tests for LMQ over RocksDB (#8915)
9a891f1d49 is described below
commit 9a891f1d49af0cfe4e384bc28a0bf0eeed02587f
Author: Zhanhui Li <[email protected]>
AuthorDate: Mon Dec 2 10:07:54 2024 +0800
fix: multiple patches during long running tests for LMQ over RocksDB (#8915)
* fix: multiple patches during long running tests for LMQ over RocksDB
Signed-off-by: Li Zhanhui <[email protected]>
* fix: fix a bug in RocksGroupCommitService; remove RocksDBConsumeQueueStore#findConsumeQueueMap override
Signed-off-by: Li Zhanhui <[email protected]>
* fix: async fsync on RocksDB WAL flush
Signed-off-by: Li Zhanhui <[email protected]>
* fix: use a dedicated thread to flush and sync RocksDB WAL
Signed-off-by: Li Zhanhui <[email protected]>
* fix: trigger WAL rolling according to estimated WAL file size
Signed-off-by: Li Zhanhui <[email protected]>
* chore: add doc explaining how to configure the RocksDB instance flush/sync strategy
Signed-off-by: Li Zhanhui <[email protected]>
* fix: data-version should be per table
Signed-off-by: Li Zhanhui <[email protected]>
* fix: test case: RocksdbTransferOffsetAndCqTest
Signed-off-by: Li Zhanhui <[email protected]>
---------
Signed-off-by: Li Zhanhui <[email protected]>
---
.../apache/rocketmq/broker/BrokerController.java | 2 +-
.../rocketmq/broker/config/v2/ConfigHelper.java | 4 +-
.../rocketmq/broker/config/v2/ConfigStorage.java | 156 ++++++++++++++++++++-
.../broker/config/v2/ConsumerOffsetManagerV2.java | 8 +-
.../config/v2/SubscriptionGroupManagerV2.java | 6 +-
.../broker/config/v2/TopicConfigManagerV2.java | 6 +-
.../config/v2/ConsumerOffsetManagerV2Test.java | 19 ++-
.../config/v2/SubscriptionGroupManagerV2Test.java | 15 +-
.../broker/config/v2/TopicConfigManagerV2Test.java | 25 +++-
.../offset/RocksdbTransferOffsetAndCqTest.java | 16 ++-
.../common/config/AbstractRocksDBStorage.java | 34 ++---
.../rocketmq/common/config/ConfigHelper.java | 32 +++--
.../common/config/ConfigRocksDBStorage.java | 2 +-
.../java/org/apache/rocketmq/store/CommitLog.java | 16 ++-
.../rocketmq/store/config/MessageStoreConfig.java | 24 ++++
.../store/queue/RocksDBConsumeQueueStore.java | 95 +++++++++----
.../store/queue/RocksGroupCommitService.java | 103 ++++++++++++++
.../store/rocksdb/RocksDBOptionsFactory.java | 7 +-
18 files changed, 474 insertions(+), 96 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 99e5b85d2e..e1edd2f512 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -344,7 +344,7 @@ public class BrokerController {
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new
LmqBrokerStatsManager(this.brokerConfig) : new
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if
(ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion()))
{
- this.configStorage = new
ConfigStorage(messageStoreConfig.getStorePathRootDir());
+ this.configStorage = new ConfigStorage(messageStoreConfig);
this.topicConfigManager = new TopicConfigManagerV2(this,
configStorage);
this.subscriptionGroupManager = new
SubscriptionGroupManagerV2(this, configStorage);
this.consumerOffsetManager = new ConsumerOffsetManagerV2(this,
configStorage);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
index 8183a1f835..29a7c313ba 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
@@ -64,7 +64,7 @@ public class ConfigHelper {
return Optional.empty();
}
- public static void stampDataVersion(WriteBatch writeBatch, DataVersion
dataVersion, long stateMachineVersion)
+ public static void stampDataVersion(WriteBatch writeBatch, TableId table,
DataVersion dataVersion, long stateMachineVersion)
throws RocksDBException {
// Increase data version
dataVersion.nextVersion(stateMachineVersion);
@@ -75,7 +75,7 @@ public class ConfigHelper {
ByteBuf valueBuf =
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES * 3);
try {
keyBuf.writeByte(TablePrefix.TABLE.getValue());
- keyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+ keyBuf.writeShort(table.getValue());
keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue());
keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES);
valueBuf.writeLong(dataVersion.getStateVersion());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
index 6bc62957a8..c4056d142f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
@@ -16,17 +16,29 @@
*/
package org.apache.rocketmq.broker.config.v2;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.common.config.ConfigHelper;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.FlushOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
@@ -43,8 +55,48 @@ public class ConfigStorage extends AbstractRocksDBStorage {
public static final String DATA_VERSION_KEY = "data_version";
public static final byte[] DATA_VERSION_KEY_BYTES =
DATA_VERSION_KEY.getBytes(StandardCharsets.UTF_8);
- public ConfigStorage(String storePath) {
- super(storePath + File.separator + "config" + File.separator + "rdb");
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ /**
+ * Number of write ops since previous flush.
+ */
+ private final AtomicInteger writeOpsCounter;
+
+ private final AtomicLong estimateWalFileSize = new AtomicLong(0L);
+
+ private final MessageStoreConfig messageStoreConfig;
+
+ private final FlushSyncService flushSyncService;
+
+ public ConfigStorage(MessageStoreConfig messageStoreConfig) {
+ super(messageStoreConfig.getStorePathRootDir() + File.separator +
"config" + File.separator + "rdb");
+ this.messageStoreConfig = messageStoreConfig;
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("config-storage-%d")
+ .build();
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
threadFactory);
+ writeOpsCounter = new AtomicInteger(0);
+ this.flushSyncService = new FlushSyncService();
+ this.flushSyncService.setDaemon(true);
+ }
+
+ private void statNettyMemory() {
+ PooledByteBufAllocatorMetric metric =
AbstractRocksDBStorage.POOLED_ALLOCATOR.metric();
+ LOGGER.info("Netty Memory Usage: {}", metric);
+ }
+
+ @Override
+ public synchronized boolean start() {
+ boolean started = super.start();
+ if (started) {
+ scheduledExecutorService.scheduleWithFixedDelay(() ->
statRocksdb(LOGGER), 1, 10, TimeUnit.SECONDS);
+
scheduledExecutorService.scheduleWithFixedDelay(this::statNettyMemory, 10, 10,
TimeUnit.SECONDS);
+ this.flushSyncService.start();
+ } else {
+ LOGGER.error("Failed to start config storage");
+ }
+ return started;
}
@Override
@@ -58,7 +110,7 @@ public class ConfigStorage extends AbstractRocksDBStorage {
initOptions();
List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
- ColumnFamilyOptions defaultOptions =
ConfigHelper.createConfigOptions();
+ ColumnFamilyOptions defaultOptions =
ConfigHelper.createConfigColumnFamilyOptions();
this.cfOptions.add(defaultOptions);
cfDescriptors.add(new
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
@@ -66,7 +118,7 @@ public class ConfigStorage extends AbstractRocksDBStorage {
open(cfDescriptors);
this.defaultCFHandle = cfHandles.get(0);
- } catch (final Exception e) {
+ } catch (Exception e) {
AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}",
this.dbPath, e);
return false;
}
@@ -75,7 +127,8 @@ public class ConfigStorage extends AbstractRocksDBStorage {
@Override
protected void preShutdown() {
-
+ scheduledExecutorService.shutdown();
+ flushSyncService.shutdown();
}
protected void initOptions() {
@@ -105,6 +158,12 @@ public class ConfigStorage extends AbstractRocksDBStorage {
public void write(WriteBatch writeBatch) throws RocksDBException {
db.write(ableWalWriteOptions, writeBatch);
+ accountWriteOps(writeBatch.getDataSize());
+ }
+
+ private void accountWriteOps(long dataSize) {
+ writeOpsCounter.incrementAndGet();
+ estimateWalFileSize.addAndGet(dataSize);
}
public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) {
@@ -125,4 +184,91 @@ public class ConfigStorage extends AbstractRocksDBStorage {
return iterator;
}
}
+
+ /**
+     * RocksDB writes go through three stages: application memory buffer --> OS page cache --> disk.
+     * Since DBOptions::manual_wal_flush is enabled, we must call DB::FlushWAL and DB::SyncWAL manually.
+     * Note: DB::FlushWAL(true) will internally call DB::SyncWAL.
+     * <p>
+     * See <a href="https://rocksdb.org/blog/2017/08/25/flushwal.html">Flush And Sync WAL</a>
+ */
+ class FlushSyncService extends ServiceThread {
+
+ private long lastSyncTime = 0;
+
+ private static final long MAX_SYNC_INTERVAL_IN_MILLIS = 100;
+
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+
+ private final FlushOptions flushOptions = new FlushOptions();
+
+ @Override
+ public String getServiceName() {
+ return "FlushSyncService";
+ }
+
+ @Override
+ public void run() {
+ flushOptions.setAllowWriteStall(false);
+ flushOptions.setWaitForFlush(true);
+ log.info("{} service started", this.getServiceName());
+ while (!this.isStopped()) {
+ try {
+ this.waitForRunning(10);
+ this.flushAndSyncWAL(false);
+ } catch (Exception e) {
+ log.warn("{} service has exception. ",
this.getServiceName(), e);
+ }
+ }
+ try {
+ flushAndSyncWAL(true);
+ } catch (Exception e) {
+ log.warn("{} raised an exception while performing
flush-and-sync WAL on exit",
+ this.getServiceName(), e);
+ }
+ flushOptions.close();
+ log.info("{} service end", this.getServiceName());
+ }
+
+ private void flushAndSyncWAL(boolean onExit) throws RocksDBException {
+ int writeOps = writeOpsCounter.get();
+ if (0 == writeOps) {
+ // No write ops to flush
+ return;
+ }
+
+ /*
+             * Normally, once MemTables become full and immutable, RocksDB background threads flush them to
+             * L0 SST files automatically. The use case here is different: the MemTable may never become full
+             * and immutable because the volume of data involved is relatively small, yet we constantly modify
+             * key-value pairs and generate WAL entries, so the WAL file can grow to dozens of gigabytes
+             * without a manually triggered flush.
+ */
+ if (ConfigStorage.this.estimateWalFileSize.get() >=
messageStoreConfig.getRocksdbWalFileRollingThreshold()) {
+ ConfigStorage.this.flush(flushOptions);
+ estimateWalFileSize.set(0L);
+ }
+
+ // Flush and Sync WAL if we have committed enough writes
+ if (writeOps >= messageStoreConfig.getRocksdbFlushWalFrequency()
|| onExit) {
+ stopwatch.reset().start();
+ ConfigStorage.this.db.flushWal(true);
+ long elapsed = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+ writeOpsCounter.getAndAdd(-writeOps);
+ lastSyncTime = System.currentTimeMillis();
+ LOGGER.debug("Flush and Sync WAL of RocksDB[{}] costs {}ms,
write-ops={}", dbPath, elapsed, writeOps);
+ return;
+ }
+            // Flush and sync the WAL if writes have been pending for a while
+ long elapsedTime = System.currentTimeMillis() - lastSyncTime;
+ if (elapsedTime > MAX_SYNC_INTERVAL_IN_MILLIS) {
+ stopwatch.reset().start();
+ ConfigStorage.this.db.flushWal(true);
+ long elapsed = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+ LOGGER.debug("Flush and Sync WAL of RocksDB[{}] costs {}ms,
write-ops={}", dbPath, elapsed, writeOps);
+ writeOpsCounter.getAndAdd(-writeOps);
+ lastSyncTime = System.currentTimeMillis();
+ }
+ }
+ }
}
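For reference, a minimal standalone sketch of the manual WAL flush-and-sync pattern that the FlushSyncService above builds on, using the plain RocksDB Java API; the path, key names and loop bound are illustrative and not part of this patch:

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteOptions;

    public class ManualWalFlushSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options()
                     .setCreateIfMissing(true)
                     .setManualWalFlush(true);          // WAL is buffered until we flush it ourselves
                 RocksDB db = RocksDB.open(options, "/tmp/manual-wal-sketch");
                 WriteOptions writeOptions = new WriteOptions()
                     .setSync(false)                    // no fsync per write
                     .setDisableWAL(false)) {           // but every write appends to the WAL buffer
                for (int i = 0; i < 1024; i++) {
                    db.put(writeOptions, ("key-" + i).getBytes(), ("value-" + i).getBytes());
                }
                // FlushWAL(true) writes the buffered WAL out and then syncs it,
                // i.e. it internally performs SyncWAL as well.
                db.flushWal(true);
            }
        }
    }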
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 2c5d3677d8..1821c801cb 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -97,7 +97,7 @@ public class ConsumerOffsetManagerV2 extends
ConsumerOffsetManager {
// TODO: we have to make a copy here as WriteBatch lacks
ByteBuffer API here
writeBatch.deleteRange(ConfigHelper.readBytes(beginKey),
ConfigHelper.readBytes(endKey));
long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET,
dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
} catch (RocksDBException e) {
LOG.error("Failed to removeConsumerOffset, topicAtGroup={}",
topicAtGroup, e);
@@ -138,7 +138,7 @@ public class ConsumerOffsetManagerV2 extends
ConsumerOffsetManager {
writeBatch.deleteRange(ConfigHelper.readBytes(beginKey),
ConfigHelper.readBytes(endKey));
MessageStore messageStore = brokerController.getMessageStore();
long stateMachineVersion = messageStore != null ?
messageStore.getStateMachineVersion() : 0;
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET,
dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
} catch (RocksDBException e) {
LOG.error("Failed to consumer offsets by group={}", group, e);
@@ -194,7 +194,7 @@ public class ConsumerOffsetManagerV2 extends
ConsumerOffsetManager {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
MessageStore messageStore = brokerController.getMessageStore();
long stateMachineVersion = messageStore != null ?
messageStore.getStateMachineVersion() : 0;
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET,
dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
} catch (RocksDBException e) {
LOG.error("Failed to commit consumer offset", e);
@@ -394,7 +394,7 @@ public class ConsumerOffsetManagerV2 extends
ConsumerOffsetManager {
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch, TableId.PULL_OFFSET,
dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
} catch (RocksDBException e) {
LOG.error("Failed to commit pull offset. group={}, topic={},
queueId={}, offset={}",
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
index dea8a2d2c1..dd67871f18 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
@@ -137,8 +137,10 @@ public class SubscriptionGroupManagerV2 extends
SubscriptionGroupManager {
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch,
TableId.SUBSCRIPTION_GROUP, dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
+ // fdatasync on core metadata change
+ persist();
} catch (RocksDBException e) {
log.error("update subscription group config error", e);
} finally {
@@ -163,7 +165,7 @@ public class SubscriptionGroupManagerV2 extends
SubscriptionGroupManager {
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.delete(ConfigHelper.readBytes(keyBuf));
long stateMachineVersion =
brokerController.getMessageStore().getStateMachineVersion();
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch,
TableId.SUBSCRIPTION_GROUP, dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
} catch (RocksDBException e) {
log.error("Failed to remove subscription group config by
group-name={}", groupName, e);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
index 4e36b08727..7991d70445 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
@@ -151,8 +151,10 @@ public class TopicConfigManagerV2 extends
TopicConfigManager {
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch, TableId.TOPIC,
dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
+ // fdatasync on core metadata change
+ this.persist();
} catch (RocksDBException e) {
log.error("Failed to update topic config", e);
} finally {
@@ -167,7 +169,7 @@ public class TopicConfigManagerV2 extends
TopicConfigManager {
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.delete(keyBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() !=
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
- ConfigHelper.stampDataVersion(writeBatch, dataVersion,
stateMachineVersion);
+ ConfigHelper.stampDataVersion(writeBatch, TableId.TOPIC,
dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
} catch (RocksDBException e) {
log.error("Failed to delete topic config by topicName={}",
topicName, e);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
index d7f46855e1..132bd5c1a5 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -44,6 +45,8 @@ public class ConsumerOffsetManagerV2Test {
@Mock
private BrokerController controller;
+ private MessageStoreConfig messageStoreConfig;
+
@Rule
public TemporaryFolder tf = new TemporaryFolder();
@@ -60,7 +63,9 @@ public class ConsumerOffsetManagerV2Test {
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
File configStoreDir = tf.newFolder();
- configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+ messageStoreConfig = new MessageStoreConfig();
+
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller,
configStorage);
}
@@ -84,7 +89,9 @@ public class ConsumerOffsetManagerV2Test {
consumerOffsetManagerV2.getOffsetTable().clear();
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
+ consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller,
configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
}
@@ -106,7 +113,9 @@ public class ConsumerOffsetManagerV2Test {
configStorage.shutdown();
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
+ consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller,
configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
}
@@ -129,7 +138,9 @@ public class ConsumerOffsetManagerV2Test {
configStorage.shutdown();
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
+ consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller,
configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryPullOffset(group, topic, queueId));
}
@@ -157,7 +168,10 @@ public class ConsumerOffsetManagerV2Test {
Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
configStorage.shutdown();
+
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
+ consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller,
configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
Assert.assertEquals(queueOffset,
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
@@ -184,7 +198,10 @@ public class ConsumerOffsetManagerV2Test {
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic2, queueId));
configStorage.shutdown();
+
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
+ consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller,
configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic, queueId));
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group,
topic2, queueId));
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
index 6d436a7c4d..4ff8a81e60 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
@@ -25,6 +25,7 @@ import
org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -38,6 +39,9 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class SubscriptionGroupManagerV2Test {
+
+ private MessageStoreConfig messageStoreConfig;
+
private ConfigStorage configStorage;
private SubscriptionGroupManagerV2 subscriptionGroupManagerV2;
@@ -68,7 +72,9 @@ public class SubscriptionGroupManagerV2Test {
Mockito.doReturn(1L).when(messageStore).getStateMachineVersion();
File configStoreDir = tf.newFolder();
- configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+ messageStoreConfig = new MessageStoreConfig();
+
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
subscriptionGroupManagerV2 = new
SubscriptionGroupManagerV2(controller, configStorage);
}
@@ -98,7 +104,10 @@ public class SubscriptionGroupManagerV2Test {
subscriptionGroupManagerV2.getSubscriptionGroupTable().clear();
configStorage.shutdown();
+
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
+ subscriptionGroupManagerV2 = new
SubscriptionGroupManagerV2(controller, configStorage);
subscriptionGroupManagerV2.load();
found =
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
Assert.assertEquals(subscriptionGroupConfig, found);
@@ -132,7 +141,11 @@ public class SubscriptionGroupManagerV2Test {
Assert.assertNull(found);
configStorage.shutdown();
+
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
+
+ subscriptionGroupManagerV2 = new
SubscriptionGroupManagerV2(controller, configStorage);
subscriptionGroupManagerV2.load();
found =
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
Assert.assertNull(found);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
index 92c936b110..731a1f538f 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
@@ -35,17 +36,19 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
-
@RunWith(value = MockitoJUnitRunner.class)
public class TopicConfigManagerV2Test {
- private ConfigStorage configStorage;
+ private MessageStoreConfig messageStoreConfig;
- private TopicConfigManagerV2 topicConfigManagerV2;
+ private ConfigStorage configStorage;
@Mock
private BrokerController controller;
+ @Mock
+ private MessageStore messageStore;
+
@Rule
public TemporaryFolder tf = new TemporaryFolder();
@@ -61,17 +64,22 @@ public class TopicConfigManagerV2Test {
BrokerConfig brokerConfig = new BrokerConfig();
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
- MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig = new MessageStoreConfig();
Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig();
+ Mockito.doReturn(messageStore).when(controller).getMessageStore();
File configStoreDir = tf.newFolder();
- configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
+
+ configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
- topicConfigManagerV2 = new TopicConfigManagerV2(controller,
configStorage);
}
@Test
public void testUpdateTopicConfig() {
+ TopicConfigManagerV2 topicConfigManagerV2 = new
TopicConfigManagerV2(controller, configStorage);
+ topicConfigManagerV2.load();
+
TopicConfig topicConfig = new TopicConfig();
String topicName = "T1";
topicConfig.setTopicName(topicName);
@@ -86,7 +94,9 @@ public class TopicConfigManagerV2Test {
topicConfigManagerV2.getTopicConfigTable().clear();
+ configStorage = new ConfigStorage(messageStoreConfig);
Assert.assertTrue(configStorage.start());
+ topicConfigManagerV2 = new TopicConfigManagerV2(controller,
configStorage);
Assert.assertTrue(topicConfigManagerV2.load());
TopicConfig loaded = topicConfigManagerV2.selectTopicConfig(topicName);
@@ -111,12 +121,15 @@ public class TopicConfigManagerV2Test {
topicConfig.setWriteQueueNums(4);
topicConfig.setOrder(true);
topicConfig.setTopicSysFlag(4);
+ TopicConfigManagerV2 topicConfigManagerV2 = new
TopicConfigManagerV2(controller, configStorage);
topicConfigManagerV2.updateTopicConfig(topicConfig);
topicConfigManagerV2.removeTopicConfig(topicName);
Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
Assert.assertTrue(configStorage.shutdown());
+ configStorage = new ConfigStorage(messageStoreConfig);
Assert.assertTrue(configStorage.start());
+ topicConfigManagerV2 = new TopicConfigManagerV2(controller,
configStorage);
Assert.assertTrue(topicConfigManagerV2.load());
Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
index 4b320eb53f..6a805b0434 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -23,11 +23,11 @@ import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -38,6 +38,7 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -135,6 +136,7 @@ public class RocksdbTransferOffsetAndCqTest {
}
RocksDBMessageStore kvStore =
defaultMessageStore.getRocksDBMessageStore();
ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore();
+ store.start();
ConsumeQueueInterface rocksdbCq =
defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId);
ConsumeQueueInterface fileCq =
defaultMessageStore.findConsumeQueue(topic, queueId);
for (int i = 0; i < 200; i++) {
@@ -142,13 +144,21 @@ public class RocksdbTransferOffsetAndCqTest {
fileCq.putMessagePositionInfoWrapper(request);
store.putMessagePositionInfoWrapper(request);
}
+ Awaitility.await()
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .atMost(3, TimeUnit.SECONDS)
+ .until(() -> rocksdbCq.getMaxOffsetInQueue() == 200);
Pair<CqUnit, Long> unit = rocksdbCq.getCqUnitAndStoreTime(100);
Pair<CqUnit, Long> unit1 = fileCq.getCqUnitAndStoreTime(100);
- Assert.assertTrue(unit.getObject1().getPos() ==
unit1.getObject1().getPos());
+ Assert.assertEquals(unit.getObject1().getPos(),
unit1.getObject1().getPos());
}
+ /**
+     * This test no longer needs to be skipped on macOS.
+     * @return true if the current platform is NOT a good fit for this test case.
+ */
private boolean notToBeExecuted() {
- return MixAll.isMac();
+ return false;
}
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 28ed4e924c..48ba4b8086 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -121,14 +121,16 @@ public abstract class AbstractRocksDBStorage {
this.writeOptions = new WriteOptions();
this.writeOptions.setSync(false);
this.writeOptions.setDisableWAL(true);
- this.writeOptions.setNoSlowdown(true);
+ // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+ this.writeOptions.setNoSlowdown(false);
}
protected void initAbleWalWriteOptions() {
this.ableWalWriteOptions = new WriteOptions();
this.ableWalWriteOptions.setSync(false);
this.ableWalWriteOptions.setDisableWAL(false);
- this.ableWalWriteOptions.setNoSlowdown(true);
+ // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+ this.ableWalWriteOptions.setNoSlowdown(false);
}
protected void initReadOptions() {
@@ -363,7 +365,7 @@ public abstract class AbstractRocksDBStorage {
}
if (postLoad()) {
this.loaded = true;
- LOGGER.info("start OK. {}", this.dbPath);
+ LOGGER.info("RocksDB[{}] starts OK", this.dbPath);
this.closed = false;
return true;
} else {
@@ -560,7 +562,15 @@ public abstract class AbstractRocksDBStorage {
public void statRocksdb(Logger logger) {
try {
+ // Log Memory Usage
+ String blockCacheMemUsage =
this.db.getProperty("rocksdb.block-cache-usage");
+ String indexesAndFilterBlockMemUsage =
this.db.getProperty("rocksdb.estimate-table-readers-mem");
+ String memTableMemUsage =
this.db.getProperty("rocksdb.cur-size-all-mem-tables");
+ String blocksPinnedByIteratorMemUsage =
this.db.getProperty("rocksdb.block-cache-pinned-usage");
+ logger.info("RocksDB Memory Usage: BlockCache: {},
IndexesAndFilterBlock: {}, MemTable: {}, BlocksPinnedByIterator: {}",
+ blockCacheMemUsage, indexesAndFilterBlockMemUsage,
memTableMemUsage, blocksPinnedByIteratorMemUsage);
+ // Log file metadata by level
List<LiveFileMetaData> liveFileMetaDataList =
this.getCompactionStatus();
if (liveFileMetaDataList == null ||
liveFileMetaDataList.isEmpty()) {
return;
@@ -570,21 +580,13 @@ public abstract class AbstractRocksDBStorage {
StringBuilder sb = map.computeIfAbsent(metaData.level(), k ->
new StringBuilder(256));
sb.append(new String(metaData.columnFamilyName(),
StandardCharsets.UTF_8)).append(SPACE).
append(metaData.fileName()).append(SPACE).
- append("s: ").append(metaData.size()).append(SPACE).
- append("a: ").append(metaData.numEntries()).append(SPACE).
- append("r:
").append(metaData.numReadsSampled()).append(SPACE).
- append("d:
").append(metaData.numDeletions()).append(SPACE).
- append(metaData.beingCompacted()).append("\n");
+ append("file-size:
").append(metaData.size()).append(SPACE).
+ append("number-of-entries:
").append(metaData.numEntries()).append(SPACE).
+ append("file-read-times:
").append(metaData.numReadsSampled()).append(SPACE).
+ append("deletions:
").append(metaData.numDeletions()).append(SPACE).
+ append("being-compacted:
").append(metaData.beingCompacted()).append("\n");
}
-
map.forEach((key, value) -> logger.info("level: {}\n{}", key,
value.toString()));
-
- String blockCacheMemUsage =
this.db.getProperty("rocksdb.block-cache-usage");
- String indexesAndFilterBlockMemUsage =
this.db.getProperty("rocksdb.estimate-table-readers-mem");
- String memTableMemUsage =
this.db.getProperty("rocksdb.cur-size-all-mem-tables");
- String blocksPinnedByIteratorMemUsage =
this.db.getProperty("rocksdb.block-cache-pinned-usage");
- logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {},
MemTable: {}, blocksPinnedByIterator: {}",
- blockCacheMemUsage, indexesAndFilterBlockMemUsage,
memTableMemUsage, blocksPinnedByIteratorMemUsage);
} catch (Exception ignored) {
}
}
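The change from setNoSlowdown(true) to setNoSlowdown(false) above makes writes block during a RocksDB write stall instead of failing fast. A small hedged sketch of what the fail-fast mode would otherwise force callers to handle; the path and key/value are illustrative:

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.Status;
    import org.rocksdb.WriteOptions;

    public class NoSlowdownSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/no-slowdown-sketch");
                 WriteOptions failFast = new WriteOptions().setNoSlowdown(true)) {
                try {
                    db.put(failFast, "key".getBytes(), "value".getBytes());
                } catch (RocksDBException e) {
                    // With no-slowdown enabled, a write issued while the DB is stalling is
                    // rejected immediately (Incomplete status) rather than being blocked.
                    if (e.getStatus() != null && e.getStatus().getCode() == Status.Code.Incomplete) {
                        System.err.println("write rejected during a write stall; retry or back off");
                    }
                }
            }
        }
    }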
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
index a4ba35bd5a..e3f6f22002 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
@@ -38,7 +38,7 @@ import org.rocksdb.WALRecoveryMode;
import org.rocksdb.util.SizeUnit;
public class ConfigHelper {
- public static ColumnFamilyOptions createConfigOptions() {
+ public static ColumnFamilyOptions createConfigColumnFamilyOptions() {
BlockBasedTableConfig blockBasedTableConfig = new
BlockBasedTableConfig().
setFormatVersion(5).
setIndexType(IndexType.kBinarySearch).
@@ -46,7 +46,7 @@ public class ConfigHelper {
setBlockSize(32 * SizeUnit.KB).
setFilterPolicy(new BloomFilter(16, false)).
// Indicating if we'd put index/filter blocks to the block cache.
- setCacheIndexAndFilterBlocks(false).
+ setCacheIndexAndFilterBlocks(true).
setCacheIndexAndFilterBlocksWithHighPriority(true).
setPinL0FilterAndIndexBlocksInCache(false).
setPinTopLevelIndexAndFilter(true).
@@ -54,9 +54,8 @@ public class ConfigHelper {
setWholeKeyFiltering(true);
ColumnFamilyOptions options = new ColumnFamilyOptions();
- return options.setMaxWriteBufferNumber(2).
- // MemTable size, MemTable(cache) -> immutable MemTable(cache) ->
SST(disk)
- setWriteBufferSize(8 * SizeUnit.MB).
+ return options.setMaxWriteBufferNumber(4).
+ setWriteBufferSize(64 * SizeUnit.MB).
setMinWriteBufferNumberToMerge(1).
setTableFormatConfig(blockBasedTableConfig).
setMemTableConfig(new SkipListMemTableConfig()).
@@ -67,17 +66,17 @@ public class ConfigHelper {
setLevel0SlowdownWritesTrigger(8).
setLevel0StopWritesTrigger(12).
// The target file size for compaction.
- setTargetFileSizeBase(64 * SizeUnit.MB).
+ setTargetFileSizeBase(64 * SizeUnit.MB).
setTargetFileSizeMultiplier(2).
// The upper-bound of the total size of L1 files in bytes
- setMaxBytesForLevelBase(256 * SizeUnit.MB).
+ setMaxBytesForLevelBase(256 * SizeUnit.MB).
setMaxBytesForLevelMultiplier(2).
setMergeOperator(new StringAppendOperator()).
setInplaceUpdateSupport(true);
}
public static DBOptions createConfigDBOptions() {
- //Turn based on
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
+ // Tune based on
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
// and
http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java
DBOptions options = new DBOptions();
Statistics statistics = new Statistics();
@@ -86,10 +85,20 @@ public class ConfigHelper {
setDbLogDir(getDBLogDir()).
setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords).
+ /*
+             * We use manual flush to strike the desired balance between reliability and performance:
+             * for metadata that matters, such as {topic, subscription}-config changes, each write incurs a
+             * flush-and-sync to guarantee durability; for {commit, pull}-offset advancements, a group flush
+             * is performed every N (configurable, 1024 by default) writes or once pending writes have aged,
+             * similar to the OS page-cache flush mechanism.
+ */
setManualWalFlush(true).
- setMaxTotalWalSize(500 * SizeUnit.MB).
- setWalSizeLimitMB(0).
- setWalTtlSeconds(0).
+ // This option takes effect only when we have multiple column
families
+ // https://github.com/facebook/rocksdb/issues/4180
+ // setMaxTotalWalSize(1024 * SizeUnit.MB).
+ setDbWriteBufferSize(128 * SizeUnit.MB).
+ setBytesPerSync(SizeUnit.MB).
+ setWalBytesPerSync(SizeUnit.MB).
setCreateIfMissing(true).
setCreateMissingColumnFamilies(true).
setMaxOpenFiles(-1).
@@ -99,7 +108,6 @@ public class ConfigHelper {
setAllowConcurrentMemtableWrite(false).
setStatistics(statistics).
setStatsDumpPeriodSec(600).
- setAtomicFlush(true).
setMaxBackgroundJobs(32).
setMaxSubcompactions(4).
setParanoidChecks(true).
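A compact sketch of the group-flush policy described in the comment above: sync the WAL once enough writes have accumulated, or once pending writes have aged past a deadline. The method and parameter names here are illustrative, not the identifiers used by this patch:

    public class GroupFlushPolicySketch {
        static boolean shouldFlushAndSyncWal(int pendingWrites, long lastSyncMillis,
            int flushFrequency, long maxSyncIntervalMillis) {
            if (pendingWrites == 0) {
                return false;                 // nothing to make durable
            }
            if (pendingWrites >= flushFrequency) {
                return true;                  // enough writes batched up
            }
            // otherwise sync only when pending writes have aged past the deadline
            return System.currentTimeMillis() - lastSyncMillis > maxSyncIntervalMillis;
        }

        public static void main(String[] args) {
            // 1024 pending writes with a 1024-write frequency -> sync now
            System.out.println(shouldFlushAndSyncWal(1024, System.currentTimeMillis(), 1024, 100));
        }
    }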
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 3b924a6a0d..5fd9bab2d7 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -70,7 +70,7 @@ public class ConfigRocksDBStorage extends
AbstractRocksDBStorage {
final List<ColumnFamilyDescriptor> cfDescriptors = new
ArrayList<>();
- ColumnFamilyOptions defaultOptions =
ConfigHelper.createConfigOptions();
+ ColumnFamilyOptions defaultOptions =
ConfigHelper.createConfigColumnFamilyOptions();
this.cfOptions.add(defaultOptions);
cfDescriptors.add(new
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
cfDescriptors.add(new
ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7cf9746551..d30691908b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -326,22 +326,26 @@ public class CommitLog implements Swappable {
boolean checkDupInfo =
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
final List<MappedFile> mappedFiles =
this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
- // Began to recover from the last third file
- int index = mappedFiles.size() - 3;
- if (index < 0) {
- index = 0;
+ int index = mappedFiles.size() - 1;
+ while (index > 0) {
+ MappedFile mappedFile = mappedFiles.get(index);
+ if (mappedFile.getFileFromOffset() <=
maxPhyOffsetOfConsumeQueue) {
+ // It's safe to recover from this mapped file
+ break;
+ }
+ index--;
}
+            // TODO: Discuss if we need to load more commit-log mapped files into memory.
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = this.getConfirmOffset();
- // normal recover doesn't require dispatching
- boolean doDispatch = false;
while (true) {
DispatchRequest dispatchRequest =
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();
+ boolean doDispatch = dispatchRequest.getCommitLogOffset() >
maxPhyOffsetOfConsumeQueue;
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {
lastValidMsgPhyOffset = processOffset + mappedFileOffset;
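A worked sketch of the new recovery start point above: scan the mapped files backwards until one starts at or below the consume queue's max physical offset, so only entries beyond that offset are re-dispatched. The array-based form is illustrative; only the offset comparison mirrors the patch:

    public class RecoveryStartFileSketch {
        static int selectStartIndex(long[] fileFromOffsets, long maxPhyOffsetOfConsumeQueue) {
            int index = fileFromOffsets.length - 1;
            while (index > 0 && fileFromOffsets[index] > maxPhyOffsetOfConsumeQueue) {
                index--;    // this file lies entirely beyond what the consume queue has recorded
            }
            return index;
        }

        public static void main(String[] args) {
            long gb = 1L << 30;
            // files start at 0, 1 GB and 2 GB; the consume queue has seen up to 1.5 GB,
            // so recovery starts from index 1 instead of a fixed "last third" file
            System.out.println(selectStartIndex(new long[] {0L, gb, 2 * gb}, gb + gb / 2));
        }
    }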
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index fe090e3fa2..6dfdc0b1c8 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.queue.BatchConsumeQueue;
import org.rocksdb.CompressionType;
+import org.rocksdb.util.SizeUnit;
public class MessageStoreConfig {
@@ -444,6 +445,13 @@ public class MessageStoreConfig {
private String rocksdbCompressionType =
CompressionType.LZ4_COMPRESSION.getLibraryName();
+ /**
+ * Flush RocksDB WAL frequency, aka, flush WAL every N write ops.
+ */
+ private int rocksdbFlushWalFrequency = 1024;
+
+ private long rocksdbWalFileRollingThreshold = SizeUnit.GB;
+
public String getRocksdbCompressionType() {
return rocksdbCompressionType;
}
@@ -1902,6 +1910,22 @@ public class MessageStoreConfig {
this.bottomMostCompressionTypeForConsumeQueueStore =
bottomMostCompressionTypeForConsumeQueueStore;
}
+ public int getRocksdbFlushWalFrequency() {
+ return rocksdbFlushWalFrequency;
+ }
+
+ public void setRocksdbFlushWalFrequency(int rocksdbFlushWalFrequency) {
+ this.rocksdbFlushWalFrequency = rocksdbFlushWalFrequency;
+ }
+
+ public long getRocksdbWalFileRollingThreshold() {
+ return rocksdbWalFileRollingThreshold;
+ }
+
+ public void setRocksdbWalFileRollingThreshold(long
rocksdbWalFileRollingThreshold) {
+ this.rocksdbWalFileRollingThreshold = rocksdbWalFileRollingThreshold;
+ }
+
public int getSpinLockCollisionRetreatOptimalDegree() {
return spinLockCollisionRetreatOptimalDegree;
}
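A minimal sketch of tuning the two new knobs programmatically; the values shown are the defaults added by this patch, and the same field names can presumably be set through the broker's store configuration file:

    import org.apache.rocketmq.store.config.MessageStoreConfig;
    import org.rocksdb.util.SizeUnit;

    public class RocksdbWalTuningSketch {
        public static void main(String[] args) {
            MessageStoreConfig storeConfig = new MessageStoreConfig();
            // flush-and-sync the RocksDB WAL every 1024 write ops
            storeConfig.setRocksdbFlushWalFrequency(1024);
            // flush MemTables once the estimated WAL size reaches 1 GB, letting RocksDB roll the WAL
            storeConfig.setRocksdbWalFileRollingThreshold(SizeUnit.GB);
            System.out.println(storeConfig.getRocksdbFlushWalFrequency());
            System.out.println(storeConfig.getRocksdbWalFileRollingThreshold());
        }
    }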
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 0242ec2309..7e3aa70d02 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -30,12 +30,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
@@ -84,6 +86,10 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
private final OffsetInitializer offsetInitializer;
+ private final RocksGroupCommitService groupCommitService;
+
+ private final AtomicReference<ServiceState> serviceState = new
AtomicReference<>(ServiceState.CREATE_JUST);
+
public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
super(messageStore);
@@ -93,6 +99,7 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
this.rocksDBConsumeQueueOffsetTable = new
RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage,
messageStore);
this.offsetInitializer = new OffsetInitializerRocksDBImpl(this);
+ this.groupCommitService = new RocksGroupCommitService(this);
this.cqBBPairList = new ArrayList<>(16);
this.offsetBBPairList = new ArrayList<>(DEFAULT_BYTE_BUFFER_CAPACITY);
for (int i = 0; i < DEFAULT_BYTE_BUFFER_CAPACITY; i++) {
@@ -123,14 +130,17 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
@Override
public void start() {
- log.info("RocksDB ConsumeQueueStore start!");
- this.scheduledExecutorService.scheduleAtFixedRate(() -> {
- this.rocksDBStorage.statRocksdb(ROCKSDB_LOG);
- }, 10, this.messageStoreConfig.getStatRocksDBCQIntervalSec(),
TimeUnit.SECONDS);
-
- this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
- cleanDirty(messageStore.getTopicConfigs().keySet());
- }, 10, this.messageStoreConfig.getCleanRocksDBDirtyCQIntervalMin(),
TimeUnit.MINUTES);
+ if (serviceState.compareAndSet(ServiceState.CREATE_JUST,
ServiceState.RUNNING)) {
+ log.info("RocksDB ConsumeQueueStore start!");
+ this.groupCommitService.start();
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ this.rocksDBStorage.statRocksdb(ROCKSDB_LOG);
+ }, 10, this.messageStoreConfig.getStatRocksDBCQIntervalSec(),
TimeUnit.SECONDS);
+
+ this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
+ cleanDirty(messageStore.getTopicConfigs().keySet());
+ }, 10,
this.messageStoreConfig.getCleanRocksDBDirtyCQIntervalMin(), TimeUnit.MINUTES);
+ }
}
private void cleanDirty(final Set<String> existTopicSet) {
@@ -165,18 +175,23 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
@Override
public void recover() {
- // ignored
+ start();
}
@Override
public boolean recoverConcurrently() {
+ start();
return true;
}
@Override
public boolean shutdown() {
- this.scheduledExecutorService.shutdown();
- return shutdownInner();
+ if (serviceState.compareAndSet(ServiceState.RUNNING,
ServiceState.SHUTDOWN_ALREADY)) {
+ this.groupCommitService.shutdown();
+ this.scheduledExecutorService.shutdown();
+ return shutdownInner();
+ }
+ return true;
}
private boolean shutdownInner() {
@@ -188,23 +203,25 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
if (null == request) {
return;
}
- // We are taking advantage of Atomic Flush, this operation is purely
memory-based.
- // batch and cache in Java heap does not make sense, instead, we
should put the metadata into RocksDB immediately
- // to optimized overall end-to-end latency.
- putMessagePosition(request);
+
+ try {
+ groupCommitService.putRequest(request);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- public void putMessagePosition(DispatchRequest request) throws
RocksDBException {
+ public void putMessagePosition(List<DispatchRequest> requests) throws
RocksDBException {
final int maxRetries = 30;
for (int i = 0; i < maxRetries; i++) {
- if (putMessagePosition0(request)) {
+ if (putMessagePosition0(requests)) {
if (this.isCQError) {
this.messageStore.getRunningFlags().clearLogicsQueueError();
this.isCQError = false;
}
return;
} else {
- ERROR_LOG.warn("{} put cq Failed. retryTime: {}", i);
+ ERROR_LOG.warn("Put cq Failed. retryTime: {}", i);
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
@@ -219,34 +236,43 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
throw new RocksDBException("put CQ Failed");
}
- private boolean putMessagePosition0(DispatchRequest request) {
+ private boolean putMessagePosition0(List<DispatchRequest> requests) {
if (!this.rocksDBStorage.hold()) {
return false;
}
try (WriteBatch writeBatch = new WriteBatch()) {
+ final int size = requests.size();
+ if (size == 0) {
+ return true;
+ }
long maxPhyOffset = 0;
- DispatchEntry entry = DispatchEntry.from(request);
- dispatch(entry, writeBatch);
- dispatchLMQ(request, writeBatch);
-
- final int msgSize = request.getMsgSize();
- final long phyOffset = request.getCommitLogOffset();
- if (phyOffset + msgSize >= maxPhyOffset) {
- maxPhyOffset = phyOffset + msgSize;
+ for (int i = size - 1; i >= 0; i--) {
+ final DispatchRequest request = requests.get(i);
+ DispatchEntry entry = DispatchEntry.from(request);
+ dispatch(entry, writeBatch);
+ dispatchLMQ(request, writeBatch);
+
+ final int msgSize = request.getMsgSize();
+ final long phyOffset = request.getCommitLogOffset();
+ if (phyOffset + msgSize >= maxPhyOffset) {
+ maxPhyOffset = phyOffset + msgSize;
+ }
}
this.rocksDBConsumeQueueOffsetTable.putMaxPhyAndCqOffset(tempTopicQueueMaxOffsetMap,
writeBatch, maxPhyOffset);
this.rocksDBStorage.batchPut(writeBatch);
+
this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(tempTopicQueueMaxOffsetMap);
- long storeTimeStamp = request.getStoreTimestamp();
+
+ long storeTimeStamp = requests.get(size - 1).getStoreTimestamp();
if (this.messageStore.getMessageStoreConfig().getBrokerRole() ==
BrokerRole.SLAVE
||
this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimeStamp);
}
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimeStamp);
- notifyMessageArrival(request);
+ notifyMessageArriveAndClear(requests);
return true;
} catch (Exception e) {
ERROR_LOG.error("putMessagePosition0 failed.", e);
@@ -311,9 +337,12 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
}
}
- private void notifyMessageArrival(DispatchRequest request) {
+ private void notifyMessageArriveAndClear(List<DispatchRequest> requests) {
try {
- this.messageStore.notifyMessageArriveIfNecessary(request);
+ for (DispatchRequest dp : requests) {
+ this.messageStore.notifyMessageArriveIfNecessary(dp);
+ }
+ requests.clear();
} catch (Exception e) {
ERROR_LOG.error("notifyMessageArriveAndClear Failed.", e);
}
@@ -538,4 +567,8 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
}
return super.getMaxOffset(topic, queueId);
}
+
+ public boolean isStopped() {
+ return ServiceState.SHUTDOWN_ALREADY == serviceState.get();
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java
new file mode 100644
index 0000000000..e2f2c9ee2c
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+ private static final int MAX_BUFFER_SIZE = 100_000;
+
+ private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+ private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+ private final RocksDBConsumeQueueStore store;
+
+ private final List<DispatchRequest> requests = new
ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+ public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+ this.store = store;
+ this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+ }
+
+ @Override
+ public String getServiceName() {
+ return "RocksGroupCommit";
+ }
+
+ @Override
+ public void run() {
+ log.info("{} service started", this.getServiceName());
+ while (!this.isStopped()) {
+ try {
+ this.waitForRunning(10);
+ this.doCommit();
+ } catch (Exception e) {
+ log.warn("{} service has exception. ", this.getServiceName(),
e);
+ }
+ }
+ log.info("{} service end", this.getServiceName());
+ }
+
+ public void putRequest(final DispatchRequest request) throws
InterruptedException {
+ while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer has been full for 3s; still waiting for space");
+ }
+ this.wakeup();
+ }
+
+ private void doCommit() {
+ while (!buffer.isEmpty()) {
+ while (true) {
+ DispatchRequest dispatchRequest = buffer.poll();
+ if (null != dispatchRequest) {
+ requests.add(dispatchRequest);
+ }
+
+ if (requests.isEmpty()) {
+ // buffer has been drained
+ break;
+ }
+
+ if (null == dispatchRequest || requests.size() >=
PREFERRED_DISPATCH_REQUEST_COUNT) {
+ groupCommit();
+ }
+ }
+ }
+ }
+
+ private void groupCommit() {
+ while (!store.isStopped()) {
+ try {
+                // putMessagePosition clears the requests list once consume-queue building completes
+ store.putMessagePosition(requests);
+ break;
+ } catch (RocksDBException e) {
+ log.error("Failed to build consume queue in RocksDB", e);
+ }
+ }
+ }
+
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index 66f5cbd095..2fac3bf485 100644
---
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -22,6 +22,7 @@ import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionOptionsUniversal;
+import org.rocksdb.CompactionPriority;
import org.rocksdb.CompactionStopStyle;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
@@ -79,6 +80,7 @@ public class RocksDBOptionsFactory {
setCompressionType(compressionType).
setBottommostCompressionType(bottomMostCompressionType).
setNumLevels(7).
+ setCompactionPriority(CompactionPriority.MinOverlappingRatio).
setCompactionStyle(CompactionStyle.UNIVERSAL).
setCompactionOptionsUniversal(compactionOption).
setMaxCompactionBytes(100 * SizeUnit.GB).
@@ -144,10 +146,8 @@ public class RocksDBOptionsFactory {
setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
setManualWalFlush(true).
- setMaxTotalWalSize(0).
- setWalSizeLimitMB(0).
- setWalTtlSeconds(0).
setCreateIfMissing(true).
+ setBytesPerSync(SizeUnit.MB).
setCreateMissingColumnFamilies(true).
setMaxOpenFiles(-1).
setMaxLogFileSize(SizeUnit.GB).
@@ -156,6 +156,7 @@ public class RocksDBOptionsFactory {
setAllowConcurrentMemtableWrite(false).
setStatistics(statistics).
setAtomicFlush(true).
+ setCompactionReadaheadSize(4 * SizeUnit.MB).
setMaxBackgroundJobs(32).
setMaxSubcompactions(8).
setParanoidChecks(true).