This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ead3d90501 [ISSUE #7511] Lock granularity issue causing LMQ message
loss (#7525)
ead3d90501 is described below
commit ead3d905016d9db4785a46beaa555c7fafd4f9bb
Author: Dongyuan Pan <[email protected]>
AuthorDate: Wed Nov 8 10:40:52 2023 +0800
[ISSUE #7511] Lock granularity issue causing LMQ message loss (#7525)
* bug fix: assignOffset and increaseOffset in LMQ has concurrency issues in
topicQueueLock, should be in putMessageLock
* fix MultiDispatchTest
* fix MultiDispatchTest
* fix unit test
---
.../common/message/MessageExtBrokerInner.java | 10 ++
.../java/org/apache/rocketmq/store/CommitLog.java | 94 ++++++++++++++--
.../org/apache/rocketmq/store/ConsumeQueue.java | 44 +-------
.../apache/rocketmq/store/DefaultMessageStore.java | 1 -
.../apache/rocketmq/store/MessageExtEncoder.java | 118 ++++++++++++++++++---
.../org/apache/rocketmq/store/MultiDispatch.java | 77 ++++++++++++++
.../store/queue/AbstractConsumeQueueStore.java | 10 ++
.../store/queue/ConsumeQueueInterface.java | 1 -
.../store/queue/ConsumeQueueStoreInterface.java | 14 +++
...{MultiDispatch.java => MultiDispatchUtils.java} | 17 +--
.../rocketmq/store/queue/QueueOffsetOperator.java | 6 +-
.../rocketmq/store/queue/RocksDBConsumeQueue.java | 42 --------
.../apache/rocketmq/store/AppendCallbackTest.java | 6 +-
.../apache/rocketmq/store/AppendPropCRCTest.java | 5 +-
.../apache/rocketmq/store/MultiDispatchTest.java | 12 +--
.../rocketmq/store/kv/CompactionLogTest.java | 2 +-
16 files changed, 322 insertions(+), 137 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
index 52501dbca0..147f23f123 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
@@ -28,6 +28,8 @@ public class MessageExtBrokerInner extends MessageExt {
private ByteBuffer encodedBuff;
+ private volatile boolean encodeCompleted;
+
private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1;
public ByteBuffer getEncodedBuff() {
@@ -92,4 +94,12 @@ public class MessageExtBrokerInner extends MessageExt {
this.setPropertiesString(MessageDecoder.messageProperties2String(this.getProperties()));
}
}
+
+ public boolean isEncodeCompleted() {
+ return encodeCompleted;
+ }
+
+ public void setEncodeCompleted(boolean encodeCompleted) {
+ this.encodeCompleted = encodeCompleted;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 6c3afde70f..35c1d0e2d7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
@@ -56,6 +57,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
import org.apache.rocketmq.store.logfile.MappedFile;
@@ -101,6 +103,7 @@ public class CommitLog implements Swappable {
protected int commitLogSize;
private final boolean enabledAppendPropCRC;
+ protected final MultiDispatch multiDispatch;
public CommitLog(final DefaultMessageStore messageStore) {
String storePath =
messageStore.getMessageStoreConfig().getStorePathCommitLog();
@@ -119,13 +122,11 @@ public class CommitLog implements Swappable {
this.flushManager = new DefaultFlushManager();
this.coldDataCheckService = new ColdDataCheckService();
- this.appendMessageCallback = new DefaultAppendMessageCallback();
+ this.appendMessageCallback = new
DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig());
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@Override
protected PutMessageThreadLocal initialValue() {
- return new PutMessageThreadLocal(
-
defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
-
defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
+ return new
PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
}
};
this.putMessageLock =
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new
PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -137,6 +138,8 @@ public class CommitLog implements Swappable {
this.commitLogSize =
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
this.enabledAppendPropCRC =
messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
+
+ this.multiDispatch = new MultiDispatch(defaultMessageStore);
}
public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -1830,15 +1833,84 @@ public class CommitLog implements Swappable {
// Store the message content
private final ByteBuffer msgStoreItemMemory;
private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
+ private final MessageStoreConfig messageStoreConfig;
- DefaultAppendMessageCallback() {
+ DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) {
this.msgStoreItemMemory =
ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
+ this.messageStoreConfig = messageStoreConfig;
+ }
+
+ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer
preEncodeBuffer, final MessageExtBrokerInner msgInner) {
+ if (msgInner.isEncodeCompleted()) {
+ return null;
+ }
+
+ multiDispatch.wrapMultiDispatch(msgInner);
+
+
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+
+ final byte[] propertiesData =
+ msgInner.getPropertiesString() == null ? null :
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+ boolean needAppendLastPropertySeparator = enabledAppendPropCRC &&
propertiesData != null && propertiesData.length > 0
+ && propertiesData[propertiesData.length - 1] !=
MessageDecoder.PROPERTY_SEPARATOR;
+
+ final int propertiesLength = (propertiesData == null ? 0 :
propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) +
crc32ReservedLength;
+
+ if (propertiesLength > Short.MAX_VALUE) {
+ log.warn("putMessage message properties length too long.
length={}", propertiesData.length);
+ return new
AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+ }
+
+ int msgLenWithoutProperties = preEncodeBuffer.getInt(0);
+
+ int msgLen = msgLenWithoutProperties + 2 + propertiesLength;
+
+ // Exceeds the maximum message
+ if (msgLen > this.messageStoreConfig.getMaxMessageSize()) {
+ log.warn("message size exceeded, msg total size: " + msgLen +
", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize());
+ return new
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
+ }
+
+ // Back filling total message length
+ preEncodeBuffer.putInt(0, msgLen);
+ // Modify position to msgLenWithoutProperties
+ preEncodeBuffer.position(msgLenWithoutProperties);
+
+ preEncodeBuffer.putShort((short) propertiesLength);
+
+ if (propertiesLength > crc32ReservedLength) {
+ preEncodeBuffer.put(propertiesData);
+ }
+
+ if (needAppendLastPropertySeparator) {
+ preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
+ }
+ // 18 CRC32
+ preEncodeBuffer.position(preEncodeBuffer.position() +
crc32ReservedLength);
+
+ msgInner.setEncodeCompleted(true);
+
+ return null;
}
public AppendMessageResult doAppend(final long fileFromOffset, final
ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner, PutMessageContext
putMessageContext) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
+ ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
+ boolean isMultiDispatchMsg =
messageStoreConfig.isEnableMultiDispatch() &&
CommitLog.isMultiDispatchMsg(msgInner);
+ if (isMultiDispatchMsg) {
+ AppendMessageResult appendMessageResult =
handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
+ if (appendMessageResult != null) {
+ return appendMessageResult;
+ }
+ }
+
+ final int msgLen = preEncodeBuffer.getInt(0);
+ preEncodeBuffer.position(0);
+ preEncodeBuffer.limit(msgLen);
+
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
@@ -1872,9 +1944,6 @@ public class CommitLog implements Swappable {
break;
}
- ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
- final int msgLen = preEncodeBuffer.getInt(0);
-
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.msgStoreItemMemory.clear();
@@ -1919,6 +1988,11 @@ public class CommitLog implements Swappable {
byteBuffer.put(preEncodeBuffer);
CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
msgInner.setEncodedBuff(null);
+
+ if (isMultiDispatchMsg) {
+ CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
+ }
+
return new AppendMessageResult(AppendMessageStatus.PUT_OK,
wroteOffset, msgLen, msgIdSupplier,
msgInner.getStoreTimestamp(), queueOffset,
CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
}
@@ -2159,6 +2233,10 @@ public class CommitLog implements Swappable {
return flushManager;
}
+ public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
+ return
StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
&& !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+ }
+
private boolean isCloseReadAhead() {
return !MixAll.isWindows() &&
!defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 623509c8bf..453c9d1dc7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -27,7 +27,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -38,7 +37,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
-import org.apache.rocketmq.store.queue.MultiDispatch;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -702,7 +701,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
- if
(MultiDispatch.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(),
request)) {
+ if
(MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(),
request)) {
multiDispatchLmqQueue(request, maxRetries);
}
return;
@@ -776,28 +775,6 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
String topicQueueKey = getTopic() + "-" + getQueueId();
long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
msg.setQueueOffset(queueOffset);
-
-
- // Handling the multi dispatch message. In the context of a light
message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties,
which requires special handling of queue offset of the light message queue.
- if
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
msg.getTopic())) {
- return;
- }
- String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues =
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- Long[] queueOffsets = new Long[queues.length];
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
- }
- }
- MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
- StringUtils.join(queueOffsets,
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
- msg.removeWaitStorePropertyString();
}
@Override
@@ -805,23 +782,6 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
-
- // Handling the multi dispatch message. In the context of a light
message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties,
which requires special handling of queue offset of the light message queue.
- if
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
msg.getTopic())) {
- return;
- }
- String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues =
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
- }
- }
}
private boolean putMessagePositionInfo(final long offset, final int size,
final long tagsCode,
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 99a54e2d7f..dc5f312e5a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2112,7 +2112,6 @@ public class DefaultMessageStore implements MessageStore {
}
}
-
@Override
public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
final int tranType =
MessageSysFlag.getTransactionValue(msg.getSysFlag());
diff --git
a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index c1d8087285..20e9a652b7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageVersion;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
public class MessageExtEncoder {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -38,20 +39,22 @@ public class MessageExtEncoder {
// The maximum length of the full message.
private int maxMessageSize;
private final int crc32ReservedLength;
+ private MessageStoreConfig messageStoreConfig;
- public MessageExtEncoder(final int maxMessageBodySize) {
- this(maxMessageBodySize, false);
+ public MessageExtEncoder(final int maxMessageBodySize, final
MessageStoreConfig messageStoreConfig) {
+ this(messageStoreConfig);
}
- public MessageExtEncoder(final int maxMessageBodySize, boolean
enabledAppendPropCRC) {
+ public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+ this.messageStoreConfig = messageStoreConfig;
+ this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize();
//Reserve 64kb for encoding buffer outside body
int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 *
1024 ?
maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
byteBuf = alloc.directBuffer(maxMessageSize);
- this.maxMessageBodySize = maxMessageBodySize;
this.maxMessageSize = maxMessageSize;
- this.crc32ReservedLength = enabledAppendPropCRC ?
CommitLog.CRC32_RESERVED_LEN : 0;
+ this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC()
? CommitLog.CRC32_RESERVED_LEN : 0;
}
public static int calMsgLength(MessageVersion messageVersion,
@@ -79,8 +82,103 @@ public class MessageExtEncoder {
+ 2 + (Math.max(propertiesLength, 0)); //propertiesLength
}
+ public static int calMsgLengthNoProperties(MessageVersion messageVersion,
+ int sysFlag, int bodyLength,
int topicLength) {
+
+ int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0
? 8 : 20;
+ int storehostAddressLength = (sysFlag &
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
+
+ return 4 //TOTALSIZE
+ + 4 //MAGICCODE
+ + 4 //BODYCRC
+ + 4 //QUEUEID
+ + 4 //FLAG
+ + 8 //QUEUEOFFSET
+ + 8 //PHYSICALOFFSET
+ + 4 //SYSFLAG
+ + 8 //BORNTIMESTAMP
+ + bornhostLength //BORNHOST
+ + 8 //STORETIMESTAMP
+ + storehostAddressLength //STOREHOSTADDRESS
+ + 4 //RECONSUMETIMES
+ + 8 //Prepared Transaction Offset
+ + 4 + (Math.max(bodyLength, 0)) //BODY
+ + messageVersion.getTopicLengthSize() + topicLength; //TOPIC
+ }
+
+ public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner
msgInner) {
+
+ final byte[] topicData =
msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+ final int topicLength = topicData.length;
+
+ final int bodyLength = msgInner.getBody() == null ? 0 :
msgInner.getBody().length;
+
+ // Exceeds the maximum message body
+ if (bodyLength > this.maxMessageBodySize) {
+ CommitLog.log.warn("message body size exceeded, msg body size: " +
bodyLength
+ + ", maxMessageSize: " + this.maxMessageBodySize);
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ }
+
+ final int msgLenNoProperties =
calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(),
bodyLength, topicLength);
+
+ // 1 TOTALSIZE
+ this.byteBuf.writeInt(msgLenNoProperties);
+ // 2 MAGICCODE
+ this.byteBuf.writeInt(msgInner.getVersion().getMagicCode());
+ // 3 BODYCRC
+ this.byteBuf.writeInt(msgInner.getBodyCRC());
+ // 4 QUEUEID
+ this.byteBuf.writeInt(msgInner.getQueueId());
+ // 5 FLAG
+ this.byteBuf.writeInt(msgInner.getFlag());
+ // 6 QUEUEOFFSET, need update later
+ this.byteBuf.writeLong(0);
+ // 7 PHYSICALOFFSET, need update later
+ this.byteBuf.writeLong(0);
+ // 8 SYSFLAG
+ this.byteBuf.writeInt(msgInner.getSysFlag());
+ // 9 BORNTIMESTAMP
+ this.byteBuf.writeLong(msgInner.getBornTimestamp());
+
+ // 10 BORNHOST
+ ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
+ this.byteBuf.writeBytes(bornHostBytes.array());
+
+ // 11 STORETIMESTAMP
+ this.byteBuf.writeLong(msgInner.getStoreTimestamp());
+
+ // 12 STOREHOSTADDRESS
+ ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();
+ this.byteBuf.writeBytes(storeHostBytes.array());
+
+ // 13 RECONSUMETIMES
+ this.byteBuf.writeInt(msgInner.getReconsumeTimes());
+ // 14 Prepared Transaction Offset
+ this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
+ // 15 BODY
+ this.byteBuf.writeInt(bodyLength);
+ if (bodyLength > 0)
+ this.byteBuf.writeBytes(msgInner.getBody());
+
+ // 16 TOPIC
+ if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) {
+ this.byteBuf.writeShort((short) topicLength);
+ } else {
+ this.byteBuf.writeByte((byte) topicLength);
+ }
+ this.byteBuf.writeBytes(topicData);
+
+ return null;
+ }
+
public PutMessageResult encode(MessageExtBrokerInner msgInner) {
this.byteBuf.clear();
+
+ if (messageStoreConfig.isEnableMultiDispatch() &&
CommitLog.isMultiDispatchMsg(msgInner)) {
+ return encodeWithoutProperties(msgInner);
+ }
+
/**
* Serialize message
*/
@@ -303,7 +401,7 @@ public class MessageExtEncoder {
}
public ByteBuffer getEncoderBuffer() {
- return this.byteBuf.nioBuffer();
+ return this.byteBuf.nioBuffer(0, this.byteBuf.capacity());
}
public int getMaxMessageBodySize() {
@@ -322,12 +420,8 @@ public class MessageExtEncoder {
private final MessageExtEncoder encoder;
private final StringBuilder keyBuilder;
- PutMessageThreadLocal(int size) {
- this(size, false);
- }
-
- PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
- encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
+ PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) {
+ encoder = new MessageExtEncoder(messageStoreConfig);
keyBuilder = new StringBuilder();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
new file mode 100644
index 0000000000..5bc587a8e0
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+
+/**
+ * MultiDispatch for lmq, not-thread-safe
+ */
+public class MultiDispatch {
+ private final StringBuilder keyBuilder = new StringBuilder();
+ private final DefaultMessageStore messageStore;
+ private static final short VALUE_OF_EACH_INCREMENT = 1;
+
+ public MultiDispatch(DefaultMessageStore messageStore) {
+ this.messageStore = messageStore;
+ }
+
+ public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
+ keyBuilder.delete(0, keyBuilder.length());
+ keyBuilder.append(queueName);
+ keyBuilder.append('-');
+ int queueId = msgInner.getQueueId();
+ if (messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(queueName)) {
+ queueId = 0;
+ }
+ keyBuilder.append(queueId);
+ return keyBuilder.toString();
+ }
+
+ public void wrapMultiDispatch(final MessageExtBrokerInner msg) {
+
+ String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String[] queues =
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ Long[] queueOffsets = new Long[queues.length];
+ if (messageStore.getMessageStoreConfig().isEnableLmq()) {
+ for (int i = 0; i < queues.length; i++) {
+ String key = queueKey(queues[i], msg);
+ if (MixAll.isLmq(key)) {
+ queueOffsets[i] =
messageStore.getQueueStore().getLmqQueueOffset(key);
+ }
+ }
+ }
+ MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
+ StringUtils.join(queueOffsets,
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
+ msg.removeWaitStorePropertyString();
+ }
+
+ public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) {
+ String multiDispatchQueue =
msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String[] queues =
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ for (String queue : queues) {
+ String key = queueKey(queue, msgInner);
+ if (messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(key)) {
+ messageStore.getQueueStore().increaseLmqOffset(key,
VALUE_OF_EACH_INCREMENT);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
index 30054fa509..d76b055773 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
@@ -74,6 +74,16 @@ public abstract class AbstractConsumeQueueStore implements
ConsumeQueueStoreInte
consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg,
messageNum);
}
+ @Override
+ public void increaseLmqOffset(String queueKey, short messageNum) {
+ queueOffsetOperator.increaseLmqOffset(queueKey, messageNum);
+ }
+
+ @Override
+ public long getLmqQueueOffset(String queueKey) {
+ return queueOffsetOperator.getLmqOffset(queueKey);
+ }
+
@Override
public void removeTopicQueueTable(String topic, Integer queueId) {
this.queueOffsetOperator.remove(topic, queueId);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index c65f2a68b0..768c782b1d 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -181,7 +181,6 @@ public interface ConsumeQueueInterface extends
FileQueueLifeCycle {
*/
void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner,
MessageExtBrokerInner msg) throws RocksDBException;
-
/**
* Increase queue offset.
* @param queueOffsetAssigner the delegated queue offset assigner
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
index 268803dcca..e68880a828 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
@@ -183,6 +183,20 @@ public interface ConsumeQueueStoreInterface {
*/
void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum);
+ /**
+ * Increase lmq offset
+ * @param queueKey
+ * @param messageNum
+ */
+ void increaseLmqOffset(String queueKey, short messageNum);
+
+ /**
+ * get lmq queue offset
+ * @param queueKey
+ * @return
+ */
+ long getLmqQueueOffset(String queueKey);
+
/**
* recover topicQueue table by minPhyOffset
* @param minPhyOffset
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
similarity index 78%
rename from
store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
rename to
store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
index d6291d9087..44397a2fce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.store.queue;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -27,7 +25,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-public class MultiDispatch {
+public class MultiDispatchUtils {
public static String lmqQueueKey(String queueName) {
StringBuilder keyBuilder = new StringBuilder();
@@ -60,17 +58,4 @@ public class MultiDispatch {
}
return true;
}
-
- public static List<DispatchRequest>
checkMultiDispatchQueue(MessageStoreConfig messageStoreConfig,
List<DispatchRequest> dispatchRequests) {
- if (!messageStoreConfig.isEnableMultiDispatch() || dispatchRequests ==
null || dispatchRequests.size() == 0) {
- return null;
- }
- List<DispatchRequest> result = new ArrayList<>();
- for (DispatchRequest dispatchRequest : dispatchRequests) {
- if (checkMultiDispatchQueue(messageStoreConfig, dispatchRequest)) {
- result.add(dispatchRequest);
- }
- }
- return dispatchRequests;
- }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
index 8da3748281..5b4bf994e0 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
@@ -71,9 +71,9 @@ public class QueueOffsetOperator {
return this.lmqTopicQueueTable.get(topicQueueKey);
}
- public void increaseLmqOffset(String topicQueueKey, short messageNum) {
- Long lmqOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey,
k -> 0L);
- this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum);
+ public void increaseLmqOffset(String queueKey, short messageNum) {
+ Long lmqOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, queueKey, k ->
0L);
+ this.lmqTopicQueueTable.put(queueKey, lmqOffset + messageNum);
}
public long currentQueueOffset(String topicQueueKey) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 759be395d5..5a981bb4df 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -19,14 +19,10 @@ package org.apache.rocketmq.store.queue;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -217,50 +213,12 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset);
}
msg.setQueueOffset(queueOffset);
-
- // Handling the multi dispatch message. In the context of a light
message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties,
which requires special handling of queue offset of the light message queue.
- if
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
msg.getTopic())) {
- return;
- }
- String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues =
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- Long[] queueOffsets = new Long[queues.length];
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsets[i] =
queueOffsetOperator.getLmqTopicQueueNextOffset(key);
- }
- }
- MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
- StringUtils.join(queueOffsets,
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
- msg.removeWaitStorePropertyString();
}
@Override
public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator,
MessageExtBrokerInner msg, short messageNum) {
String topicQueueKey = getTopic() + "-" + getQueueId();
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
-
- // Handling the multi dispatch message. In the context of a light
message queue (as defined in RIP-28),
- // light message queues are constructed based on message properties,
which requires special handling of queue offset of the light message queue.
- if
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
msg.getTopic())) {
- return;
- }
- String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
- if (StringUtils.isBlank(multiDispatchQueue)) {
- return;
- }
- String[] queues =
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
- for (int i = 0; i < queues.length; i++) {
- if (this.messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(queues[i])) {
- String key = MultiDispatch.lmqQueueKey(queues[i]);
- queueOffsetOperator.increaseLmqOffset(key, (short) 1);
- }
- }
}
@Override
diff --git
a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index 87bfe85da2..3748571496 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -44,7 +44,7 @@ public class AppendCallbackTest {
AppendMessageCallback callback;
- MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
+ MessageExtEncoder batchEncoder;
@Before
public void init() throws Exception {
@@ -53,12 +53,14 @@ public class AppendCallbackTest {
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") +
File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") +
File.separator + "unitteststore" + File.separator + "commitlog");
//too much reference
DefaultMessageStore messageStore = new
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new
ConcurrentHashMap<>());
CommitLog commitLog = new CommitLog(messageStore);
- callback = commitLog.new DefaultAppendMessageCallback();
+ callback = commitLog.new
DefaultAppendMessageCallback(messageStoreConfig);
+ batchEncoder = new MessageExtEncoder(messageStoreConfig);
}
@After
diff --git
a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
index c8ed4d74db..d882fc9d9b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
@@ -56,6 +56,7 @@ public class AppendPropCRCTest {
messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(100 * 10);
+ messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") +
File.separator + "unitteststore");
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") +
File.separator + "unitteststore" + File.separator + "commitlog");
messageStoreConfig.setForceVerifyPropCRC(true);
@@ -63,8 +64,8 @@ public class AppendPropCRCTest {
//too much reference
DefaultMessageStore messageStore = new
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new
ConcurrentHashMap<>());
commitLog = new CommitLog(messageStore);
- encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
- callback = commitLog.new DefaultAppendMessageCallback();
+ encoder = new MessageExtEncoder(messageStoreConfig);
+ callback = commitLog.new
DefaultAppendMessageCallback(messageStoreConfig);
}
@After
diff --git
a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
index 2447bbf68f..eae5eaa07a 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
@@ -28,20 +28,19 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.MultiDispatch;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.rocksdb.RocksDBException;
-import static
org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MultiDispatchTest {
- private ConsumeQueue consumeQueue;
+ private MultiDispatch multiDispatch;
private DefaultMessageStore messageStore;
@@ -61,8 +60,7 @@ public class MultiDispatchTest {
BrokerConfig brokerConfig = new BrokerConfig();
//too much reference
messageStore = new DefaultMessageStore(messageStoreConfig, null, null,
brokerConfig, new ConcurrentHashMap<>());
- consumeQueue = new ConsumeQueue("xxx", 0,
-
getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()),
messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
+ multiDispatch = new MultiDispatch(messageStore);
}
@After
@@ -74,14 +72,14 @@ public class MultiDispatchTest {
public void lmqQueueKey() {
MessageExtBrokerInner messageExtBrokerInner =
mock(MessageExtBrokerInner.class);
when(messageExtBrokerInner.getQueueId()).thenReturn(2);
- String ret = MultiDispatch.lmqQueueKey("%LMQ%lmq123");
+ String ret = MultiDispatchUtils.lmqQueueKey("%LMQ%lmq123");
assertEquals(ret, "%LMQ%lmq123-0");
}
@Test
public void wrapMultiDispatch() throws RocksDBException {
MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue();
- messageStore.assignOffset(messageExtBrokerInner);
+ multiDispatch.wrapMultiDispatch(messageExtBrokerInner);
assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET),
"0,0");
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
index df3c31c6ed..e113b18f1e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
@@ -86,7 +86,7 @@ public class CompactionLogTest {
int compactionCqFileSize = 1024;
- private static MessageExtEncoder encoder = new MessageExtEncoder(1024);
+ private static MessageExtEncoder encoder = new MessageExtEncoder(1024, new
MessageStoreConfig());
private static SocketAddress storeHost;
private static SocketAddress bornHost;