This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2017630383 [ISSUE #9834] Support writeWithoutMmap in IndexStoreFile (#9835)
2017630383 is described below

commit 2017630383b41e351e099a5fc4d022af99f9679d
Author: rongtong <[email protected]>
AuthorDate: Thu Nov 13 17:25:31 2025 +0800

    [ISSUE #9834] Support writeWithoutMmap in IndexStoreFile (#9835)
    
    * Use FileChannel for writing when writeWithoutMmap is enabled in IndexStoreFile
    
    Change-Id: I23ac2a4dfb8286cd8c3e51aeeb2d54d91136bc03
    Co-developed-by: Cursor <[email protected]>
    
    * Use FileChannel for writing when writeWithoutMmap is enabled in IndexStoreFile
    
    Change-Id: I6a541d13c81a16f39b88ac91ce770717f60d64ff
    Co-developed-by: Cursor <[email protected]>
    
    * Remove unnecessary rewind() call in IndexStoreFile when using FileChannel
    
    Change-Id: Id6deca99d34736761fd1d937f5eb5e75506ba1cc
    Co-developed-by: Cursor <[email protected]>
    
    * Add parameterized test for writeWithoutMmap in IndexStoreFileTest
    
    Change-Id: I1ce92e4f728a018ff7bd3160995f395b480345ea
    Co-developed-by: Cursor <[email protected]>
    
    * Fix compactToNewFile to read data from file when using FileChannel
    
    Change-Id: I9058b08f1bbcf646b5e7913260a5eefe853b9b16
    Co-developed-by: Cursor <[email protected]>
    
    * Fix newBuffer position and limit in compactToNewFile
    
    Change-Id: I1a0100b6159d636a8db1124e3de416b3a2e62bc5
    Co-developed-by: Cursor <[email protected]>
    
    * Update test parameter order in IndexStoreFileTest
    
    Change-Id: Ie01656574c1866badf726ead58a8ad2ca3d01cf7
    Co-developed-by: Cursor <[email protected]>
    
    ---------
    
    Co-authored-by: RongtongJin <[email protected]>
---
 .../rocketmq/tieredstore/MessageStoreConfig.java   |   9 ++
 .../rocketmq/tieredstore/TieredMessageStore.java   |   1 +
 .../rocketmq/tieredstore/index/IndexStoreFile.java | 107 +++++++++++++++++----
 .../tieredstore/index/IndexStoreFileTest.java      |  18 ++++
 4 files changed, 116 insertions(+), 19 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java
index 10667566aa..d22ab80dd8 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java
@@ -129,6 +129,7 @@ public class MessageStoreConfig {
     private String objectStoreBucket = "";
     private String objectStoreAccessKey = "";
     private String objectStoreSecretKey = "";
+    private boolean writeWithoutMmap = false;
 
     public static String localHostName() {
         try {
@@ -418,4 +419,12 @@ public class MessageStoreConfig {
     public String getObjectStoreEndpoint() {
         return objectStoreEndpoint;
     }
+
+    public boolean isWriteWithoutMmap() {
+        return writeWithoutMmap;
+    }
+
+    public void setWriteWithoutMmap(boolean writeWithoutMmap) {
+        this.writeWithoutMmap = writeWithoutMmap;
+    }
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 8720cb9412..9d56c4a8da 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -83,6 +83,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         this.storeConfig = new MessageStoreConfig();
         this.context = context;
         this.context.registerConfiguration(this.storeConfig);
+        
this.storeConfig.setWriteWithoutMmap(context.getMessageStoreConfig().isWriteWithoutMmap());
         this.brokerName = this.storeConfig.getBrokerName();
         this.defaultStore = next;
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index 528bce9bb8..c58e91e9c0 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -20,6 +20,7 @@ import com.google.common.base.Stopwatch;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -90,20 +91,26 @@ public class IndexStoreFile implements IndexFile {
     private final AtomicInteger hashSlotCount = new AtomicInteger(0);
     private final AtomicInteger indexItemCount = new AtomicInteger(0);
 
+    private final boolean writeWithoutMmap;
     private MappedFile mappedFile;
     private ByteBuffer byteBuffer;
     private MappedFile compactMappedFile;
     private FileSegment fileSegment;
 
+    private FileChannel fileChannel;
+
     public IndexStoreFile(MessageStoreConfig storeConfig, long timestamp) 
throws IOException {
+        this.writeWithoutMmap = storeConfig.isWriteWithoutMmap();
         this.hashSlotMaxCount = 
storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
         this.indexItemMaxCount = 
storeConfig.getTieredStoreIndexFileMaxIndexNum();
         this.fileStatus = new AtomicReference<>(UNSEALED);
         this.fileReadWriteLock = new ReentrantReadWriteLock();
         this.mappedFile = new DefaultMappedFile(
             Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME, 
String.valueOf(timestamp)).toString(),
-            this.getItemPosition(indexItemMaxCount));
+            this.getItemPosition(indexItemMaxCount),
+            this.writeWithoutMmap);
         this.byteBuffer = this.mappedFile.getMappedByteBuffer();
+        this.fileChannel = this.mappedFile.getFileChannel();
 
         this.beginTimestamp.set(timestamp);
         this.endTimestamp.set(byteBuffer.getLong(INDEX_BEGIN_TIME_STAMP));
@@ -113,6 +120,7 @@ public class IndexStoreFile implements IndexFile {
     }
 
     public IndexStoreFile(MessageStoreConfig storeConfig, FileSegment 
fileSegment) {
+        this.writeWithoutMmap = storeConfig.isWriteWithoutMmap();
         this.fileSegment = fileSegment;
         this.fileStatus = new AtomicReference<>(UPLOAD);
         this.fileReadWriteLock = new ReentrantReadWriteLock();
@@ -157,12 +165,31 @@ public class IndexStoreFile implements IndexFile {
         return (keyHash < 0) ? -keyHash : keyHash;
     }
 
-    protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) {
-        byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : 
END_MAGIC_CODE);
-        byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get());
-        byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
-        byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
-        byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
+    protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) throws 
IOException {
+        flushNewMetadata(byteBuffer, end, null);
+    }
+
+    protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end, 
FileChannel channel) throws IOException {
+        FileChannel targetChannel = channel != null ? channel : fileChannel;
+        if (writeWithoutMmap && targetChannel != null) {
+            // Use FileChannel for writing
+            ByteBuffer writeBuffer = ByteBuffer.allocate(INDEX_HEADER_SIZE);
+            writeBuffer.putInt(!end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE);
+            writeBuffer.putLong(this.beginTimestamp.get());
+            writeBuffer.putLong(this.endTimestamp.get());
+            writeBuffer.putInt(this.hashSlotCount.get());
+            writeBuffer.putInt(this.indexItemCount.get());
+            writeBuffer.flip();
+            targetChannel.position(INDEX_MAGIC_CODE);
+            targetChannel.write(writeBuffer);
+        } else {
+            // Use ByteBuffer for writing
+            byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE : 
END_MAGIC_CODE);
+            byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, 
this.beginTimestamp.get());
+            byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
+            byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
+            byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
+        }
     }
 
     protected int getSlotPosition(int slotIndex) {
@@ -215,9 +242,26 @@ public class IndexStoreFile implements IndexFile {
                 IndexItem indexItem = new IndexItem(
                     topicId, queueId, offset, size, hashCode, timeDiff, 
slotOldValue);
                 int itemIndex = this.indexItemCount.incrementAndGet();
-                this.byteBuffer.position(this.getItemPosition(itemIndex));
-                this.byteBuffer.put(indexItem.getByteBuffer());
-                this.byteBuffer.putInt(slotPosition, itemIndex);
+                int itemPosition = this.getItemPosition(itemIndex);
+                
+                if (writeWithoutMmap && fileChannel != null) {
+                    // Use FileChannel for writing
+                    ByteBuffer itemBuffer = indexItem.getByteBuffer();
+                    fileChannel.position(itemPosition);
+                    fileChannel.write(itemBuffer);
+                    
+                    ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
+                    slotBuffer.putInt(0, itemIndex);
+                    slotBuffer.position(0);
+                    slotBuffer.limit(Integer.BYTES);
+                    fileChannel.position(slotPosition);
+                    fileChannel.write(slotBuffer);
+                } else {
+                    // Use ByteBuffer for writing
+                    this.byteBuffer.position(itemPosition);
+                    this.byteBuffer.put(indexItem.getByteBuffer());
+                    this.byteBuffer.putInt(slotPosition, itemIndex);
+                }
 
                 if (slotOldValue <= INVALID_INDEX) {
                     this.hashSlotCount.incrementAndGet();
@@ -231,7 +275,7 @@ public class IndexStoreFile implements IndexFile {
                     this.getTimestamp(), topic, key, hashCode % 
this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem);
             }
             return AppendResult.SUCCESS;
-        } catch (Exception e) {
+        } catch (Throwable e) {
             log.error("IndexStoreFile put key error, topic: {}, topicId: {}, 
queueId: {}, keySet: {}, offset: {}, " +
                 "size: {}, timestamp: {}", topic, topicId, queueId, keySet, 
offset, size, timestamp, e);
         } finally {
@@ -392,7 +436,7 @@ public class IndexStoreFile implements IndexFile {
             buffer = compactToNewFile();
             log.debug("IndexStoreFile do compaction, timestamp: {}, file size: 
{}, cost: {}ms",
                 this.getTimestamp(), buffer.capacity(), 
stopwatch.elapsed(TimeUnit.MICROSECONDS));
-        } catch (Exception e) {
+        } catch (Throwable e) {
             log.error("IndexStoreFile do compaction, timestamp: {}, cost: 
{}ms",
                 this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), 
e);
             return null;
@@ -423,8 +467,9 @@ public class IndexStoreFile implements IndexFile {
         int writePosition = INDEX_HEADER_SIZE + (hashSlotMaxCount * 
HASH_SLOT_SIZE);
         int fileMaxLength = writePosition + COMPACT_INDEX_ITEM_SIZE * 
indexItemCount.get();
 
-        compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), 
fileMaxLength);
+        compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(), 
fileMaxLength, writeWithoutMmap);
         MappedByteBuffer newBuffer = compactMappedFile.getMappedByteBuffer();
+        FileChannel compactFileChannel = compactMappedFile.getFileChannel();
 
         for (int i = 0; i < hashSlotMaxCount; i++) {
             int slotPosition = this.getSlotPosition(i);
@@ -437,24 +482,48 @@ public class IndexStoreFile implements IndexFile {
                 buffer.get(payload);
                 int newSlotValue = 
payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
                 buffer.limit(COMPACT_INDEX_ITEM_SIZE);
-                newBuffer.position(writePosition);
-                newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
+                
+                if (writeWithoutMmap && compactFileChannel != null) {
+                    // Use FileChannel for writing
+                    ByteBuffer writeBuffer = ByteBuffer.wrap(payload, 0, 
COMPACT_INDEX_ITEM_SIZE);
+                    compactFileChannel.position(writePosition);
+                    compactFileChannel.write(writeBuffer);
+                } else {
+                    // Use ByteBuffer for writing
+                    newBuffer.position(writePosition);
+                    newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
+                }
                 log.trace("IndexStoreFile do compaction, write item, slot: {}, 
current: {}, next: {}", i, slotValue, newSlotValue);
                 slotValue = newSlotValue;
                 writePosition += COMPACT_INDEX_ITEM_SIZE;
             }
 
             int length = writePosition - writeBeginPosition;
-            newBuffer.putInt(slotPosition, writeBeginPosition);
-            newBuffer.putInt(slotPosition + Integer.BYTES, length);
+            if (writeWithoutMmap && compactFileChannel != null) {
+                // Use FileChannel for writing
+                ByteBuffer slotWriteBuffer = ByteBuffer.allocate(Integer.BYTES 
* 2);
+                slotWriteBuffer.putInt(0, writeBeginPosition);
+                slotWriteBuffer.putInt(Integer.BYTES, length);
+                slotWriteBuffer.position(0);
+                slotWriteBuffer.limit(Integer.BYTES * 2);
+                compactFileChannel.position(slotPosition);
+                compactFileChannel.write(slotWriteBuffer);
+            } else {
+                // Use ByteBuffer for writing
+                newBuffer.putInt(slotPosition, writeBeginPosition);
+                newBuffer.putInt(slotPosition + Integer.BYTES, length);
+            }
 
             if (length > 0) {
                 log.trace("IndexStoreFile do compaction, write slot, slot: {}, 
begin: {}, length: {}", i, writeBeginPosition, length);
             }
         }
 
-        this.flushNewMetadata(newBuffer, true);
-        newBuffer.flip();
+        this.flushNewMetadata(newBuffer, true, compactFileChannel);
+
+        // Set position and limit for reading
+        newBuffer.position(0);
+        newBuffer.limit(fileMaxLength);
         return newBuffer;
     }
 
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
index d19b562463..10014ba76a 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
@@ -39,7 +39,13 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
 public class IndexStoreFileTest {
 
     private static final String TOPIC_NAME = "TopicTest";
@@ -50,6 +56,17 @@ public class IndexStoreFileTest {
     private static final String KEY = "MessageKey";
     private static final Set<String> KEY_SET = Collections.singleton(KEY);
 
+    @Parameterized.Parameter
+    public boolean writeWithoutMmap;
+
+    @Parameterized.Parameters(name = "writeWithoutMmap={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            { true },
+            { false }
+        });
+    }
+
     private String filePath;
     private MessageStoreConfig storeConfig;
     private IndexStoreFile indexStoreFile;
@@ -64,6 +81,7 @@ public class IndexStoreFileTest {
         storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
         storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
         
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.PosixFileSegment");
+        storeConfig.setWriteWithoutMmap(writeWithoutMmap);
         indexStoreFile = new IndexStoreFile(storeConfig, 
System.currentTimeMillis());
     }
 

Reply via email to