This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e0db6545b0 [ISSUE #9112] Speedup revive scan in Pop Consumption and
support server side reset offset (#9113)
e0db6545b0 is described below
commit e0db6545b00770e8f41bf55cfb560ac79f9a17c9
Author: lizhimins <[email protected]>
AuthorDate: Wed Jan 8 23:00:22 2025 +0800
[ISSUE #9112] Speedup revive scan in Pop Consumption and support server
side reset offset (#9113)
---
.../rocketmq/broker/pop/PopConsumerKVStore.java | 11 ++-
.../broker/pop/PopConsumerRocksdbStore.java | 20 +++--
.../rocketmq/broker/pop/PopConsumerService.java | 60 ++++++++-----
.../broker/pop/PopConsumerRocksdbStoreTest.java | 97 +++++++++++++++++++++-
.../broker/pop/PopConsumerServiceTest.java | 13 +--
.../common/config/AbstractRocksDBStorage.java | 6 +-
6 files changed, 162 insertions(+), 45 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java
index 5569abe3db..33072d699b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerKVStore.java
@@ -49,10 +49,13 @@ public interface PopConsumerKVStore {
void deleteRecords(List<PopConsumerRecord> consumerRecordList);
/**
- * Scans and returns a list of expired consumer records before the current time.
- * @param currentTime The current revive checkpoint timestamp.
+ * Scans and returns a list of expired consumer records within the specified time range.
+ * @param lowerTime The start time (inclusive) of the time range to search, in milliseconds.
+ * @param upperTime The end time (exclusive) of the time range to search, in milliseconds.
* @param maxCount The maximum number of records to return.
- * @return A list of expired consumer records.
+ * Even if more records match the criteria, only this many will be returned.
+ * @return A list of expired consumer records within the specified time range.
+ * If no matching records are found, an empty list is returned.
*/
- List<PopConsumerRecord> scanExpiredRecords(long currentTime, int maxCount);
+ List<PopConsumerRecord> scanExpiredRecords(long lowerTime, long upperTime,
int maxCount);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
index f2a617b408..7ab276a418 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStore.java
@@ -28,9 +28,11 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactRangeOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P
private WriteOptions writeOptions;
private WriteOptions deleteOptions;
- private ColumnFamilyHandle columnFamilyHandle;
+ protected ColumnFamilyHandle columnFamilyHandle;
public PopConsumerRocksdbStore(String filePath) {
super(filePath);
@@ -60,8 +62,7 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P
this.writeOptions.setNoSlowdown(false);
this.deleteOptions = new WriteOptions();
- this.deleteOptions.setSync(false);
- this.deleteOptions.setLowPri(true);
+ this.deleteOptions.setSync(true);
this.deleteOptions.setDisableWAL(false);
this.deleteOptions.setNoSlowdown(false);
@@ -135,18 +136,19 @@ public class PopConsumerRocksdbStore extends AbstractRocksDBStorage implements P
}
@Override
- public List<PopConsumerRecord> scanExpiredRecords(long currentTime, int
maxCount) {
+ // https://github.com/facebook/rocksdb/issues/10300
+ public List<PopConsumerRecord> scanExpiredRecords(long lower, long upper,
int maxCount) {
// In RocksDB, we can use SstPartitionerFixedPrefixFactory in cfOptions
// and new ColumnFamilyOptions().useFixedLengthPrefixExtractor() to
// configure prefix indexing to improve the performance of scans.
// However, in the current implementation, this is not the bottleneck.
List<PopConsumerRecord> consumerRecordList = new ArrayList<>();
- try (RocksIterator iterator = db.newIterator(this.columnFamilyHandle))
{
- iterator.seekToFirst();
+ try (ReadOptions scanOptions = new ReadOptions()
+ .setIterateLowerBound(new
Slice(ByteBuffer.allocate(Long.BYTES).putLong(lower).array()))
+ .setIterateUpperBound(new
Slice(ByteBuffer.allocate(Long.BYTES).putLong(upper).array()));
+ RocksIterator iterator = db.newIterator(this.columnFamilyHandle,
scanOptions)) {
+
iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lower).array());
while (iterator.isValid() && consumerRecordList.size() < maxCount)
{
- if (ByteBuffer.wrap(iterator.key()).getLong() > currentTime) {
- break;
- }
consumerRecordList.add(PopConsumerRecord.decode(iterator.value()));
iterator.next();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index 647e3d6ff7..1f0125412a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -75,6 +75,7 @@ public class PopConsumerService extends ServiceThread {
private final AtomicBoolean consumerRunning;
private final BrokerConfig brokerConfig;
private final BrokerController brokerController;
+ private final AtomicLong currentTime;
private final AtomicLong lastCleanupLockTime;
private final PopConsumerCache popConsumerCache;
private final PopConsumerKVStore popConsumerStore;
@@ -88,6 +89,7 @@ public class PopConsumerService extends ServiceThread {
this.consumerRunning = new AtomicBoolean(false);
this.requestCountTable = new ConcurrentHashMap<>();
+ this.currentTime = new AtomicLong(TimeUnit.SECONDS.toMillis(3));
this.lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
this.consumerLockService = new
PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));
this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(
@@ -195,12 +197,27 @@ public class PopConsumerService extends ServiceThread {
return context;
}
+ public Long getPopOffset(String groupId, String topicId, int queueId) {
+ Long resetOffset =
+
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topicId,
groupId, queueId);
+ if (resetOffset != null) {
+ this.clearCache(groupId, topicId, queueId);
+
this.brokerController.getConsumerOrderInfoManager().clearBlock(topicId,
groupId, queueId);
+ this.brokerController.getConsumerOffsetManager()
+ .commitOffset("ResetPopOffset", groupId, topicId, queueId,
resetOffset);
+ }
+ return resetOffset;
+ }
+
public CompletableFuture<GetMessageResult> getMessageAsync(String
clientHost,
String groupId, String topicId, int queueId, long offset, int
batchSize, MessageFilter filter) {
log.debug("PopConsumerService getMessageAsync, groupId={}, topicId={},
queueId={}, offset={}, batchSize={}, filter={}",
groupId, topicId, offset, queueId, batchSize, filter != null);
+ Long resetOffset = this.getPopOffset(groupId, topicId, queueId);
+ final long currentOffset = resetOffset != null ? resetOffset : offset;
+
CompletableFuture<GetMessageResult> getMessageFuture =
brokerController.getMessageStore().getMessageAsync(groupId,
topicId, queueId, offset, batchSize, filter);
@@ -223,7 +240,7 @@ public class PopConsumerService extends ServiceThread {
log.warn("PopConsumerService getMessageAsync, initial offset
because store is no correct, " +
"groupId={}, topicId={}, queueId={}, batchSize={},
offset={}->{}",
- groupId, topicId, queueId, batchSize, offset,
result.getNextBeginOffset());
+ groupId, topicId, queueId, batchSize, currentOffset,
result.getNextBeginOffset());
return brokerController.getMessageStore().getMessageAsync(
groupId, topicId, queueId, result.getNextBeginOffset(),
batchSize, filter);
@@ -482,10 +499,12 @@ public class PopConsumerService extends ServiceThread {
}
}
- public long revive(long currentTime, int maxCount) {
+ public long revive(AtomicLong currentTime, int maxCount) {
Stopwatch stopwatch = Stopwatch.createStarted();
- List<PopConsumerRecord> consumerRecords =
- this.popConsumerStore.scanExpiredRecords(currentTime, maxCount);
+ long upperTime = System.currentTimeMillis() - 50L;
+ List<PopConsumerRecord> consumerRecords =
this.popConsumerStore.scanExpiredRecords(
+ currentTime.get() - TimeUnit.SECONDS.toMillis(3), upperTime,
maxCount);
+ long scanCostTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Queue<PopConsumerRecord> failureList = new LinkedBlockingQueue<>();
List<CompletableFuture<?>> futureList = new
ArrayList<>(consumerRecords.size());
@@ -497,9 +516,9 @@ public class PopConsumerService extends ServiceThread {
long backoffInterval = 1000L *
REWRITE_INTERVALS_IN_SECONDS[
Math.min(REWRITE_INTERVALS_IN_SECONDS.length,
record.getAttemptTimes())];
long nextInvisibleTime = record.getInvisibleTime() +
backoffInterval;
- PopConsumerRecord retryRecord = new
PopConsumerRecord(record.getPopTime(), record.getGroupId(),
- record.getTopicId(), record.getQueueId(),
record.getRetryFlag(), nextInvisibleTime,
- record.getOffset(), record.getAttemptId());
+ PopConsumerRecord retryRecord = new
PopConsumerRecord(System.currentTimeMillis(),
+ record.getGroupId(), record.getTopicId(),
record.getQueueId(),
+ record.getRetryFlag(), nextInvisibleTime,
record.getOffset(), record.getAttemptId());
retryRecord.setAttemptTimes(record.getAttemptTimes() +
1);
failureList.add(retryRecord);
log.warn("PopConsumerService revive backoff retry,
record={}", retryRecord);
@@ -513,14 +532,20 @@ public class PopConsumerService extends ServiceThread {
CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).join();
this.popConsumerStore.writeRecords(new ArrayList<>(failureList));
this.popConsumerStore.deleteRecords(consumerRecords);
+ currentTime.set(consumerRecords.isEmpty() ?
+ upperTime : consumerRecords.get(consumerRecords.size() -
1).getVisibilityTimeout());
if (brokerConfig.isEnablePopBufferMerge()) {
- log.info("PopConsumerService, key size={}, cache size={}, revive
count={}, failure count={}, cost={}ms",
- popConsumerCache.getCacheKeySize(),
popConsumerCache.getCacheSize(), consumerRecords.size(),
- failureList.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ log.info("PopConsumerService, key size={}, cache size={}, revive
count={}, failure count={}, " +
+ "behindInMillis={}, scanInMillis={}, costInMillis={}",
+ popConsumerCache.getCacheKeySize(),
popConsumerCache.getCacheSize(),
+ consumerRecords.size(), failureList.size(), upperTime -
currentTime.get(),
+ scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
} else {
- log.info("PopConsumerService, revive count={}, failure count={},
cost={}ms",
- consumerRecords.size(), failureList.size(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ log.info("PopConsumerService, revive count={}, failure count={}, "
+
+ "behindInMillis={}, scanInMillis={}, costInMillis={}",
+ consumerRecords.size(), failureList.size(), upperTime -
currentTime.get(),
+ scanCostTime, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
return consumerRecords.size();
@@ -588,11 +613,6 @@ public class PopConsumerService extends ServiceThread {
PutMessageResult putMessageResult =
brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
- if (brokerConfig.isEnablePopLog()) {
- log.debug("PopConsumerService revive retry msg, put status={},
ck={}, delay={}ms",
- putMessageResult, JSON.toJSONString(record),
System.currentTimeMillis() - record.getVisibilityTimeout());
- }
-
if (putMessageResult.getAppendMessageResult() == null ||
putMessageResult.getAppendMessageResult().getStatus() !=
AppendMessageStatus.PUT_OK) {
log.error("PopConsumerService revive retry msg error, put
status={}, ck={}, delay={}ms",
@@ -616,7 +636,7 @@ public class PopConsumerService extends ServiceThread {
while (true) {
try {
List<PopConsumerRecord> consumerRecords =
this.popConsumerStore.scanExpiredRecords(
- Long.MAX_VALUE,
brokerConfig.getPopReviveMaxReturnSizePerRead());
+ 0, Long.MAX_VALUE,
brokerConfig.getPopReviveMaxReturnSizePerRead());
if (consumerRecords == null || consumerRecords.isEmpty()) {
break;
}
@@ -695,7 +715,7 @@ public class PopConsumerService extends ServiceThread {
while (!isStopped()) {
try {
// to prevent concurrency issues during read and write
operations
- long reviveCount = this.revive(System.currentTimeMillis() -
50L,
+ long reviveCount = this.revive(this.currentTime,
brokerConfig.getPopReviveMaxReturnSizePerRead());
long current = System.currentTimeMillis();
@@ -704,7 +724,7 @@ public class PopConsumerService extends ServiceThread {
this.lastCleanupLockTime.set(current);
}
- if (reviveCount == 0) {
+ if (reviveCount <
brokerConfig.getPopReviveMaxReturnSizePerRead()) {
this.waitForRunning(500);
}
} catch (Exception e) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java
index 5facaeb55f..3c2b190d1c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerRocksdbStoreTest.java
@@ -16,18 +16,26 @@
*/
package org.apache.rocketmq.broker.pop;
+import com.google.common.base.Stopwatch;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,18 +93,101 @@ public class PopConsumerRocksdbStoreTest {
.collect(Collectors.toList()));
List<PopConsumerRecord> consumerRecords =
- consumerStore.scanExpiredRecords(20002, 2);
+ consumerStore.scanExpiredRecords(0, 20002, 2);
Assert.assertEquals(2, consumerRecords.size());
consumerStore.deleteRecords(consumerRecords);
- consumerRecords = consumerStore.scanExpiredRecords(20002, 2);
+ consumerRecords = consumerStore.scanExpiredRecords(0, 20003, 2);
Assert.assertEquals(1, consumerRecords.size());
consumerStore.deleteRecords(consumerRecords);
- consumerRecords = consumerStore.scanExpiredRecords(20004, 3);
+ consumerRecords = consumerStore.scanExpiredRecords(0, 20005, 3);
Assert.assertEquals(2, consumerRecords.size());
consumerStore.shutdown();
deleteStoreDirectory(filePath);
}
+
+ private long getDirectorySizeRecursive(File directory) {
+ long size = 0;
+ File[] files = directory.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ if (file.isFile()) {
+ size += file.length();
+ } else if (file.isDirectory()) {
+ size += getDirectorySizeRecursive(file);
+ }
+ }
+ }
+ return size;
+ }
+
+ @Test
+ @Ignore
+ @SuppressWarnings("ConstantValue")
+ public void tombstoneDeletionTest() throws IllegalAccessException,
NoSuchFieldException {
+ PopConsumerRocksdbStore rocksdbStore = new
PopConsumerRocksdbStore(getRandomStorePath());
+ rocksdbStore.start();
+
+ int iterCount = 1000 * 1000;
+ boolean useSeekFirstDelete = false;
+ Field dbField = AbstractRocksDBStorage.class.getDeclaredField("db");
+ dbField.setAccessible(true);
+ RocksDB rocksDB = (RocksDB) dbField.get(rocksdbStore);
+
+ long currentTime = 0L;
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ for (int i = 0; i < iterCount; i++) {
+ List<PopConsumerRecord> records = new ArrayList<>();
+ for (int j = 0; j < 1000; j++) {
+ PopConsumerRecord record = getConsumerRecord();
+ record.setPopTime((long) i * iterCount + j);
+ record.setGroupId("GroupTest");
+ record.setTopicId("TopicTest");
+ record.setQueueId(i % 10);
+ record.setRetryFlag(0);
+ record.setInvisibleTime(TimeUnit.SECONDS.toMillis(30));
+ record.setOffset(i);
+ records.add(record);
+ }
+ rocksdbStore.writeRecords(records);
+
+ long start = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ List<PopConsumerRecord> deleteList = new ArrayList<>();
+ if (useSeekFirstDelete) {
+ try (RocksIterator iterator =
rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
+ iterator.seekToFirst();
+ if (i % 10 == 0) {
+ long fileSize = getDirectorySizeRecursive(new
File(rocksdbStore.getFilePath()));
+ log.info("DirectorySize={}, Cost={}ms",
+ MessageStoreUtil.toHumanReadable(fileSize),
stopwatch.elapsed(TimeUnit.MILLISECONDS) - start);
+ }
+ while (iterator.isValid() && deleteList.size() < 1024) {
+
deleteList.add(PopConsumerRecord.decode(iterator.value()));
+ iterator.next();
+ }
+ }
+ } else {
+ long upper = System.currentTimeMillis();
+ deleteList = rocksdbStore.scanExpiredRecords(currentTime,
upper, 800);
+ if (!deleteList.isEmpty()) {
+ currentTime = deleteList.get(deleteList.size() -
1).getVisibilityTimeout();
+ }
+ long scanCost = stopwatch.elapsed(TimeUnit.MILLISECONDS) -
start;
+ if (i % 100 == 0) {
+ long fileSize = getDirectorySizeRecursive(new
File(rocksdbStore.getFilePath()));
+ long seekTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ try (RocksIterator iterator =
rocksDB.newIterator(rocksdbStore.columnFamilyHandle)) {
+ iterator.seekToFirst();
+ }
+ log.info("DirectorySize={}, Cost={}ms,
SeekFirstCost={}ms", MessageStoreUtil.toHumanReadable(fileSize),
+ scanCost, stopwatch.elapsed(TimeUnit.MILLISECONDS) -
seekTime);
+ }
+ }
+ rocksdbStore.deleteRecords(deleteList);
+ }
+ rocksdbStore.shutdown();
+ deleteStoreDirectory(rocksdbStore.getFilePath());
+ }
}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index b77c170c8c..2b930d5852 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
@@ -371,17 +372,17 @@ public class PopConsumerServiceTest {
Mockito.doReturn(CompletableFuture.completedFuture(null))
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
- consumerServiceSpy.revive(20 * 1000, 1);
+ consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1);
Mockito.doReturn(CompletableFuture.completedFuture(
Triple.of(null, "GetMessageResult is null", false)))
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
- consumerServiceSpy.revive(20 * 1000, 1);
+ consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1);
Mockito.doReturn(CompletableFuture.completedFuture(
Triple.of(Mockito.mock(MessageExt.class), null, false)))
.when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
- consumerServiceSpy.revive(20 * 1000, 1);
+ consumerServiceSpy.revive(new AtomicLong(20 * 1000), 1);
consumerService.shutdown();
}
@@ -412,11 +413,11 @@ public class PopConsumerServiceTest {
long visibleTimestamp = popTime + invisibleTime;
// revive fails
- Assert.assertEquals(1, consumerServiceSpy.revive(visibleTimestamp, 1));
+ Assert.assertEquals(1, consumerServiceSpy.revive(new
AtomicLong(visibleTimestamp), 1));
// should be invisible now
- Assert.assertEquals(0,
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp,
1).size());
+ Assert.assertEquals(0,
consumerService.getPopConsumerStore().scanExpiredRecords(0, visibleTimestamp,
1).size());
// will be visible again in 10 seconds
- Assert.assertEquals(1,
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp + 10
* 1000, 1).size());
+ Assert.assertEquals(1,
consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp,
System.currentTimeMillis() + visibleTimestamp + 10 * 1000, 1).size());
consumerService.shutdown();
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 48ba4b8086..347d92304d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -365,7 +365,7 @@ public abstract class AbstractRocksDBStorage {
}
if (postLoad()) {
this.loaded = true;
- LOGGER.info("RocksDB[{}] starts OK", this.dbPath);
+ LOGGER.info("RocksDB [{}] starts OK", this.dbPath);
this.closed = false;
return true;
} else {
@@ -437,9 +437,9 @@ public abstract class AbstractRocksDBStorage {
this.options = null;
this.loaded = false;
- LOGGER.info("shutdown OK. {}", this.dbPath);
+ LOGGER.info("RocksDB shutdown OK. {}", this.dbPath);
} catch (Exception e) {
- LOGGER.error("shutdown Failed. {}", this.dbPath, e);
+ LOGGER.error("RocksDB shutdown failed. {}", this.dbPath, e);
return false;
}
return true;