This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 47c07a9724 [ISSUE #9716] refactor: replace RandomAccessFile with FileChannel (#9715) 47c07a9724 is described below commit 47c07a97242bd979c4ce97c51efc350f3007882a Author: rongtong <jinrongton...@mails.ucas.ac.cn> AuthorDate: Fri Sep 19 15:43:01 2025 +0800 [ISSUE #9716] refactor: replace RandomAccessFile with FileChannel (#9715) * refactor: replace RandomAccessFile with FileChannel for better I/O performance - Remove RandomAccessFile field and related logic completely - Use FileChannel for all write operations when writeWithoutMmap is enabled - Change SharedByteBuffer to use direct memory allocation (ByteBuffer.allocateDirect) - Add RunningFlags support for better error handling - Improve constructor design with better parameter handling - Fix SharedByteBuffer write operation to ensure correct byte count This change improves I/O performance by: 1. Eliminating the overhead of RandomAccessFile 2. Using direct memory allocation for better memory management 3. Providing more consistent I/O operations through FileChannel 4. Better error handling with RunningFlags integration * writeWithoutMmap and transientStorePoolEnable cannot be used together. If both are enabled, only writeWithoutMmap will take effect. 
* Fix config comment --- .../apache/rocketmq/store/DefaultMessageStore.java | 3 +- .../rocketmq/store/config/MessageStoreConfig.java | 7 ++- .../rocketmq/store/logfile/DefaultMappedFile.java | 72 ++++++++-------------- 3 files changed, 33 insertions(+), 49 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 4a8ecbfbf2..41b2e3da3e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -3025,7 +3025,8 @@ public class DefaultMessageStore implements MessageStore { */ public boolean isTransientStorePoolEnable() { return this.messageStoreConfig.isTransientStorePoolEnable() && - (this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE); + (this.brokerConfig.isEnableControllerMode() || this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE) + && !messageStoreConfig.isWriteWithoutMmap(); } public long getReputFromOffset() { diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index a48138b60d..2e72f9e6f2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -239,8 +239,11 @@ public class MessageStoreConfig { private boolean fastFailIfNoBufferInStorePool = false; /** - * When true, use RandomAccessFile for writing instead of MappedByteBuffer. - * This can be useful for certain scenarios where mmap is not desired. + * When true, use RandomAccessFile for writing instead of MappedByteBuffer. This can be useful for certain scenarios + * where mmap is not desired. + * + * The configurations writeWithoutMmap and transientStorePoolEnable are mutually exclusive. 
When both are set to + * true, only writeWithoutMmap will be effective. */ @ImportantField private boolean writeWithoutMmap = false; diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index c566d9956d..147eb3d708 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -82,10 +82,7 @@ public class DefaultMappedFile extends AbstractMappedFile { protected volatile int flushedPosition; protected int fileSize; protected FileChannel fileChannel; - /** - * RandomAccessFile for writing when writeWithoutMmap is enabled - */ - protected RandomAccessFile randomAccessFile = null; + /** * Message will put to here first, and then reput to FileChannel if writeBuffer is not null. */ @@ -130,7 +127,7 @@ public class DefaultMappedFile extends AbstractMappedFile { public SharedByteBuffer(int size) { this.lock = new ReentrantLock(); - this.buffer = ByteBuffer.allocate(size); + this.buffer = ByteBuffer.allocateDirect(size); } public void release() { @@ -237,8 +234,7 @@ public class DefaultMappedFile extends AbstractMappedFile { UtilAll.ensureDirOK(this.file.getParent()); try { - this.randomAccessFile = new RandomAccessFile(this.file, "rw"); - this.fileChannel = this.randomAccessFile.getChannel(); + this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); if (writeWithoutMmap) { // Still create MappedByteBuffer for reading operations @@ -261,9 +257,6 @@ public class DefaultMappedFile extends AbstractMappedFile { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } - if (!ok && this.randomAccessFile != null) { - this.randomAccessFile.close(); - } } } @@ -333,7 +326,7 @@ public class DefaultMappedFile extends AbstractMappedFile { if (currentPos < this.fileSize) { SharedByteBuffer sharedByteBuffer = null; ByteBuffer byteBuffer; 
- if (writeWithoutMmap && randomAccessFile != null) { + if (writeWithoutMmap) { sharedByteBuffer = borrowSharedByteBuffer(); byteBuffer = sharedByteBuffer.acquire(); byteBuffer.position(0).limit(byteBuffer.capacity()); @@ -347,8 +340,9 @@ public class DefaultMappedFile extends AbstractMappedFile { if (sharedByteBuffer != null) { try { - randomAccessFile.seek(currentPos); - randomAccessFile.write(byteBuffer.array(), 0, result.getWroteBytes()); + this.fileChannel.position(currentPos); + byteBuffer.position(0).limit(result.getWroteBytes()); + this.fileChannel.write(byteBuffer); } catch (Throwable t) { log.error("Failed to write to mappedFile {}", this.fileName, t); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); @@ -388,7 +382,7 @@ public class DefaultMappedFile extends AbstractMappedFile { if (currentPos < this.fileSize) { SharedByteBuffer sharedByteBuffer = null; ByteBuffer byteBuffer; - if (writeWithoutMmap && randomAccessFile != null) { + if (writeWithoutMmap) { sharedByteBuffer = borrowSharedByteBuffer(); byteBuffer = sharedByteBuffer.acquire(); byteBuffer.position(0).limit(byteBuffer.capacity()); @@ -413,8 +407,9 @@ public class DefaultMappedFile extends AbstractMappedFile { if (sharedByteBuffer != null) { try { - randomAccessFile.seek(currentPos); - randomAccessFile.write(byteBuffer.array(), 0, result.getWroteBytes()); + this.fileChannel.position(currentPos); + byteBuffer.position(0).limit(result.getWroteBytes()); + this.fileChannel.write(byteBuffer); } catch (Throwable t) { log.error("Failed to write to mappedFile {}", this.fileName, t); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); @@ -452,12 +447,13 @@ public class DefaultMappedFile extends AbstractMappedFile { if ((currentPos + remaining) <= this.fileSize) { try { - if (writeWithoutMmap && randomAccessFile != null) { - // Use RandomAccessFile for writing - randomAccessFile.seek(currentPos); + if (writeWithoutMmap) { + // Use FileChannel for writing + 
this.fileChannel.position(currentPos); byte[] buffer = new byte[remaining]; data.get(buffer); - randomAccessFile.write(buffer); + ByteBuffer writeBuffer = ByteBuffer.wrap(buffer); + this.fileChannel.write(writeBuffer); } else { // Use FileChannel for writing (default behavior) this.fileChannel.position(currentPos); @@ -487,10 +483,11 @@ public class DefaultMappedFile extends AbstractMappedFile { if ((currentPos + length) <= this.fileSize) { try { - if (writeWithoutMmap && randomAccessFile != null) { - // Use RandomAccessFile for writing - randomAccessFile.seek(currentPos); - randomAccessFile.write(data, offset, length); + if (writeWithoutMmap) { + // Use FileChannel for writing + this.fileChannel.position(currentPos); + ByteBuffer writeBuffer = ByteBuffer.wrap(data, offset, length); + this.fileChannel.write(writeBuffer); } else { // Use MappedByteBuffer for writing (default behavior) ByteBuffer buf = this.mappedByteBuffer.slice(); @@ -542,17 +539,13 @@ public class DefaultMappedFile extends AbstractMappedFile { try { this.mappedByteBufferAccessCountSinceLastSwap++; - if (writeWithoutMmap && randomAccessFile != null) { - // Use RandomAccessFile for flushing - randomAccessFile.getChannel().force(false); + //We only append data to fileChannel or mappedByteBuffer, never both. + if (writeWithoutMmap || writeBuffer != null || this.fileChannel.position() != 0) { + this.fileChannel.force(false); } else { - //We only append data to fileChannel or mappedByteBuffer, never both. 
- if (writeBuffer != null || this.fileChannel.position() != 0) { - this.fileChannel.force(false); - } else { - this.mappedByteBuffer.force(); - } + this.mappedByteBuffer.force(); } + this.lastFlushTime = System.currentTimeMillis(); FLUSHED_POSITION_UPDATER.set(this, value); } catch (Throwable e) { @@ -750,14 +743,6 @@ public class DefaultMappedFile extends AbstractMappedFile { } catch (Throwable e) { log.warn("close file channel {" + this.fileName + "} failed when cleanup", e); } - try { - if (this.randomAccessFile != null) { - this.randomAccessFile.close(); - } - } catch (Throwable e) { - log.info("close random access file " + this.fileName + " failed", e); - } - } @Override @@ -770,11 +755,6 @@ public class DefaultMappedFile extends AbstractMappedFile { this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); - if (this.randomAccessFile != null) { - this.randomAccessFile.close(); - log.info("close random access file " + this.fileName + " OK"); - } - long beginTime = System.currentTimeMillis(); boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName