This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e262890216 [ISSUE #9776] Make SharedByteBuffer size configurable via MessageStoreConfig.maxMessageSize (#9775)
e262890216 is described below
commit e26289021614d7407c5e2648af5bd708aa90b064
Author: rongtong <[email protected]>
AuthorDate: Fri Oct 24 13:36:52 2025 +0800
[ISSUE #9776] Make SharedByteBuffer size configurable via MessageStoreConfig.maxMessageSize (#9775)
* Make SharedByteBuffer size configurable via MessageStoreConfig.maxMessageSize
Change-Id: Ie3c291ba10b84963fb3ba0af90afa323d9b955ff
* Fix checkstyle
Change-Id: I75f9f767e30f33fc2ea4ceafd59b9d950875c765
* Fix UTs
Change-Id: I57b3c904d37558e4301394fc1dd4188b0866718b
* Fix UTs
Change-Id: I87775116926d3f5271eb13f0a86c0a40446ae432
* Fix bugs
Change-Id: Ib76596b91621b59d1e189642d081d259880f9ac8
* Fix comments
Change-Id: I7bdd3b9f24172afe77a7023b9aa4109dc271c27b
* refactor: make SharedByteBufferManager buffer count configurable
Change-Id: Ia97908f7e96f23542e8acf1a2cc6c1407d3d8e87
---
.../apache/rocketmq/store/DefaultMessageStore.java | 5 +
.../rocketmq/store/config/MessageStoreConfig.java | 11 ++
.../rocketmq/store/logfile/DefaultMappedFile.java | 117 ++++++++-----------
.../store/logfile/SharedByteBufferManager.java | 128 +++++++++++++++++++++
.../logfile/DefaultMappedFileConcurrencyTest.java | 3 +
.../DefaultMappedFileErrorHandlingTest.java | 3 +
.../logfile/DefaultMappedFilePerformanceTest.java | 3 +
.../DefaultMappedFileWriteWithoutMmapTest.java | 3 +
8 files changed, 204 insertions(+), 69 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7d939a969a..7db1daa39c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -103,6 +103,7 @@ import org.apache.rocketmq.store.kv.CommitLogDispatcherCompaction;
import org.apache.rocketmq.store.kv.CompactionService;
import org.apache.rocketmq.store.kv.CompactionStore;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.logfile.SharedByteBufferManager;
import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
@@ -240,6 +241,10 @@ public class DefaultMessageStore implements MessageStore {
this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
+ if (messageStoreConfig.isWriteWithoutMmap()) {
+ SharedByteBufferManager.getInstance().init(messageStoreConfig.getMaxMessageSize(), messageStoreConfig.getSharedByteBufferNum());
+ }
+
this.defaultStoreMetricsManager = new DefaultStoreMetricsManager();
this.scheduledExecutorService =
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 2e72f9e6f2..85d19f31b4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -488,6 +488,9 @@ public class MessageStoreConfig {
*/
private boolean enableAcceleratedRecovery = false;
+ // Shared byte buffer manager configuration
+ private int sharedByteBufferNum = 16;
+
public String getRocksdbCompressionType() {
return rocksdbCompressionType;
}
@@ -2060,4 +2063,12 @@ public class MessageStoreConfig {
public void setEnableRunningFlagsInFlush(boolean enableRunningFlagsInFlush) {
this.enableRunningFlagsInFlush = enableRunningFlagsInFlush;
}
+
+ public int getSharedByteBufferNum() {
+ return sharedByteBufferNum;
+ }
+
+ public void setSharedByteBufferNum(int sharedByteBufferNum) {
+ this.sharedByteBufferNum = sharedByteBufferNum;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 147eb3d708..0c16d705bd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -33,11 +33,9 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Iterator;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.commons.lang3.SystemUtils;
import org.apache.rocketmq.common.UtilAll;
@@ -116,29 +114,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
*/
private long stopTimestamp = -1;
- private static int maxSharedNum = 16;
- private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
- protected RunningFlags runningFlags;
-
- static class SharedByteBuffer {
- private final ReentrantLock lock;
- private final ByteBuffer buffer;
- public SharedByteBuffer(int size) {
- this.lock = new ReentrantLock();
- this.buffer = ByteBuffer.allocateDirect(size);
- }
+ protected RunningFlags runningFlags;
- public void release() {
- this.lock.unlock();
- }
- public ByteBuffer acquire() {
- this.lock.lock();
- return buffer;
- }
- }
static {
WROTE_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
@@ -156,18 +136,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
}
IS_LOADED_METHOD = isLoaded0method;
-
- SHARED_BYTE_BUFFER = new SharedByteBuffer[maxSharedNum];
- for (int i = 0; i < maxSharedNum; i++) {
- SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(4 * 1024 * 1024);
- }
}
- private static SharedByteBuffer borrowSharedByteBuffer() {
- int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
- SharedByteBuffer buffer = SHARED_BYTE_BUFFER[idx];
- return buffer;
- }
+
public DefaultMappedFile() {
}
@@ -324,10 +295,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
long fileFromOffset = this.getFileFromOffset();
if (currentPos < this.fileSize) {
- SharedByteBuffer sharedByteBuffer = null;
+ SharedByteBufferManager.SharedByteBuffer sharedByteBuffer = null;
ByteBuffer byteBuffer;
if (writeWithoutMmap) {
- sharedByteBuffer = borrowSharedByteBuffer();
+ sharedByteBuffer = SharedByteBufferManager.getInstance().borrowSharedByteBuffer();
byteBuffer = sharedByteBuffer.acquire();
byteBuffer.position(0).limit(byteBuffer.capacity());
fileFromOffset += currentPos;
@@ -336,24 +307,28 @@ public class DefaultMappedFile extends AbstractMappedFile {
byteBuffer.position(currentPos);
}
- AppendMessageResult result = cb.doAppend(byteBuffer, fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+ try {
+ AppendMessageResult result = cb.doAppend(byteBuffer, fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+
+ if (sharedByteBuffer != null) {
+ try {
+ this.fileChannel.position(currentPos);
+ byteBuffer.position(0).limit(result.getWroteBytes());
+ this.fileChannel.write(byteBuffer);
+ } catch (Throwable t) {
+ log.error("Failed to write to mappedFile {}", this.fileName, t);
+ return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ }
+ }
- if (sharedByteBuffer != null) {
- try {
- this.fileChannel.position(currentPos);
- byteBuffer.position(0).limit(result.getWroteBytes());
- this.fileChannel.write(byteBuffer);
- } catch (Throwable t) {
- log.error("Failed to write to mappedFile {}", this.fileName, t);
- return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
- } finally {
+ WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
+ this.storeTimestamp = result.getStoreTimestamp();
+ return result;
+ } finally {
+ if (sharedByteBuffer != null) {
sharedByteBuffer.release();
}
}
-
- WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
- this.storeTimestamp = result.getStoreTimestamp();
- return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -380,10 +355,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
long fileFromOffset = this.getFileFromOffset();
if (currentPos < this.fileSize) {
- SharedByteBuffer sharedByteBuffer = null;
+ SharedByteBufferManager.SharedByteBuffer sharedByteBuffer = null;
ByteBuffer byteBuffer;
if (writeWithoutMmap) {
- sharedByteBuffer = borrowSharedByteBuffer();
+ sharedByteBuffer = SharedByteBufferManager.getInstance().borrowSharedByteBuffer();
byteBuffer = sharedByteBuffer.acquire();
byteBuffer.position(0).limit(byteBuffer.capacity());
fileFromOffset += currentPos;
@@ -393,27 +368,31 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
AppendMessageResult result;
- if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
- // traditional batch message
- result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
- (MessageExtBatch) messageExt, putMessageContext);
- } else if (messageExt instanceof MessageExtBrokerInner) {
- // traditional single message or newly introduced inner-batch message
- result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
- (MessageExtBrokerInner) messageExt, putMessageContext);
- } else {
- return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
- }
-
- if (sharedByteBuffer != null) {
- try {
- this.fileChannel.position(currentPos);
- byteBuffer.position(0).limit(result.getWroteBytes());
- this.fileChannel.write(byteBuffer);
- } catch (Throwable t) {
- log.error("Failed to write to mappedFile {}", this.fileName, t);
+ try {
+ if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {
+ // traditional batch message
+ result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
+ (MessageExtBatch) messageExt, putMessageContext);
+ } else if (messageExt instanceof MessageExtBrokerInner) {
+ // traditional single message or newly introduced inner-batch message
+ result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize - currentPos,
+ (MessageExtBrokerInner) messageExt, putMessageContext);
+ } else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
- } finally {
+ }
+
+ if (sharedByteBuffer != null) {
+ try {
+ this.fileChannel.position(currentPos);
+ byteBuffer.position(0).limit(result.getWroteBytes());
+ this.fileChannel.write(byteBuffer);
+ } catch (Throwable t) {
+ log.error("Failed to write to mappedFile {}", this.fileName, t);
+ return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ }
+ }
+ } finally {
+ if (sharedByteBuffer != null) {
sharedByteBuffer.release();
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/SharedByteBufferManager.java b/store/src/main/java/org/apache/rocketmq/store/logfile/SharedByteBufferManager.java
new file mode 100644
index 0000000000..f7caff866b
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/SharedByteBufferManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Shared byte buffer manager for managing some shared ByteBuffers Buffer size is set based on MessageStoreConfig's
+ * maxMessageSize
+ */
+public class SharedByteBufferManager {
+
+ private static volatile SharedByteBufferManager instance;
+ private static final Object LOCK = new Object();
+
+ private SharedByteBuffer[] sharedByteBuffers;
+ private int bufferSize;
+ private int maxSharedNum;
+ private volatile boolean initialized = false;
+
+ private SharedByteBufferManager() {
+ // Private constructor
+ }
+
+ /**
+ * Get singleton instance
+ */
+ public static SharedByteBufferManager getInstance() {
+ if (instance == null) {
+ synchronized (LOCK) {
+ if (instance == null) {
+ instance = new SharedByteBufferManager();
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * Initialize shared buffers with specified messageSize size and shared buffer number
+ *
+ * @param maxMessageSize max messageSize size
+ * @param sharedBufferNum number of shared buffers
+ */
+ public synchronized void init(int maxMessageSize, int sharedBufferNum) {
+ if (!initialized) {
+ //Reserve 64kb for encoding buffer outside body
+ bufferSize = Integer.MAX_VALUE - maxMessageSize >= 64 * 1024 ?
+ maxMessageSize + 64 * 1024 : Integer.MAX_VALUE;
+
+ this.maxSharedNum = sharedBufferNum;
+ this.sharedByteBuffers = new SharedByteBuffer[maxSharedNum];
+ for (int i = 0; i < maxSharedNum; i++) {
+ this.sharedByteBuffers[i] = new SharedByteBuffer(bufferSize);
+ }
+ this.initialized = true;
+ }
+ }
+
+ /**
+ * Borrow a shared buffer
+ *
+ * @return Shared buffer
+ */
+ public SharedByteBuffer borrowSharedByteBuffer() {
+ if (!initialized) {
+ throw new IllegalStateException("SharedByteBufferManager not initialized");
+ }
+ int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
+ return sharedByteBuffers[idx];
+ }
+
+ /**
+ * Get current buffer size
+ *
+ * @return Buffer size
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ /**
+ * Check if initialized
+ *
+ * @return Whether initialized
+ */
+ public boolean isInitialized() {
+ return initialized;
+ }
+
+ /**
+ * Shared byte buffer class
+ */
+ public static class SharedByteBuffer {
+ private final ReentrantLock lock;
+ private final ByteBuffer buffer;
+
+ public SharedByteBuffer(int size) {
+ this.lock = new ReentrantLock();
+ this.buffer = ByteBuffer.allocateDirect(size);
+ }
+
+ public void release() {
+ this.lock.unlock();
+ }
+
+ public ByteBuffer acquire() {
+ this.lock.lock();
+ return buffer;
+ }
+ }
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
index 06f94727d6..d8f3816c25 100644
--- a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
@@ -40,6 +40,9 @@ public class DefaultMappedFileConcurrencyTest {
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
fileName = storePath + File.separator + "00000000000000000000";
UtilAll.ensureDirOK(storePath);
+
+ // Initialize SharedByteBufferManager for tests
+ SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
}
@After
diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
index 649e8071cc..efda8d84aa 100644
--- a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
@@ -43,6 +43,9 @@ public class DefaultMappedFileErrorHandlingTest {
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
fileName = storePath + File.separator + "00000000000000000000";
UtilAll.ensureDirOK(storePath);
+
+ // Initialize SharedByteBufferManager for tests
+ SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
}
@After
diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
index b958487add..e418aecadb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
@@ -38,6 +38,9 @@ public class DefaultMappedFilePerformanceTest {
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
fileName = storePath + File.separator + "00000000000000000000";
UtilAll.ensureDirOK(storePath);
+
+ // Initialize SharedByteBufferManager for tests
+ SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
}
@After
diff --git a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
index 79bca016e4..8734a55b07 100644
--- a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
@@ -37,6 +37,9 @@ public class DefaultMappedFileWriteWithoutMmapTest {
storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
fileName = storePath + File.separator + "00000000000000000000";
UtilAll.ensureDirOK(storePath);
+
+ // Initialize SharedByteBufferManager for tests
+ SharedByteBufferManager.getInstance().init(4 * 1024 * 1024, 16); // 4MB default, 16 shared buffers
}
@After