This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e60b67afe7 [ISSUE #9838] IndexStoreService use forceShutdown when disk
is not writable (#9839)
e60b67afe7 is described below
commit e60b67afe7b1e27c4b7583c5b51ce18eadc244fb
Author: rongtong <[email protected]>
AuthorDate: Thu Nov 13 17:25:09 2025 +0800
[ISSUE #9838] IndexStoreService use forceShutdown when disk is not writable
(#9839)
* Update store and tieredstore implementation
Change-Id: I29c931148110c4238ce9a256ab2bf69d7b6139a8
Co-developed-by: Cursor <[email protected]>
* Fix UT NPE
Change-Id: I699ec538118166743308af9e78da50f2aa1b56b3
---------
Co-authored-by: RongtongJin <[email protected]>
---
.../src/main/java/org/apache/rocketmq/store/RunningFlags.java | 11 ++++++++++-
.../org/apache/rocketmq/store/logfile/DefaultMappedFile.java | 2 +-
.../org/apache/rocketmq/tieredstore/TieredMessageStore.java | 9 +++++++--
.../org/apache/rocketmq/tieredstore/index/IndexService.java | 7 +++++++
.../apache/rocketmq/tieredstore/index/IndexStoreService.java | 5 +++++
5 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 88b398a77e..f415487bd5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -85,6 +85,15 @@ public class RunningFlags {
return false;
}
+ public boolean isStoreWriteable() {
+ if ((this.flagBits & NOT_WRITEABLE_BIT) == 0) {
+ return true;
+ }
+
+ return false;
+ }
+
+
//for consume queue, just ignore the DISK_FULL_BIT
public boolean isCQWriteable() {
if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT
| WRITE_INDEX_FILE_ERROR_BIT | LOGIC_DISK_FULL_BIT)) == 0) {
@@ -94,7 +103,7 @@ public class RunningFlags {
return false;
}
- public boolean getAndMakeNotWriteable() {
+ public boolean getAndMakeStoreNotWriteable() {
boolean result = this.isWriteable();
if (result) {
this.flagBits |= NOT_WRITEABLE_BIT;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 0c16d705bd..b28b7a8aef 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -592,7 +592,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (runningFlags == null) {
return false;
}
- return runningFlags.getAndMakeNotWriteable();
+ return runningFlags.getAndMakeStoreNotWriteable();
}
public boolean isWriteable() {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 19b587fa32..8720cb9412 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -338,7 +338,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
.thenApply(time -> {
Attributes latencyAttributes =
TieredStoreMetricsManager.newAttributesBuilder()
.put(TieredStoreMetricsConstant.LABEL_OPERATION,
-
TieredStoreMetricsConstant.OPERATION_API_GET_TIME_BY_OFFSET)
+
TieredStoreMetricsConstant.OPERATION_API_GET_TIME_BY_OFFSET)
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
latencyAttributes);
@@ -465,8 +465,13 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
dispatcher.shutdown();
}
if (indexService != null) {
- indexService.shutdown();
+ if (defaultStore.getRunningFlags() != null &&
defaultStore.getRunningFlags().isStoreWriteable()) {
+ indexService.shutdown();
+ } else {
+ indexService.forceShutdown();
+ }
}
+
if (flatFileStore != null) {
flatFileStore.shutdown();
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
index a4ea7e78a8..11fb1482c1 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
@@ -60,6 +60,13 @@ public interface IndexService {
*/
void shutdown();
+ /**
+ * Force shutdown the index service.
+ */
+ default void forceShutdown() {
+ shutdown();
+ };
+
/**
* Destroys the index service and releases all resources.
*/
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 2385628ed4..07609bbab9 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -414,6 +414,11 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
}
}
+ @Override
+ public void forceShutdown() {
+ super.shutdown();
+ }
+
@Override
public void run() {
while (!this.isStopped()) {