This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 47c07a9724 [ISSUE #9716] refactor: replace RandomAccessFile with 
FileChannel (#9715)
47c07a9724 is described below

commit 47c07a97242bd979c4ce97c51efc350f3007882a
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Fri Sep 19 15:43:01 2025 +0800

    [ISSUE #9716] refactor: replace RandomAccessFile with FileChannel (#9715)
    
    * refactor: replace RandomAccessFile with FileChannel for better I/O 
performance
    
    - Remove RandomAccessFile field and related logic completely
    - Use FileChannel for all write operations when writeWithoutMmap is enabled
    - Change SharedByteBuffer to use direct memory allocation 
(ByteBuffer.allocateDirect)
    - Add RunningFlags support for better error handling
    - Improve constructor design with better parameter handling
    - Fix SharedByteBuffer write operation to ensure correct byte count
    
    This change improves I/O performance by:
    1. Eliminating the overhead of RandomAccessFile
    2. Using direct memory allocation for better memory management
    3. Providing more consistent I/O operations through FileChannel
    4. Better error handling with RunningFlags integration
    
    * writeWithoutMmap and transientStorePoolEnable cannot be used together. If 
both are enabled, only writeWithoutMmap will take effect.
    
    * Fix config comment
---
 .../apache/rocketmq/store/DefaultMessageStore.java |  3 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  7 ++-
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 72 ++++++++--------------
 3 files changed, 33 insertions(+), 49 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 4a8ecbfbf2..41b2e3da3e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -3025,7 +3025,8 @@ public class DefaultMessageStore implements MessageStore {
      */
     public boolean isTransientStorePoolEnable() {
         return this.messageStoreConfig.isTransientStorePoolEnable() &&
-            (this.brokerConfig.isEnableControllerMode() || 
this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
+            (this.brokerConfig.isEnableControllerMode() || 
this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE)
+            && !messageStoreConfig.isWriteWithoutMmap();
     }
 
     public long getReputFromOffset() {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index a48138b60d..2e72f9e6f2 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -239,8 +239,11 @@ public class MessageStoreConfig {
     private boolean fastFailIfNoBufferInStorePool = false;
 
     /**
-     * When true, use RandomAccessFile for writing instead of MappedByteBuffer.
-     * This can be useful for certain scenarios where mmap is not desired.
+     * When true, use FileChannel for writing instead of 
MappedByteBuffer. This can be useful for certain scenarios
+     * where mmap is not desired.
+     *
+     * The configurations writeWithoutMmap and transientStorePoolEnable are 
mutually exclusive. When both are set to
+     * true, only writeWithoutMmap will be effective.
      */
     @ImportantField
     private boolean writeWithoutMmap = false;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index c566d9956d..147eb3d708 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -82,10 +82,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
     protected volatile int flushedPosition;
     protected int fileSize;
     protected FileChannel fileChannel;
-    /**
-     * RandomAccessFile for writing when writeWithoutMmap is enabled
-     */
-    protected RandomAccessFile randomAccessFile = null;
+
     /**
      * Message will put to here first, and then reput to FileChannel if 
writeBuffer is not null.
      */
@@ -130,7 +127,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
 
         public SharedByteBuffer(int size) {
             this.lock = new ReentrantLock();
-            this.buffer = ByteBuffer.allocate(size);
+            this.buffer = ByteBuffer.allocateDirect(size);
         }
 
         public void release() {
@@ -237,8 +234,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
         UtilAll.ensureDirOK(this.file.getParent());
 
         try {
-            this.randomAccessFile = new RandomAccessFile(this.file, "rw");
-            this.fileChannel = this.randomAccessFile.getChannel();
+            this.fileChannel = new RandomAccessFile(this.file, 
"rw").getChannel();
 
             if (writeWithoutMmap) {
                 // Still create MappedByteBuffer for reading operations
@@ -261,9 +257,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
             if (!ok && this.fileChannel != null) {
                 this.fileChannel.close();
             }
-            if (!ok && this.randomAccessFile != null) {
-                this.randomAccessFile.close();
-            }
         }
     }
 
@@ -333,7 +326,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
         if (currentPos < this.fileSize) {
             SharedByteBuffer sharedByteBuffer = null;
             ByteBuffer byteBuffer;
-            if (writeWithoutMmap && randomAccessFile != null) {
+            if (writeWithoutMmap) {
                 sharedByteBuffer = borrowSharedByteBuffer();
                 byteBuffer = sharedByteBuffer.acquire();
                 byteBuffer.position(0).limit(byteBuffer.capacity());
@@ -347,8 +340,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
 
             if (sharedByteBuffer != null) {
                 try {
-                    randomAccessFile.seek(currentPos);
-                    randomAccessFile.write(byteBuffer.array(), 0, 
result.getWroteBytes());
+                    this.fileChannel.position(currentPos);
+                    byteBuffer.position(0).limit(result.getWroteBytes());
+                    this.fileChannel.write(byteBuffer);
                 } catch (Throwable t) {
                     log.error("Failed to write to mappedFile {}", 
this.fileName, t);
                     return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -388,7 +382,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
         if (currentPos < this.fileSize) {
             SharedByteBuffer sharedByteBuffer = null;
             ByteBuffer byteBuffer;
-            if (writeWithoutMmap && randomAccessFile != null) {
+            if (writeWithoutMmap) {
                 sharedByteBuffer = borrowSharedByteBuffer();
                 byteBuffer = sharedByteBuffer.acquire();
                 byteBuffer.position(0).limit(byteBuffer.capacity());
@@ -413,8 +407,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
 
             if (sharedByteBuffer != null) {
                 try {
-                    randomAccessFile.seek(currentPos);
-                    randomAccessFile.write(byteBuffer.array(), 0, 
result.getWroteBytes());
+                    this.fileChannel.position(currentPos);
+                    byteBuffer.position(0).limit(result.getWroteBytes());
+                    this.fileChannel.write(byteBuffer);
                 } catch (Throwable t) {
                     log.error("Failed to write to mappedFile {}", 
this.fileName, t);
                     return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -452,12 +447,13 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
 
         if ((currentPos + remaining) <= this.fileSize) {
             try {
-                if (writeWithoutMmap && randomAccessFile != null) {
-                    // Use RandomAccessFile for writing
-                    randomAccessFile.seek(currentPos);
+                if (writeWithoutMmap) {
+                    // Use FileChannel for writing
+                    this.fileChannel.position(currentPos);
                     byte[] buffer = new byte[remaining];
                     data.get(buffer);
-                    randomAccessFile.write(buffer);
+                    ByteBuffer writeBuffer = ByteBuffer.wrap(buffer);
+                    this.fileChannel.write(writeBuffer);
                 } else {
                     // Use FileChannel for writing (default behavior)
                     this.fileChannel.position(currentPos);
@@ -487,10 +483,11 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
 
         if ((currentPos + length) <= this.fileSize) {
             try {
-                if (writeWithoutMmap && randomAccessFile != null) {
-                    // Use RandomAccessFile for writing
-                    randomAccessFile.seek(currentPos);
-                    randomAccessFile.write(data, offset, length);
+                if (writeWithoutMmap) {
+                    // Use FileChannel for writing
+                    this.fileChannel.position(currentPos);
+                    ByteBuffer writeBuffer = ByteBuffer.wrap(data, offset, 
length);
+                    this.fileChannel.write(writeBuffer);
                 } else {
                     // Use MappedByteBuffer for writing (default behavior)
                     ByteBuffer buf = this.mappedByteBuffer.slice();
@@ -542,17 +539,13 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
                 try {
                     this.mappedByteBufferAccessCountSinceLastSwap++;
 
-                    if (writeWithoutMmap && randomAccessFile != null) {
-                        // Use RandomAccessFile for flushing
-                        randomAccessFile.getChannel().force(false);
+                    //We only append data to fileChannel or mappedByteBuffer, 
never both.
+                    if (writeWithoutMmap || writeBuffer != null || 
this.fileChannel.position() != 0) {
+                        this.fileChannel.force(false);
                     } else {
-                        //We only append data to fileChannel or 
mappedByteBuffer, never both.
-                        if (writeBuffer != null || this.fileChannel.position() 
!= 0) {
-                            this.fileChannel.force(false);
-                        } else {
-                            this.mappedByteBuffer.force();
-                        }
+                        this.mappedByteBuffer.force();
                     }
+
                     this.lastFlushTime = System.currentTimeMillis();
                     FLUSHED_POSITION_UPDATER.set(this, value);
                 } catch (Throwable e) {
@@ -750,14 +743,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
         } catch (Throwable e) {
             log.warn("close file channel {" + this.fileName + "} failed when 
cleanup", e);
         }
-        try {
-            if (this.randomAccessFile != null) {
-                this.randomAccessFile.close();
-            }
-        } catch (Throwable e) {
-            log.info("close random access file " + this.fileName + " failed", 
e);
-        }
-
     }
 
     @Override
@@ -770,11 +755,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
                 this.fileChannel.close();
                 log.info("close file channel " + this.fileName + " OK");
 
-                if (this.randomAccessFile != null) {
-                    this.randomAccessFile.close();
-                    log.info("close random access file " + this.fileName + " 
OK");
-                }
-
                 long beginTime = System.currentTimeMillis();
                 boolean result = this.file.delete();
                 log.info("delete file[REF:" + this.getRefCount() + "] " + 
this.fileName

Reply via email to