This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 94d9185f2c [ISSUE #8895] Fix NPE when broker shutdown and optimize the log #9094
94d9185f2c is described below
commit 94d9185f2c80bcfb9ffca03a65080f48060f0a39
Author: qianye <[email protected]>
AuthorDate: Tue Jan 14 14:50:56 2025 +0800
[ISSUE #8895] Fix NPE when broker shutdown and optimize the log #9094
---
.../common/config/AbstractRocksDBStorage.java | 20 ++++++++++----------
.../apache/rocketmq/store/DefaultMessageStore.java | 4 +---
.../store/queue/RocksDBConsumeQueueOffsetTable.java | 2 +-
3 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 347d92304d..6c0bce5929 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -18,7 +18,16 @@ package org.apache.rocketmq.common.config;
import com.google.common.collect.Maps;
import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.ThreadUtils;
@@ -43,16 +52,6 @@ import org.rocksdb.Status;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
public abstract class AbstractRocksDBStorage {
protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
@@ -381,6 +380,7 @@ public abstract class AbstractRocksDBStorage {
public synchronized boolean shutdown() {
try {
if (!this.loaded) {
+ LOGGER.info("RocksDBStorage is not loaded, shutdown OK. dbPath={}, readOnly={}", this.dbPath, this.readOnly);
return true;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 9d3c46a438..187a0729e8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -517,11 +517,9 @@ public class DefaultMessageStore implements MessageStore {
if (this.compactionService != null) {
this.compactionService.shutdown();
}
-
- if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
+ if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && this.rocksDBMessageStore != null) {
this.rocksDBMessageStore.consumeQueueStore.shutdown();
}
-
this.flushConsumeQueueService.shutdown();
this.allocateMappedFileService.shutdown();
this.storeCheckpoint.flush();
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
index a91ae5e244..cb989852fb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
@@ -144,7 +144,7 @@ public class RocksDBConsumeQueueOffsetTable {
Function<OffsetEntry, Boolean> predicate = entry -> entry.type == OffsetEntryType.MAXIMUM;
Consumer<OffsetEntry> fn = entry -> {
topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset);
- ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
+ log.info("LoadMaxConsumeQueueOffsets Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
};
try {
forEach(predicate, fn);