This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 493e52b94 [ISSUE#4520] [Optimization] Implement adjusting
maxMessageSize dynamically (#4521)
493e52b94 is described below
commit 493e52b945c7fdbdddb759847816d4952c966cff
Author: Shengmin Wang <[email protected]>
AuthorDate: Thu Jun 30 16:06:43 2022 +0800
[ISSUE#4520] [Optimization] Implement adjusting maxMessageSize dynamically
(#4521)
* add updateEncoderBufferCapacity function
* add junit test, testDynamicMaxMessageSize()
* add updateEncoderBufferCapacity in MultiDispatch class
* add code comments
* add updateMaxMessageBodySize in DLedger mode
* modify code style, add some whitespaces
* remove the adaptation of DLedger mode
* modify code comments
* DLedger mode recovery
* check that newMaxMessageSize is >= 10
---
.../java/org/apache/rocketmq/store/CommitLog.java | 32 +++++++++++++++++++---
.../org/apache/rocketmq/store/MultiDispatch.java | 12 +++++++-
.../rocketmq/store/DefaultMessageStoreTest.java | 22 +++++++++++++++
.../store/dledger/DLedgerCommitlogTest.java | 1 -
4 files changed, 61 insertions(+), 6 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 8b8a86315..efed87f67 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -604,6 +604,15 @@ public class CommitLog {
return keyBuilder.toString();
}
+ public void updateMaxMessageSize(PutMessageThreadLocal
putMessageThreadLocal) {
+ // dynamically adjust maxMessageSize, but not support DLedger mode
temporarily
+ int newMaxMessageSize =
this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize();
+ if (newMaxMessageSize >= 10 &&
+ putMessageThreadLocal.getEncoder().getMaxMessageBodySize() !=
newMaxMessageSize) {
+
putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
+ }
+ }
+
public CompletableFuture<PutMessageResult> asyncPutMessage(final
MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
@@ -650,6 +659,7 @@ public class CommitLog {
}
PutMessageThreadLocal putMessageThreadLocal =
this.putMessageThreadLocal.get();
+ updateMaxMessageSize(putMessageThreadLocal);
if (!multiDispatch.isMultiDispatchMsg(msg)) {
PutMessageResult encodeResult =
putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
@@ -768,6 +778,7 @@ public class CommitLog {
//fine-grained lock instead of the coarse-grained
PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get();
+ updateMaxMessageSize(pmThreadLocal);
MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder();
PutMessageContext putMessageContext = new
PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch));
@@ -1479,15 +1490,16 @@ public class CommitLog {
}
public static class MessageExtEncoder {
- private final ByteBuf byteBuf;
+ private ByteBuf byteBuf;
// The maximum length of the message body.
- private final int maxMessageBodySize;
+ private int maxMessageBodySize;
// The maximum length of the full message.
- private final int maxMessageSize;
+ private int maxMessageSize;
MessageExtEncoder(final int maxMessageBodySize) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
//Reserve 64kb for encoding buffer outside body
- int maxMessageSize = maxMessageBodySize + 64 * 1024;
+ int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64
* 1024 ?
+ maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
byteBuf = alloc.directBuffer(maxMessageSize);
this.maxMessageBodySize = maxMessageBodySize;
this.maxMessageSize = maxMessageSize;
@@ -1692,6 +1704,18 @@ public class CommitLog {
public ByteBuffer getEncoderBuffer() {
return this.byteBuf.nioBuffer();
}
+
+ public int getMaxMessageBodySize() {
+ return this.maxMessageBodySize;
+ }
+
+ public void updateEncoderBufferCapacity(int newMaxMessageBodySize) {
+ this.maxMessageBodySize = newMaxMessageBodySize;
+ //Reserve 64kb for encoding buffer outside body
+ this.maxMessageSize = Integer.MAX_VALUE - newMaxMessageBodySize >=
64 * 1024 ?
+ this.maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
+ this.byteBuf.capacity(this.maxMessageSize);
+ }
}
static class PutMessageThreadLocal {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
index e74b6ea9e..3ae3ac612 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -116,8 +116,18 @@ public class MultiDispatch {
}
}
+ public void updateMaxMessageSize(CommitLog.PutMessageThreadLocal
putMessageThreadLocal) {
+ int newMaxMessageSize =
this.messageStore.getMessageStoreConfig().getMaxMessageSize();
+ if (newMaxMessageSize >= 10 &&
+ putMessageThreadLocal.getEncoder().getMaxMessageBodySize() !=
newMaxMessageSize) {
+
putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
+ }
+ }
+
private boolean rebuildMsgInner(MessageExtBrokerInner msgInner) {
- MessageExtEncoder encoder =
this.commitLog.getPutMessageThreadLocal().get().getEncoder();
+ CommitLog.PutMessageThreadLocal putMessageThreadLocal =
this.commitLog.getPutMessageThreadLocal().get();
+ updateMaxMessageSize(putMessageThreadLocal);
+ MessageExtEncoder encoder = putMessageThreadLocal.getEncoder();
PutMessageResult encodeResult = encoder.encode(msgInner);
if (encodeResult != null) {
LOGGER.error("rebuild msgInner for multiDispatch", encodeResult);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 0cabd18b0..491437d80 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -696,6 +696,28 @@ public class DefaultMessageStoreTest {
assertTrue(encodeResult5.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
}
+ @Test
+ public void testDynamicMaxMessageSize(){
+ MessageExtBrokerInner messageExtBrokerInner = buildMessage();
+ MessageStoreConfig messageStoreConfig = ((DefaultMessageStore)
messageStore).getMessageStoreConfig();
+ int originMaxMessageSize = messageStoreConfig.getMaxMessageSize();
+
+ messageExtBrokerInner.setBody(new byte[originMaxMessageSize + 10]);
+ PutMessageResult putMessageResult =
messageStore.putMessage(messageExtBrokerInner);
+ assertTrue(putMessageResult.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
+
+ int newMaxMessageSize = originMaxMessageSize + 10;
+ messageStoreConfig.setMaxMessageSize(newMaxMessageSize);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ assertTrue(putMessageResult.getPutMessageStatus() ==
PutMessageStatus.PUT_OK);
+
+ messageStoreConfig.setMaxMessageSize(10);
+ putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+ assertTrue(putMessageResult.getPutMessageStatus() ==
PutMessageStatus.MESSAGE_ILLEGAL);
+
+ messageStoreConfig.setMaxMessageSize(originMaxMessageSize);
+ }
+
private class MyMessageArrivingListener implements MessageArrivingListener
{
@Override
public void arriving(String topic, int queueId, long logicOffset, long
tagsCode, long msgStoreTime,
diff --git
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 88637db2d..a1f8c803e 100644
---
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -382,5 +382,4 @@ public class DLedgerCommitlogTest extends
MessageStoreTestBase {
followerStore.shutdown();
}
-
}