This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2017630383 [ISSUE #9834] Support writeWithoutMmap in IndexStoreFile (#9835)
2017630383 is described below
commit 2017630383b41e351e099a5fc4d022af99f9679d
Author: rongtong <[email protected]>
AuthorDate: Thu Nov 13 17:25:31 2025 +0800
[ISSUE #9834] Support writeWithoutMmap in IndexStoreFile (#9835)
* Use FileChannel for writing when writeWithoutMmap is enabled in
IndexStoreFile
Change-Id: I23ac2a4dfb8286cd8c3e51aeeb2d54d91136bc03
Co-developed-by: Cursor <[email protected]>
* Use FileChannel for writing when writeWithoutMmap is enabled in
IndexStoreFile
Change-Id: I6a541d13c81a16f39b88ac91ce770717f60d64ff
Co-developed-by: Cursor <[email protected]>
* Remove unnecessary rewind() call in IndexStoreFile when using FileChannel
Change-Id: Id6deca99d34736761fd1d937f5eb5e75506ba1cc
Co-developed-by: Cursor <[email protected]>
* Add parameterized test for writeWithoutMmap in IndexStoreFileTest
Change-Id: I1ce92e4f728a018ff7bd3160995f395b480345ea
Co-developed-by: Cursor <[email protected]>
* Fix compactToNewFile to read data from file when using FileChannel
Change-Id: I9058b08f1bbcf646b5e7913260a5eefe853b9b16
Co-developed-by: Cursor <[email protected]>
* Fix newBuffer position and limit in compactToNewFile
Change-Id: I1a0100b6159d636a8db1124e3de416b3a2e62bc5
Co-developed-by: Cursor <[email protected]>
* Update test parameter order in IndexStoreFileTest
Change-Id: Ie01656574c1866badf726ead58a8ad2ca3d01cf7
Co-developed-by: Cursor <[email protected]>
---------
Co-authored-by: RongtongJin <[email protected]>
---
.../rocketmq/tieredstore/MessageStoreConfig.java | 9 ++
.../rocketmq/tieredstore/TieredMessageStore.java | 1 +
.../rocketmq/tieredstore/index/IndexStoreFile.java | 107 +++++++++++++++++----
.../tieredstore/index/IndexStoreFileTest.java | 18 ++++
4 files changed, 116 insertions(+), 19 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java
index 10667566aa..d22ab80dd8 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreConfig.java
@@ -129,6 +129,7 @@ public class MessageStoreConfig {
private String objectStoreBucket = "";
private String objectStoreAccessKey = "";
private String objectStoreSecretKey = "";
+ private boolean writeWithoutMmap = false;
public static String localHostName() {
try {
@@ -418,4 +419,12 @@ public class MessageStoreConfig {
public String getObjectStoreEndpoint() {
return objectStoreEndpoint;
}
+
+ public boolean isWriteWithoutMmap() {
+ return writeWithoutMmap;
+ }
+
+ public void setWriteWithoutMmap(boolean writeWithoutMmap) {
+ this.writeWithoutMmap = writeWithoutMmap;
+ }
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 8720cb9412..9d56c4a8da 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -83,6 +83,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
this.storeConfig = new MessageStoreConfig();
this.context = context;
this.context.registerConfiguration(this.storeConfig);
+ this.storeConfig.setWriteWithoutMmap(context.getMessageStoreConfig().isWriteWithoutMmap());
this.brokerName = this.storeConfig.getBrokerName();
this.defaultStore = next;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index 528bce9bb8..c58e91e9c0 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -20,6 +20,7 @@ import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
@@ -90,20 +91,26 @@ public class IndexStoreFile implements IndexFile {
private final AtomicInteger hashSlotCount = new AtomicInteger(0);
private final AtomicInteger indexItemCount = new AtomicInteger(0);
+ private final boolean writeWithoutMmap;
private MappedFile mappedFile;
private ByteBuffer byteBuffer;
private MappedFile compactMappedFile;
private FileSegment fileSegment;
+ private FileChannel fileChannel;
+
public IndexStoreFile(MessageStoreConfig storeConfig, long timestamp)
throws IOException {
+ this.writeWithoutMmap = storeConfig.isWriteWithoutMmap();
this.hashSlotMaxCount =
storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
this.indexItemMaxCount =
storeConfig.getTieredStoreIndexFileMaxIndexNum();
this.fileStatus = new AtomicReference<>(UNSEALED);
this.fileReadWriteLock = new ReentrantReadWriteLock();
this.mappedFile = new DefaultMappedFile(
Paths.get(storeConfig.getStorePathRootDir(), FILE_DIRECTORY_NAME,
String.valueOf(timestamp)).toString(),
- this.getItemPosition(indexItemMaxCount));
+ this.getItemPosition(indexItemMaxCount),
+ this.writeWithoutMmap);
this.byteBuffer = this.mappedFile.getMappedByteBuffer();
+ this.fileChannel = this.mappedFile.getFileChannel();
this.beginTimestamp.set(timestamp);
this.endTimestamp.set(byteBuffer.getLong(INDEX_BEGIN_TIME_STAMP));
@@ -113,6 +120,7 @@ public class IndexStoreFile implements IndexFile {
}
public IndexStoreFile(MessageStoreConfig storeConfig, FileSegment
fileSegment) {
+ this.writeWithoutMmap = storeConfig.isWriteWithoutMmap();
this.fileSegment = fileSegment;
this.fileStatus = new AtomicReference<>(UPLOAD);
this.fileReadWriteLock = new ReentrantReadWriteLock();
@@ -157,12 +165,31 @@ public class IndexStoreFile implements IndexFile {
return (keyHash < 0) ? -keyHash : keyHash;
}
- protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) {
- byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE :
END_MAGIC_CODE);
- byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP, this.beginTimestamp.get());
- byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
- byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
- byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
+ protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end) throws
IOException {
+ flushNewMetadata(byteBuffer, end, null);
+ }
+
+ protected void flushNewMetadata(ByteBuffer byteBuffer, boolean end,
FileChannel channel) throws IOException {
+ FileChannel targetChannel = channel != null ? channel : fileChannel;
+ if (writeWithoutMmap && targetChannel != null) {
+ // Use FileChannel for writing
+ ByteBuffer writeBuffer = ByteBuffer.allocate(INDEX_HEADER_SIZE);
+ writeBuffer.putInt(!end ? BEGIN_MAGIC_CODE : END_MAGIC_CODE);
+ writeBuffer.putLong(this.beginTimestamp.get());
+ writeBuffer.putLong(this.endTimestamp.get());
+ writeBuffer.putInt(this.hashSlotCount.get());
+ writeBuffer.putInt(this.indexItemCount.get());
+ writeBuffer.flip();
+ targetChannel.position(INDEX_MAGIC_CODE);
+ targetChannel.write(writeBuffer);
+ } else {
+ // Use ByteBuffer for writing
+ byteBuffer.putInt(INDEX_MAGIC_CODE, !end ? BEGIN_MAGIC_CODE :
END_MAGIC_CODE);
+ byteBuffer.putLong(INDEX_BEGIN_TIME_STAMP,
this.beginTimestamp.get());
+ byteBuffer.putLong(INDEX_END_TIME_STAMP, this.endTimestamp.get());
+ byteBuffer.putInt(INDEX_SLOT_COUNT, this.hashSlotCount.get());
+ byteBuffer.putInt(INDEX_ITEM_INDEX, this.indexItemCount.get());
+ }
}
protected int getSlotPosition(int slotIndex) {
@@ -215,9 +242,26 @@ public class IndexStoreFile implements IndexFile {
IndexItem indexItem = new IndexItem(
topicId, queueId, offset, size, hashCode, timeDiff,
slotOldValue);
int itemIndex = this.indexItemCount.incrementAndGet();
- this.byteBuffer.position(this.getItemPosition(itemIndex));
- this.byteBuffer.put(indexItem.getByteBuffer());
- this.byteBuffer.putInt(slotPosition, itemIndex);
+ int itemPosition = this.getItemPosition(itemIndex);
+
+ if (writeWithoutMmap && fileChannel != null) {
+ // Use FileChannel for writing
+ ByteBuffer itemBuffer = indexItem.getByteBuffer();
+ fileChannel.position(itemPosition);
+ fileChannel.write(itemBuffer);
+
+ ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
+ slotBuffer.putInt(0, itemIndex);
+ slotBuffer.position(0);
+ slotBuffer.limit(Integer.BYTES);
+ fileChannel.position(slotPosition);
+ fileChannel.write(slotBuffer);
+ } else {
+ // Use ByteBuffer for writing
+ this.byteBuffer.position(itemPosition);
+ this.byteBuffer.put(indexItem.getByteBuffer());
+ this.byteBuffer.putInt(slotPosition, itemIndex);
+ }
if (slotOldValue <= INVALID_INDEX) {
this.hashSlotCount.incrementAndGet();
@@ -231,7 +275,7 @@ public class IndexStoreFile implements IndexFile {
this.getTimestamp(), topic, key, hashCode %
this.hashSlotMaxCount, itemIndex, slotOldValue, indexItem);
}
return AppendResult.SUCCESS;
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("IndexStoreFile put key error, topic: {}, topicId: {},
queueId: {}, keySet: {}, offset: {}, " +
"size: {}, timestamp: {}", topic, topicId, queueId, keySet,
offset, size, timestamp, e);
} finally {
@@ -392,7 +436,7 @@ public class IndexStoreFile implements IndexFile {
buffer = compactToNewFile();
log.debug("IndexStoreFile do compaction, timestamp: {}, file size:
{}, cost: {}ms",
this.getTimestamp(), buffer.capacity(),
stopwatch.elapsed(TimeUnit.MICROSECONDS));
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("IndexStoreFile do compaction, timestamp: {}, cost:
{}ms",
this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS),
e);
return null;
@@ -423,8 +467,9 @@ public class IndexStoreFile implements IndexFile {
int writePosition = INDEX_HEADER_SIZE + (hashSlotMaxCount *
HASH_SLOT_SIZE);
int fileMaxLength = writePosition + COMPACT_INDEX_ITEM_SIZE *
indexItemCount.get();
- compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(),
fileMaxLength);
+ compactMappedFile = new DefaultMappedFile(this.getCompactedFilePath(),
fileMaxLength, writeWithoutMmap);
MappedByteBuffer newBuffer = compactMappedFile.getMappedByteBuffer();
+ FileChannel compactFileChannel = compactMappedFile.getFileChannel();
for (int i = 0; i < hashSlotMaxCount; i++) {
int slotPosition = this.getSlotPosition(i);
@@ -437,24 +482,48 @@ public class IndexStoreFile implements IndexFile {
buffer.get(payload);
int newSlotValue =
payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
buffer.limit(COMPACT_INDEX_ITEM_SIZE);
- newBuffer.position(writePosition);
- newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
+
+ if (writeWithoutMmap && compactFileChannel != null) {
+ // Use FileChannel for writing
+ ByteBuffer writeBuffer = ByteBuffer.wrap(payload, 0,
COMPACT_INDEX_ITEM_SIZE);
+ compactFileChannel.position(writePosition);
+ compactFileChannel.write(writeBuffer);
+ } else {
+ // Use ByteBuffer for writing
+ newBuffer.position(writePosition);
+ newBuffer.put(payload, 0, COMPACT_INDEX_ITEM_SIZE);
+ }
log.trace("IndexStoreFile do compaction, write item, slot: {},
current: {}, next: {}", i, slotValue, newSlotValue);
slotValue = newSlotValue;
writePosition += COMPACT_INDEX_ITEM_SIZE;
}
int length = writePosition - writeBeginPosition;
- newBuffer.putInt(slotPosition, writeBeginPosition);
- newBuffer.putInt(slotPosition + Integer.BYTES, length);
+ if (writeWithoutMmap && compactFileChannel != null) {
+ // Use FileChannel for writing
+ ByteBuffer slotWriteBuffer = ByteBuffer.allocate(Integer.BYTES
* 2);
+ slotWriteBuffer.putInt(0, writeBeginPosition);
+ slotWriteBuffer.putInt(Integer.BYTES, length);
+ slotWriteBuffer.position(0);
+ slotWriteBuffer.limit(Integer.BYTES * 2);
+ compactFileChannel.position(slotPosition);
+ compactFileChannel.write(slotWriteBuffer);
+ } else {
+ // Use ByteBuffer for writing
+ newBuffer.putInt(slotPosition, writeBeginPosition);
+ newBuffer.putInt(slotPosition + Integer.BYTES, length);
+ }
if (length > 0) {
log.trace("IndexStoreFile do compaction, write slot, slot: {},
begin: {}, length: {}", i, writeBeginPosition, length);
}
}
- this.flushNewMetadata(newBuffer, true);
- newBuffer.flip();
+ this.flushNewMetadata(newBuffer, true, compactFileChannel);
+
+ // Set position and limit for reading
+ newBuffer.position(0);
+ newBuffer.limit(fileMaxLength);
return newBuffer;
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
index d19b562463..10014ba76a 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreFileTest.java
@@ -39,7 +39,13 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
public class IndexStoreFileTest {
private static final String TOPIC_NAME = "TopicTest";
@@ -50,6 +56,17 @@ public class IndexStoreFileTest {
private static final String KEY = "MessageKey";
private static final Set<String> KEY_SET = Collections.singleton(KEY);
+ @Parameterized.Parameter
+ public boolean writeWithoutMmap;
+
+ @Parameterized.Parameters(name = "writeWithoutMmap={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ { true },
+ { false }
+ });
+ }
+
private String filePath;
private MessageStoreConfig storeConfig;
private IndexStoreFile indexStoreFile;
@@ -64,6 +81,7 @@ public class IndexStoreFileTest {
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(5);
storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.PosixFileSegment");
+ storeConfig.setWriteWithoutMmap(writeWithoutMmap);
indexStoreFile = new IndexStoreFile(storeConfig,
System.currentTimeMillis());
}