This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1d9b02ca46 [ISSUE #9707] Integrate RunningFlags with MappedFile system for better error handling and state management (#9708)
1d9b02ca46 is described below

commit 1d9b02ca468c2b892ffba27b012be19b81c9e0e4
Author: guyinyou <[email protected]>
AuthorDate: Tue Sep 16 16:58:42 2025 +0800

    [ISSUE #9707] Integrate RunningFlags with MappedFile system for better error handling and state management (#9708)
    
    * Add RunningFlags support to MappedFileQueue
    
    - Integrate RunningFlags throughout MappedFileQueue hierarchy
    - Add writeable state checking and error handling in DefaultMappedFile
    - Update MappedFile interface and constructors to support RunningFlags
    - Implement proper error state management during flush operations
    
    * fix ut
    
    * fix ut
    
    ---------
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../rocketmq/store/AllocateMappedFileService.java  |  6 +--
 .../java/org/apache/rocketmq/store/CommitLog.java  |  3 +-
 .../org/apache/rocketmq/store/MappedFileQueue.java | 23 +++++---
 .../rocketmq/store/MultiPathMappedFileQueue.java   |  9 +++-
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 61 +++++++++++++++++-----
 .../apache/rocketmq/store/logfile/MappedFile.java  |  3 +-
 6 files changed, 79 insertions(+), 26 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index a56fa46157..970e9b05ee 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -176,13 +176,13 @@ public class AllocateMappedFileService extends 
ServiceThread {
                 if (messageStore.isTransientStorePoolEnable()) {
                     try {
                         mappedFile = 
ServiceLoader.load(MappedFile.class).iterator().next();
-                        mappedFile.init(req.getFilePath(), req.getFileSize(), 
messageStore.getTransientStorePool());
+                        mappedFile.init(req.getFilePath(), req.getFileSize(), 
messageStore.getRunningFlags(), messageStore.getTransientStorePool());
                     } catch (RuntimeException e) {
                         log.warn("Use default implementation.");
-                        mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
+                        mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize(), messageStore.getRunningFlags(), 
messageStore.getTransientStorePool(), writeWithoutMmap);
                     }
                 } else {
-                    mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize(), writeWithoutMmap);
+                    mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize(), messageStore.getRunningFlags(), writeWithoutMmap);
                 }
 
                 long elapsedTime = 
UtilAll.computeElapsedTimeMilliseconds(beginTime);
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 3b26afcc09..38894abc81 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -114,11 +114,12 @@ public class CommitLog implements Swappable {
         if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
             this.mappedFileQueue = new 
MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
                 
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
-                messageStore.getAllocateMappedFileService(), 
this::getFullStorePaths);
+                messageStore.getAllocateMappedFileService(), 
this::getFullStorePaths, messageStore.getRunningFlags());
         } else {
             this.mappedFileQueue = new MappedFileQueue(storePath,
                 
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
                 messageStore.getAllocateMappedFileService(),
+                messageStore.getRunningFlags(),
                 messageStore.getMessageStoreConfig().isWriteWithoutMmap());
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 320e842154..70cc65f8f6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -53,7 +53,9 @@ public class MappedFileQueue implements Swappable {
     protected long committedWhere = 0;
 
     protected volatile long storeTimestamp = 0;
-    
+
+    protected RunningFlags runningFlags;
+
     /**
      * Configuration flag to use RandomAccessFile instead of MappedByteBuffer 
for writing
      */
@@ -61,16 +63,25 @@ public class MappedFileQueue implements Swappable {
 
     public MappedFileQueue(final String storePath, int mappedFileSize,
         AllocateMappedFileService allocateMappedFileService) {
-        this.storePath = storePath;
-        this.mappedFileSize = mappedFileSize;
-        this.allocateMappedFileService = allocateMappedFileService;
+        this(storePath, mappedFileSize, allocateMappedFileService, null, 
false);
+    }
+
+    public MappedFileQueue(final String storePath, int mappedFileSize,
+        AllocateMappedFileService allocateMappedFileService, RunningFlags 
runningFlags) {
+        this(storePath, mappedFileSize, allocateMappedFileService, 
runningFlags, false);
     }
 
     public MappedFileQueue(final String storePath, int mappedFileSize,
         AllocateMappedFileService allocateMappedFileService, boolean 
writeWithoutMmap) {
+        this(storePath, mappedFileSize, allocateMappedFileService, null, 
writeWithoutMmap);
+    }
+
+    public MappedFileQueue(final String storePath, int mappedFileSize,
+        AllocateMappedFileService allocateMappedFileService, RunningFlags 
runningFlags, boolean writeWithoutMmap) {
         this.storePath = storePath;
         this.mappedFileSize = mappedFileSize;
         this.allocateMappedFileService = allocateMappedFileService;
+        this.runningFlags = runningFlags;
         this.writeWithoutMmap = writeWithoutMmap;
     }
 
@@ -279,7 +290,7 @@ public class MappedFileQueue implements Swappable {
             }
 
             try {
-                MappedFile mappedFile = new DefaultMappedFile(file.getPath(), 
mappedFileSize, writeWithoutMmap);
+                MappedFile mappedFile = new DefaultMappedFile(file.getPath(), 
mappedFileSize, runningFlags, writeWithoutMmap);
 
                 mappedFile.setWrotePosition(this.mappedFileSize);
                 mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -369,7 +380,7 @@ public class MappedFileQueue implements Swappable {
                     nextNextFilePath, this.mappedFileSize);
         } else {
             try {
-                mappedFile = new DefaultMappedFile(nextFilePath, 
this.mappedFileSize, this.writeWithoutMmap);
+                mappedFile = new DefaultMappedFile(nextFilePath, 
this.mappedFileSize, runningFlags, this.writeWithoutMmap);
             } catch (IOException e) {
                 log.error("create mappedFile exception", e);
             }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
index 72ec8820a6..fcae4948c6 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -36,10 +36,15 @@ public class MultiPathMappedFileQueue extends 
MappedFileQueue {
     private final MessageStoreConfig config;
     private final Supplier<Set<String>> fullStorePathsSupplier;
 
+    public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int 
mappedFileSize,
+        AllocateMappedFileService allocateMappedFileService,
+        Supplier<Set<String>> fullStorePathsSupplier) {
+        this(messageStoreConfig, mappedFileSize, allocateMappedFileService, 
fullStorePathsSupplier, null);
+    }
     public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int 
mappedFileSize,
                                     AllocateMappedFileService 
allocateMappedFileService,
-                                    Supplier<Set<String>> 
fullStorePathsSupplier) {
-        super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, 
allocateMappedFileService, 
+                                    Supplier<Set<String>> 
fullStorePathsSupplier, RunningFlags runningFlags) {
+        super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, 
allocateMappedFileService, runningFlags,
               messageStoreConfig.isWriteWithoutMmap());
         this.config = messageStoreConfig;
         this.fullStorePathsSupplier = fullStorePathsSupplier;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index b2d89108b4..f2383993d4 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -53,6 +53,7 @@ import org.apache.rocketmq.store.AppendMessageResult;
 import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.CompactionAppendMsgCallback;
 import org.apache.rocketmq.store.PutMessageContext;
+import org.apache.rocketmq.store.RunningFlags;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.TransientStorePool;
 import org.apache.rocketmq.store.config.FlushDiskType;
@@ -121,6 +122,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
     private static int maxSharedNum = 16;
     private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
 
+    protected RunningFlags runningFlags;
     static class SharedByteBuffer {
         private final ReentrantLock lock;
         private final ByteBuffer buffer;
@@ -173,24 +175,36 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
     }
 
     public DefaultMappedFile(final String fileName, final int fileSize) throws 
IOException {
-        init(fileName, fileSize);
+        this(fileName, fileSize, null);
     }
 
-    public DefaultMappedFile(final String fileName, final int fileSize,
+    public DefaultMappedFile(final String fileName, final int fileSize, 
boolean writeWithoutMmap) throws IOException {
+        this(fileName, fileSize, null, null, writeWithoutMmap);
+    }
+
+    public DefaultMappedFile(final String fileName, final int fileSize, 
RunningFlags runningFlags) throws IOException {
+        this(fileName, fileSize, runningFlags, null, false);
+    }
+
+    public DefaultMappedFile(final String fileName, final int fileSize, final 
RunningFlags runningFlags,
         final TransientStorePool transientStorePool) throws IOException {
-        init(fileName, fileSize, transientStorePool);
+        this(fileName, fileSize, runningFlags, transientStorePool, false);
     }
 
-    public DefaultMappedFile(final String fileName, final int fileSize,
+    public DefaultMappedFile(final String fileName, final int fileSize, final 
RunningFlags runningFlags,
         final boolean writeWithoutMmap) throws IOException {
-        this.writeWithoutMmap = writeWithoutMmap;
-        init(fileName, fileSize);
+        this(fileName, fileSize, runningFlags, null, writeWithoutMmap);
     }
 
     public DefaultMappedFile(final String fileName, final int fileSize,
+        final TransientStorePool transientStorePool, final boolean 
writeWithoutMmap) throws IOException {
+        this(fileName, fileSize, null, transientStorePool, writeWithoutMmap);
+    }
+
+    public DefaultMappedFile(final String fileName, final int fileSize, final 
RunningFlags runningFlags,
         final TransientStorePool transientStorePool, final boolean 
writeWithoutMmap) throws IOException {
         this.writeWithoutMmap = writeWithoutMmap;
-        init(fileName, fileSize, transientStorePool);
+        init(fileName, fileSize, runningFlags, transientStorePool);
     }
 
     public static int getTotalMappedFiles() {
@@ -202,30 +216,30 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
     }
 
     @Override
-    public void init(final String fileName, final int fileSize,
+    public void init(final String fileName, final int fileSize, final 
RunningFlags runningFlags,
         final TransientStorePool transientStorePool) throws IOException {
-        init(fileName, fileSize);
+        init(fileName, fileSize, runningFlags);
         if (transientStorePool != null) {
             this.writeBuffer = transientStorePool.borrowBuffer();
             this.transientStorePool = transientStorePool;
         }
     }
 
-    private void init(final String fileName, final int fileSize) throws 
IOException {
+    private void init(final String fileName, final int fileSize, final 
RunningFlags runningFlags) throws IOException {
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.file = new File(fileName);
         this.fileFromOffset = Long.parseLong(this.file.getName());
+        this.runningFlags = runningFlags;
         boolean ok = false;
 
         UtilAll.ensureDirOK(this.file.getParent());
 
         try {
-            this.fileChannel = new RandomAccessFile(this.file, 
"rw").getChannel();
+            this.randomAccessFile = new RandomAccessFile(this.file, "rw");
+            this.fileChannel = this.randomAccessFile.getChannel();
 
             if (writeWithoutMmap) {
-                // Use RandomAccessFile for writing instead of MappedByteBuffer
-                this.randomAccessFile = new RandomAccessFile(this.file, "rw");
                 // Still create MappedByteBuffer for reading operations
                 this.mappedByteBuffer = 
this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
             } else {
@@ -522,6 +536,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
             if (this.hold()) {
                 int value = getReadPosition();
 
+                if (!isWriteable()) {
+                    return this.getFlushedPosition();
+                }
+
                 try {
                     this.mappedByteBufferAccessCountSinceLastSwap++;
 
@@ -538,6 +556,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
                     }
                     this.lastFlushTime = System.currentTimeMillis();
                 } catch (Throwable e) {
+                    if (e instanceof IOException) {
+                        getAndMakeNotWriteable();
+                    }
                     log.error("Error occurred when force data to disk.", e);
                 }
 
@@ -597,6 +618,20 @@ public class DefaultMappedFile extends AbstractMappedFile {
         }
     }
 
+    public boolean getAndMakeNotWriteable() {
+        if (runningFlags == null) {
+            return false;
+        }
+        return runningFlags.getAndMakeNotWriteable();
+    }
+
+    public boolean isWriteable() {
+        if (runningFlags == null) {
+            return true;
+        }
+        return runningFlags.isWriteable();
+    }
+
     private boolean isAbleToFlush(final int flushLeastPages) {
         int flush = FLUSHED_POSITION_UPDATER.get(this);
         int write = getReadPosition();
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index d1f11959aa..0985ff1edc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.store.AppendMessageCallback;
 import org.apache.rocketmq.store.AppendMessageResult;
 import org.apache.rocketmq.store.CompactionAppendMsgCallback;
 import org.apache.rocketmq.store.PutMessageContext;
+import org.apache.rocketmq.store.RunningFlags;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.TransientStorePool;
 import org.apache.rocketmq.store.config.FlushDiskType;
@@ -368,7 +369,7 @@ public interface MappedFile {
      * @param transientStorePool transient store pool
      * @throws IOException
      */
-    void init(String fileName, int fileSize, TransientStorePool 
transientStorePool) throws IOException;
+    void init(String fileName, int fileSize, RunningFlags runningFlags, 
TransientStorePool transientStorePool) throws IOException;
 
     Iterator<SelectMappedBufferResult> iterator(int pos);
 

Reply via email to