This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 280804c559 [ISSUE #8693] Fix checking MultiDispatchMessage when appending commitlog
280804c559 is described below
commit 280804c5592341f92a43b6d72ec6e94db77b74ac
Author: Liu Shengzhong <[email protected]>
AuthorDate: Wed Sep 18 13:57:20 2024 +0800
[ISSUE #8693] Fix checking MultiDispatchMessage when appending commitlog
---
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 8 +++++---
.../main/java/org/apache/rocketmq/store/MessageExtEncoder.java | 2 +-
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index f34c6944c9..972e71aadd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.util.LibC;
import org.rocksdb.RocksDBException;
@@ -1903,7 +1904,7 @@ public class CommitLog implements Swappable {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
- boolean isMultiDispatchMsg = messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner);
+ final boolean isMultiDispatchMsg = CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner);
if (isMultiDispatchMsg) {
AppendMessageResult appendMessageResult = handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
if (appendMessageResult != null) {
@@ -2244,8 +2245,9 @@ public class CommitLog implements Swappable {
return flushManager;
}
- public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
- return StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+ public static boolean isMultiDispatchMsg(MessageStoreConfig messageStoreConfig, MessageExtBrokerInner msg) {
+ return StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH)) &&
+ MultiDispatchUtils.isNeedHandleMultiDispatch(messageStoreConfig, msg.getTopic());
}
private boolean isCloseReadAhead() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index 20e9a652b7..5c74918d9e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -175,7 +175,7 @@ public class MessageExtEncoder {
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
this.byteBuf.clear();
- if (messageStoreConfig.isEnableMultiDispatch() && CommitLog.isMultiDispatchMsg(msgInner)) {
+ if (CommitLog.isMultiDispatchMsg(messageStoreConfig, msgInner)) {
return encodeWithoutProperties(msgInner);
}