This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e60b67afe7 [ISSUE #9838] IndexStoreService use forceShutdown when disk 
is not writable (#9839)
e60b67afe7 is described below

commit e60b67afe7b1e27c4b7583c5b51ce18eadc244fb
Author: rongtong <[email protected]>
AuthorDate: Thu Nov 13 17:25:09 2025 +0800

    [ISSUE #9838] IndexStoreService use forceShutdown when disk is not writable 
(#9839)
    
    * Update store and tieredstore implementation
    
    Change-Id: I29c931148110c4238ce9a256ab2bf69d7b6139a8
    Co-developed-by: Cursor <[email protected]>
    
    * Fix UT NPE
    
    Change-Id: I699ec538118166743308af9e78da50f2aa1b56b3
    
    ---------
    
    Co-authored-by: RongtongJin <[email protected]>
---
 .../src/main/java/org/apache/rocketmq/store/RunningFlags.java | 11 ++++++++++-
 .../org/apache/rocketmq/store/logfile/DefaultMappedFile.java  |  2 +-
 .../org/apache/rocketmq/tieredstore/TieredMessageStore.java   |  9 +++++++--
 .../org/apache/rocketmq/tieredstore/index/IndexService.java   |  7 +++++++
 .../apache/rocketmq/tieredstore/index/IndexStoreService.java  |  5 +++++
 5 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java 
b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 88b398a77e..f415487bd5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -85,6 +85,15 @@ public class RunningFlags {
         return false;
     }
 
+    public boolean isStoreWriteable() {
+        if ((this.flagBits & NOT_WRITEABLE_BIT) == 0) {
+            return true;
+        }
+
+        return false;
+    }
+
+
     //for consume queue, just ignore the DISK_FULL_BIT
     public boolean isCQWriteable() {
         if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT 
| WRITE_INDEX_FILE_ERROR_BIT | LOGIC_DISK_FULL_BIT)) == 0) {
@@ -94,7 +103,7 @@ public class RunningFlags {
         return false;
     }
 
-    public boolean getAndMakeNotWriteable() {
+    public boolean getAndMakeStoreNotWriteable() {
         boolean result = this.isWriteable();
         if (result) {
             this.flagBits |= NOT_WRITEABLE_BIT;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 0c16d705bd..b28b7a8aef 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -592,7 +592,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
         if (runningFlags == null) {
             return false;
         }
-        return runningFlags.getAndMakeNotWriteable();
+        return runningFlags.getAndMakeStoreNotWriteable();
     }
 
     public boolean isWriteable() {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 19b587fa32..8720cb9412 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -338,7 +338,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 .thenApply(time -> {
                     Attributes latencyAttributes = 
TieredStoreMetricsManager.newAttributesBuilder()
                         .put(TieredStoreMetricsConstant.LABEL_OPERATION,
-                                
TieredStoreMetricsConstant.OPERATION_API_GET_TIME_BY_OFFSET)
+                            
TieredStoreMetricsConstant.OPERATION_API_GET_TIME_BY_OFFSET)
                         .put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
                         .build();
                     
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
 latencyAttributes);
@@ -465,8 +465,13 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
             dispatcher.shutdown();
         }
         if (indexService != null) {
-            indexService.shutdown();
+            if (defaultStore.getRunningFlags() != null && 
defaultStore.getRunningFlags().isStoreWriteable()) {
+                indexService.shutdown();
+            } else {
+                indexService.forceShutdown();
+            }
         }
+
         if (flatFileStore != null) {
             flatFileStore.shutdown();
         }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
index a4ea7e78a8..11fb1482c1 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexService.java
@@ -60,6 +60,13 @@ public interface IndexService {
      */
     void shutdown();
 
+    /**
+     * Force shutdown the index service.
+     */
+    default void forceShutdown() {
+        shutdown();
+    };
+
     /**
      * Destroys the index service and releases all resources.
      */
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 2385628ed4..07609bbab9 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -414,6 +414,11 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
         }
     }
 
+    @Override
+    public void forceShutdown() {
+        super.shutdown();
+    }
+
     @Override
     public void run() {
         while (!this.isStopped()) {

Reply via email to