This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 91349f30b9 [ISSUE #7437] Add the CRC check of commitlog (#7468)
91349f30b9 is described below
commit 91349f30b96db2e16b71d65a535d81f11b60bda5
Author: guyinyou <[email protected]>
AuthorDate: Wed Oct 25 14:54:00 2023 +0800
[ISSUE #7437] Add the CRC check of commitlog (#7468)
* Added CRC32 check for full data
* add unit test
* add MessageExtBrokerInnerTest
* fix codestyle
* fix codestyle
---------
Co-authored-by: guyinyou <[email protected]>
---
.../java/org/apache/rocketmq/common/UtilAll.java | 14 ++
.../rocketmq/common/message/MessageConst.java | 2 +
.../rocketmq/common/message/MessageDecoder.java | 32 +++-
.../common/message/MessageExtBrokerInner.java | 49 +++++
.../rocketmq/common/MessageExtBrokerInnerTest.java | 93 ++++++++++
.../java/org/apache/rocketmq/store/CommitLog.java | 87 ++++++++-
.../apache/rocketmq/store/MessageExtEncoder.java | 38 +++-
.../rocketmq/store/config/MessageStoreConfig.java | 23 +++
.../apache/rocketmq/store/AppendPropCRCTest.java | 200 +++++++++++++++++++++
.../apache/rocketmq/store/BatchPutMessageTest.java | 2 +-
.../rocketmq/store/MessageExtBrokerInnerTest.java | 105 +++++++++++
.../store/ha/autoswitch/AutoSwitchHATest.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 2 +-
.../tieredstore/util/MessageBufferUtilTest.java | 2 +-
14 files changed, 630 insertions(+), 21 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index d2b7c374b7..95b6b09b41 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -307,6 +307,20 @@ public class UtilAll {
return (int) (crc32.getValue() & 0x7FFFFFFF);
}
+    /**
+     * Computes the CRC32 of the bytes between the buffer's position and limit.
+     * The buffer is consumed: on return its position equals its limit.
+     *
+     * @param byteBuffer the bytes to checksum
+     * @return the CRC32 value with the sign bit cleared, so it is never negative
+     */
+    public static int crc32(ByteBuffer byteBuffer) {
+        final CRC32 checksum = new CRC32();
+        checksum.update(byteBuffer);
+        final long unsignedValue = checksum.getValue();
+        return (int) (unsignedValue & 0x7FFFFFFF);
+    }
+
+    /**
+     * Computes one CRC32 over the remaining bytes of several buffers, taken in array order,
+     * as if they were a single contiguous byte sequence. Every buffer is consumed.
+     *
+     * @param byteBuffers the buffers to checksum, in order
+     * @return the CRC32 value with the sign bit cleared, so it is never negative
+     */
+    public static int crc32(ByteBuffer[] byteBuffers) {
+        final CRC32 checksum = new CRC32();
+        for (int i = 0; i < byteBuffers.length; i++) {
+            checksum.update(byteBuffers[i]);
+        }
+        final long unsignedValue = checksum.getValue();
+        return (int) (unsignedValue & 0x7FFFFFFF);
+    }
+
public static String bytes2string(byte[] src) {
char[] hexChars = new char[src.length * 2];
for (int j = 0; j < src.length; j++) {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 87fed7c192..24f7bdb99a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -97,6 +97,7 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DEL_UNIQKEY =
"TIMER_DEL_UNIQKEY";
public static final String PROPERTY_TIMER_DELAY_LEVEL =
"TIMER_DELAY_LEVEL";
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
+ public static final String PROPERTY_CRC32 = "__CRC32#";
/**
* properties for DLQ
@@ -155,5 +156,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
+ STRING_HASH_SET.add(PROPERTY_CRC32);
}
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 6de0b69fb7..b053f82759 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common.message;
+import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
@@ -152,6 +153,34 @@ public class MessageDecoder {
return null;
}
+    /**
+     * Writes the CRC32 property entry at the current position of {@code input}:
+     * the property name, the name/value separator, exactly ten decimal digits and
+     * the property separator.
+     * <p>
+     * The digits are emitted least-significant first and padded with '0' at the end,
+     * so a reader must consume them from the tail to rebuild the number.
+     * A value that is not positive is written as ten '0' characters.
+     *
+     * @param input destination buffer; its position advances past the written entry
+     * @param crc32 checksum value to encode
+     */
+    public static void createCrc32(final ByteBuffer input, int crc32) {
+        input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
+        input.put((byte) NAME_VALUE_SEPARATOR);
+        int remaining = crc32;
+        for (int written = 0; written < 10; written++) {
+            byte digit = '0';
+            if (remaining > 0) {
+                digit += (byte) (remaining % 10);
+                remaining /= 10;
+            }
+            input.put(digit);
+        }
+        input.put((byte) PROPERTY_SEPARATOR);
+    }
+
+    /**
+     * Netty {@code ByteBuf} variant: writes the CRC32 property entry at the writer index:
+     * the property name, the name/value separator, exactly ten decimal digits and
+     * the property separator.
+     * <p>
+     * The digits are emitted least-significant first and padded with '0' at the end.
+     * A value that is not positive is written as ten '0' characters.
+     *
+     * @param input destination buffer; its writer index advances past the written entry
+     * @param crc32 checksum value to encode
+     */
+    public static void createCrc32(final ByteBuf input, int crc32) {
+        input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
+        input.writeByte((byte) NAME_VALUE_SEPARATOR);
+        int remaining = crc32;
+        for (int written = 0; written < 10; written++) {
+            byte digit = '0';
+            if (remaining > 0) {
+                digit += (byte) (remaining % 10);
+                remaining /= 10;
+            }
+            input.writeByte(digit);
+        }
+        input.writeByte((byte) PROPERTY_SEPARATOR);
+    }
+
public static MessageExt decode(ByteBuffer byteBuffer) {
return decode(byteBuffer, true, true, false);
}
@@ -601,9 +630,6 @@ public class MessageDecoder {
sb.append(value);
sb.append(PROPERTY_SEPARATOR);
}
- if (sb.length() > 0) {
- sb.deleteCharAt(sb.length() - 1);
- }
return sb.toString();
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
index 91599653c5..4e5d3419a3 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
@@ -20,6 +20,9 @@ import java.nio.ByteBuffer;
import org.apache.rocketmq.common.TopicFilterType;
+import static
org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static
org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
public class MessageExtBrokerInner extends MessageExt {
private static final long serialVersionUID = 7256001576878700634L;
private String propertiesString;
@@ -55,6 +58,52 @@ public class MessageExtBrokerInner extends MessageExt {
this.propertiesString = propertiesString;
}
+
+    /**
+     * Removes the property {@code name} from the property map and strips every
+     * {@code name<NAME_VALUE_SEPARATOR>value<PROPERTY_SEPARATOR>} entry from the cached
+     * properties string.
+     * <p>
+     * An occurrence of {@code name} only counts as an entry when it starts the string or
+     * directly follows a PROPERTY_SEPARATOR, and is directly followed by a
+     * NAME_VALUE_SEPARATOR. If the last matching entry has no trailing PROPERTY_SEPARATOR,
+     * everything from that entry to the end of the string is dropped.
+     *
+     * @param name property key to remove; an empty key leaves the properties string untouched
+     */
+    public void deleteProperty(String name) {
+        super.clearProperty(name);
+        if (propertiesString == null) {
+            return;
+        }
+        // An empty name matches at every index, so the entry scan below would never advance.
+        if (name.isEmpty()) {
+            return;
+        }
+        // Cheap pre-check: if the name does not occur at all there is nothing to crop.
+        if (propertiesString.indexOf(name) == -1) {
+            return;
+        }
+
+        final int total = propertiesString.length();
+        final StringBuilder kept = new StringBuilder(total);
+        int copyFrom = 0;
+        while (true) {
+            final int entryStart = indexOfPropertyEntry(propertiesString, name, copyFrom);
+            if (entryStart == -1) {
+                // No further entry to drop: keep the remaining tail as it is.
+                kept.append(propertiesString, copyFrom, total);
+                break;
+            }
+            // Keep everything up to the entry, then skip past its terminating separator.
+            kept.append(propertiesString, copyFrom, entryStart);
+            final int entryEnd = propertiesString.indexOf(PROPERTY_SEPARATOR, entryStart + name.length() + 1);
+            if (entryEnd == -1) {
+                // The entry runs to the end of the string, so nothing is left to keep.
+                break;
+            }
+            copyFrom = entryEnd + 1;
+        }
+        this.setPropertiesString(kept.toString());
+    }
+
+    /**
+     * Finds the first index at or after {@code from} where {@code name} begins a property
+     * entry, i.e. sits on an entry boundary and is followed by NAME_VALUE_SEPARATOR.
+     * Returns -1 when there is none.
+     */
+    private static int indexOfPropertyEntry(String props, String name, int from) {
+        int hit = props.indexOf(name, from);
+        while (hit != -1) {
+            final int afterName = hit + name.length();
+            final boolean onEntryBoundary = hit == 0 || props.charAt(hit - 1) == PROPERTY_SEPARATOR;
+            if (onEntryBoundary && afterName < props.length() && props.charAt(afterName) == NAME_VALUE_SEPARATOR) {
+                return hit;
+            }
+            hit = props.indexOf(name, afterName);
+        }
+        return -1;
+    }
+
+
public long getTagsCode() {
return tagsCode;
}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
new file mode 100644
index 0000000000..77d69e5ad7
--- /dev/null
+++
b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageExtBrokerInnerTest {
+    /**
+     * Runs deleteProperty("KeyA") against a series of properties strings on one shared
+     * message instance and checks the resulting string for each.
+     */
+    @Test
+    public void testDeleteProperty() {
+        MessageExtBrokerInner inner = new MessageExtBrokerInner();
+
+        // KeyA is the only key: everything is removed.
+        assertKeyADeleted(inner, "", "");
+        assertKeyADeleted(inner, "KeyA\u0001ValueA", "");
+        assertKeyADeleted(inner, "KeyA\u0001ValueA\u0002", "");
+        assertKeyADeleted(inner, "KeyA\u0001ValueA\u0002KeyA\u0001ValueA", "");
+        assertKeyADeleted(inner, "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002", "");
+
+        // KeyA after KeyB.
+        assertKeyADeleted(inner, "KeyB\u0001ValueB\u0002KeyA\u0001ValueA", "KeyB\u0001ValueB\u0002");
+        assertKeyADeleted(inner, "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002", "KeyB\u0001ValueB\u0002");
+        assertKeyADeleted(inner,
+            "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002",
+            "KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
+
+        // KeyA before KeyB.
+        assertKeyADeleted(inner, "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002", "KeyB\u0001ValueB\u0002");
+        assertKeyADeleted(inner, "KeyA\u0001ValueA\u0002KeyB\u0001ValueB", "KeyB\u0001ValueB");
+
+        // "KeyA" embedded inside another value is not on an entry boundary and must survive.
+        assertKeyADeleted(inner,
+            "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002",
+            "KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
+        assertKeyADeleted(inner, "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001", "KeyB\u0001ValueBKeyA\u0001");
+        assertKeyADeleted(inner, "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA", "KeyB\u0001ValueBKeyA");
+    }
+
+    /** Loads {@code before}, deletes "KeyA" and checks that the properties string equals {@code after}. */
+    private static void assertKeyADeleted(MessageExtBrokerInner inner, String before, String after) {
+        inner.setPropertiesString(before);
+        inner.deleteProperty("KeyA");
+        assertThat(inner.getPropertiesString()).isEqualTo(after);
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 93102799b7..3d3ee86b8f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -73,6 +73,10 @@ public class CommitLog implements Swappable {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
public final static int BLANK_MAGIC_CODE = -875286124;
+ /**
+ * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit
fixed-length string + PROPERTY_SEPARATOR]
+ */
+ public static final int CRC32_RESERVED_LEN =
MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
@@ -96,6 +100,8 @@ public class CommitLog implements Swappable {
protected int commitLogSize;
+ private final boolean enabledAppendPropCRC;
+
public CommitLog(final DefaultMessageStore messageStore) {
String storePath =
messageStore.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
@@ -117,7 +123,9 @@ public class CommitLog implements Swappable {
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@Override
protected PutMessageThreadLocal initialValue() {
- return new
PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+ return new PutMessageThreadLocal(
+
defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
+
defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
}
};
this.putMessageLock =
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new
PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -127,6 +135,8 @@ public class CommitLog implements Swappable {
this.topicQueueLock = new
TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
this.commitLogSize =
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
+
+ this.enabledAppendPropCRC =
messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
}
public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -470,10 +480,16 @@ public class CommitLog implements Swappable {
byteBuffer.get(bytesContent, 0, bodyLen);
if (checkCRC) {
- int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
- if (crc != bodyCRC) {
- log.warn("CRC check failed. bodyCRC={},
currentCRC={}", crc, bodyCRC);
- return new DispatchRequest(-1, false/* success */);
+ /**
+ * When the forceVerifyPropCRC = false,
+ * use original bodyCrc validation.
+ */
+ if
(!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
+ int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
+ if (crc != bodyCRC) {
+ log.warn("CRC check failed. bodyCRC={},
currentCRC={}", crc, bodyCRC);
+ return new DispatchRequest(-1, false/* success
*/);
+ }
}
}
} else {
@@ -531,6 +547,43 @@ public class CommitLog implements Swappable {
}
}
+        if (checkCRC) {
+            /*
+             * When forceVerifyPropCRC = true, the CRC is verified over the entire message
+             * (excluding the CRC32_RESERVED_LEN bytes reserved at the tail) and compared
+             * with the value stored in the PROPERTY_CRC32 property.
+             */
+            if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
+                // -1 marks "no CRC property found"; a stored CRC is never negative.
+                int expectedCRC = -1;
+                if (propertiesMap != null) {
+                    String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32);
+                    if (crc32Str != null) {
+                        expectedCRC = 0;
+                        // Read the digits from the tail: the last character is treated as the most significant digit.
+                        for (int i = crc32Str.length() - 1; i >= 0; i--) {
+                            int num = crc32Str.charAt(i) - '0';
+                            expectedCRC *= 10;
+                            expectedCRC += num;
+                        }
+                    }
+                }
+                // 0 is a legitimate checksum, so only negative values mean "missing".
+                if (expectedCRC >= 0) {
+                    ByteBuffer tmpBuffer = byteBuffer.duplicate();
+                    // Rewind to the start of this message and stop before the reserved CRC tail.
+                    tmpBuffer.position(tmpBuffer.position() - totalSize);
+                    tmpBuffer.limit(tmpBuffer.position() + totalSize - CommitLog.CRC32_RESERVED_LEN);
+                    int crc = UtilAll.crc32(tmpBuffer);
+                    if (crc != expectedCRC) {
+                        log.warn(
+                            "CommitLog#checkAndDispatchMessage: failed to check message CRC, expected "
+                                + "CRC={}, actual CRC={}", expectedCRC, crc);
+                        return new DispatchRequest(-1, false/* success */);
+                    }
+                } else {
+                    log.warn(
+                        "CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties");
+                    return new DispatchRequest(-1, false/* success */);
+                }
+            }
+        }
+
+
int readLength = MessageExtEncoder.calMsgLength(messageVersion,
sysFlag, bodyLen, topicLen, propertiesLength);
if (totalSize != readLength) {
doNothingForDeadCode(reconsumeTimes);
@@ -846,9 +899,12 @@ public class CommitLog implements Swappable {
if
(!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(System.currentTimeMillis());
}
-
// Set the message body CRC (consider the most appropriate setting on
the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+ if (enabledAppendPropCRC) {
+ // delete crc32 properties if exist
+ msg.deleteProperty(MessageConst.PROPERTY_CRC32);
+ }
// Back to Results
AppendMessageResult result = null;
@@ -1764,6 +1820,7 @@ public class CommitLog implements Swappable {
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
+ private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
DefaultAppendMessageCallback() {
this.msgStoreItemMemory =
ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
@@ -1837,6 +1894,15 @@ public class CommitLog implements Swappable {
pos += 8 + 4 + 8 + ipLen;
// refresh store time stamp in lock
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
+ if (enabledAppendPropCRC) {
+ // 18 CRC32
+ int checkSize = msgLen - crc32ReservedLength;
+ ByteBuffer tmpBuffer = preEncodeBuffer.duplicate();
+ tmpBuffer.limit(tmpBuffer.position() + checkSize);
+ int crc32 = UtilAll.crc32(tmpBuffer);
+ tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength);
+ MessageDecoder.createCrc32(tmpBuffer, crc32);
+ }
final long beginTimeMills =
CommitLog.this.defaultMessageStore.now();
CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
@@ -1918,6 +1984,15 @@ public class CommitLog implements Swappable {
pos += 8 + 4 + 8 + bornHostLength;
// refresh store time stamp in lock
messagesByteBuff.putLong(pos,
messageExtBatch.getStoreTimestamp());
+ if (enabledAppendPropCRC) {
+ //append crc32
+ int checkSize = msgLen - crc32ReservedLength;
+ ByteBuffer tmpBuffer = messagesByteBuff.duplicate();
+ tmpBuffer.position(msgPos).limit(msgPos + checkSize);
+ int crc32 = UtilAll.crc32(tmpBuffer);
+ messagesByteBuff.position(msgPos + checkSize);
+ MessageDecoder.createCrc32(messagesByteBuff, crc32);
+ }
putMessageContext.getPhyPos()[index++] = wroteOffset +
totalMsgLen - msgLen;
queueOffset++;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index ee609a337b..c1d8087285 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
+import java.nio.ByteBuffer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -29,8 +30,6 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-
public class MessageExtEncoder {
protected static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private ByteBuf byteBuf;
@@ -38,7 +37,13 @@ public class MessageExtEncoder {
private int maxMessageBodySize;
// The maximum length of the full message.
private int maxMessageSize;
+ private final int crc32ReservedLength;
+
public MessageExtEncoder(final int maxMessageBodySize) {
+ this(maxMessageBodySize, false);
+ }
+
+ public MessageExtEncoder(final int maxMessageBodySize, boolean
enabledAppendPropCRC) {
ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
//Reserve 64kb for encoding buffer outside body
int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 *
1024 ?
@@ -46,6 +51,7 @@ public class MessageExtEncoder {
byteBuf = alloc.directBuffer(maxMessageSize);
this.maxMessageBodySize = maxMessageBodySize;
this.maxMessageSize = maxMessageSize;
+ this.crc32ReservedLength = enabledAppendPropCRC ?
CommitLog.CRC32_RESERVED_LEN : 0;
}
public static int calMsgLength(MessageVersion messageVersion,
@@ -81,10 +87,13 @@ public class MessageExtEncoder {
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null :
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
- final int propertiesLength = propertiesData == null ? 0 :
propertiesData.length;
+ boolean needAppendLastPropertySeparator = crc32ReservedLength > 0 &&
propertiesData != null && propertiesData.length > 0
+ && propertiesData[propertiesData.length - 1] !=
MessageDecoder.PROPERTY_SEPARATOR;
+
+ final int propertiesLength = (propertiesData == null ? 0 :
propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) +
crc32ReservedLength;
if (propertiesLength > Short.MAX_VALUE) {
- log.warn("putMessage message properties length too long.
length={}", propertiesData.length);
+ log.warn("putMessage message properties length too long.
length={}", propertiesLength);
return new
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
@@ -160,8 +169,14 @@ public class MessageExtEncoder {
// 17 PROPERTIES
this.byteBuf.writeShort((short) propertiesLength);
- if (propertiesLength > 0)
+ if (propertiesLength > crc32ReservedLength) {
this.byteBuf.writeBytes(propertiesData);
+ }
+ if (needAppendLastPropertySeparator) {
+ this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR);
+ }
+ // 18 CRC32
+ this.byteBuf.writerIndex(this.byteBuf.writerIndex() +
crc32ReservedLength);
return null;
}
@@ -213,10 +228,11 @@ public class MessageExtEncoder {
final byte[] topicData =
messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
- final int topicLengthSize =
messageExtBatch.getVersion().getTopicLengthSize();
int totalPropLen = needAppendLastPropertySeparator ?
- propertiesLen + batchPropLen + topicLengthSize : propertiesLen
+ batchPropLen;
+ propertiesLen + batchPropLen + 1 : propertiesLen +
batchPropLen;
+ // properties need to add crc32
+ totalPropLen += crc32ReservedLength;
final int msgLen = calMsgLength(
messageExtBatch.getVersion(), messageExtBatch.getSysFlag(),
bodyLen, topicLength, totalPropLen);
@@ -278,6 +294,7 @@ public class MessageExtEncoder {
}
this.byteBuf.writeBytes(batchPropData, 0, batchPropLen);
}
+ this.byteBuf.writerIndex(this.byteBuf.writerIndex() +
crc32ReservedLength);
}
putMessageContext.setBatchSize(batchSize);
putMessageContext.setPhyPos(new long[batchSize]);
@@ -304,8 +321,13 @@ public class MessageExtEncoder {
static class PutMessageThreadLocal {
private final MessageExtEncoder encoder;
private final StringBuilder keyBuilder;
+
PutMessageThreadLocal(int size) {
- encoder = new MessageExtEncoder(size);
+ this(size, false);
+ }
+
+ PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
+ encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
keyBuilder = new StringBuilder();
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 028facbdc6..8cb3ea6e9e 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -270,6 +270,12 @@ public class MessageStoreConfig {
*/
private boolean autoMessageVersionOnTopicLen = true;
+ /**
+ * Whether a CRC32 property is appended to every message written to the commit log.
+ * The value is read once when the broker starts; restart the broker for a change to take effect.
+ */
+ private boolean enabledAppendPropCRC = false;
+ private boolean forceVerifyPropCRC = false;
private int travelCqFileNumWhenGetMessage = 1;
// Sleep interval between two corrections
private int correctLogicMinOffsetSleepInterval = 1;
@@ -405,6 +411,14 @@ public class MessageStoreConfig {
private int topicQueueLockNum = 32;
+ /**
+  * @return whether a CRC32 property is appended to each message when it is written to the commit log
+  */
+ public boolean isEnabledAppendPropCRC() {
+ return enabledAppendPropCRC;
+ }
+
+ /**
+  * @param enabledAppendPropCRC true to append a CRC32 property to each stored message;
+  *                             CommitLog reads this once in its constructor
+  */
+ public void setEnabledAppendPropCRC(boolean enabledAppendPropCRC) {
+ this.enabledAppendPropCRC = enabledAppendPropCRC;
+ }
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -640,6 +654,15 @@ public class MessageStoreConfig {
this.checkCRCOnRecover = checkCRCOnRecover;
}
+ /**
+  * @return whether CRC checks verify the whole message against the CRC32 property
+  *         instead of checking only the body CRC
+  */
+ public boolean isForceVerifyPropCRC() {
+ return forceVerifyPropCRC;
+ }
+
+ /**
+  * @param forceVerifyPropCRC true to verify the CRC32 property over the whole message;
+  *                           messages without that property then fail the check
+  */
+ public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) {
+ this.forceVerifyPropCRC = forceVerifyPropCRC;
+ }
+
+
public String getStorePathCommitLog() {
if (storePathCommitLog == null) {
return storePathRootDir + File.separator + "commitlog";
diff --git
a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
new file mode 100644
index 0000000000..c8ed4d74db
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AppendPropCRCTest {
+
+    /** Root of the throw-away store. Shared by init() and destroy() so cleanup removes what setup created. */
+    private static final String STORE_ROOT_DIR =
+        System.getProperty("java.io.tmpdir") + File.separator + "unitteststore";
+
+    AppendMessageCallback callback;
+
+    MessageExtEncoder encoder;
+
+    CommitLog commitLog;
+
+    /** Builds a CommitLog, encoder and append callback with CRC appending and forced CRC verification enabled. */
+    @Before
+    public void init() throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8);
+        messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(100 * 10);
+        messageStoreConfig.setStorePathRootDir(STORE_ROOT_DIR);
+        messageStoreConfig.setStorePathCommitLog(STORE_ROOT_DIR + File.separator + "commitlog");
+        messageStoreConfig.setForceVerifyPropCRC(true);
+        messageStoreConfig.setEnabledAppendPropCRC(true);
+        //too much reference
+        DefaultMessageStore messageStore = new DefaultMessageStore(
+            messageStoreConfig, null, null, new BrokerConfig(), new ConcurrentHashMap<>());
+        commitLog = new CommitLog(messageStore);
+        encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
+        callback = commitLog.new DefaultAppendMessageCallback();
+    }
+
+    /** Deletes the store directory that init() configured. */
+    @After
+    public void destroy() {
+        UtilAll.deleteFile(new File(STORE_ROOT_DIR));
+    }
+
+    /**
+     * Appends the same message several times, checks that untouched messages pass the CRC check,
+     * then corrupts one byte near the end of the last message and expects the check to fail.
+     */
+    @Test
+    public void testAppendMessageSucc() throws Exception {
+        String topic = "test-topic";
+        int queue = 0;
+        int msgNum = 10;
+        int propertiesLen = 0;
+        Message msg = new Message();
+        msg.setBody("body".getBytes());
+        msg.setTopic(topic);
+        msg.setTags("abc");
+        msg.putUserProperty("a", "aaaaaaaa");
+        msg.putUserProperty("b", "bbbbbbbb");
+        msg.putUserProperty("c", "cccccccc");
+        msg.putUserProperty("d", "dddddddd");
+        msg.putUserProperty("e", "eeeeeeee");
+        msg.putUserProperty("f", "ffffffff");
+
+        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
+        messageExtBrokerInner.setTopic(topic);
+        messageExtBrokerInner.setQueueId(queue);
+        messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
+        messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 123));
+        messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
+        messageExtBrokerInner.setBody(msg.getBody());
+        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        propertiesLen = messageExtBrokerInner.getPropertiesString().length();
+
+        ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+        for (int i = 0; i < msgNum; i++) {
+            encoder.encode(messageExtBrokerInner);
+            messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer());
+            AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBrokerInner, null);
+            assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
+        }
+        // Expected to pass when message is not modified
+        buff.flip();
+        for (int i = 0; i < msgNum - 1; i++) {
+            DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+            assertTrue(request.isSuccess());
+        }
+        // Modify the properties of the last message and expect the verification to fail.
+        int idx = buff.limit() - (propertiesLen / 2);
+        buff.put(idx, (byte) (buff.get(idx) + 1));
+        DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+        assertFalse(request.isSuccess());
+    }
+
+    /**
+     * Appends a batch, checks the append result and the decoded messages, then runs the same
+     * "untouched passes, corrupted fails" CRC check as the single-message test.
+     */
+    @Test
+    public void testAppendMessageBatchSucc() throws Exception {
+        List<Message> messages = new ArrayList<>();
+        String topic = "test-topic";
+        int queue = 0;
+        int propertiesLen = 0;
+        for (int i = 0; i < 10; i++) {
+            Message msg = new Message();
+            msg.setBody("body".getBytes());
+            msg.setTopic(topic);
+            msg.setTags("abc");
+            msg.putUserProperty("a", "aaaaaaaa");
+            msg.putUserProperty("b", "bbbbbbbb");
+            msg.putUserProperty("c", "cccccccc");
+            msg.putUserProperty("d", "dddddddd");
+            msg.putUserProperty("e", "eeeeeeee");
+            msg.putUserProperty("f", "ffffffff");
+            String propertiesString = MessageDecoder.messageProperties2String(msg.getProperties());
+            propertiesLen = propertiesString.length();
+            messages.add(msg);
+        }
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setTopic(topic);
+        messageExtBatch.setQueueId(queue);
+        messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+        messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123));
+        messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
+        messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
+
+        PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue);
+        messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, putMessageContext));
+        ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+        // The buffer is large enough for the whole batch, so the append must succeed.
+        AppendMessageResult allresult =
+            callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext);
+
+        assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
+        assertEquals(0, allresult.getWroteOffset());
+        assertEquals(0, allresult.getLogicsOffset());
+        assertEquals(buff.position(), allresult.getWroteBytes());
+
+        assertEquals(messages.size(), allresult.getMsgNum());
+
+        Set<String> msgIds = new HashSet<>();
+        for (String msgId : allresult.getMsgId().split(",")) {
+            assertEquals(32, msgId.length());
+            msgIds.add(msgId);
+        }
+        assertEquals(messages.size(), msgIds.size());
+
+        List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) buff.flip());
+        // Every message that was sent must come back from the decoder.
+        assertEquals(messages.size(), decodeMsgs.size());
+        long queueOffset = decodeMsgs.get(0).getQueueOffset();
+        long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp();
+        for (int i = 0; i < messages.size(); i++) {
+            assertEquals(messages.get(i).getTopic(), decodeMsgs.get(i).getTopic());
+            assertEquals(new String(messages.get(i).getBody()), new String(decodeMsgs.get(i).getBody()));
+            assertEquals(messages.get(i).getTags(), decodeMsgs.get(i).getTags());
+
+            assertEquals(messageExtBatch.getBornHostNameString(), decodeMsgs.get(i).getBornHostNameString());
+
+            assertEquals(messageExtBatch.getBornTimestamp(), decodeMsgs.get(i).getBornTimestamp());
+            assertEquals(storeTimeStamp, decodeMsgs.get(i).getStoreTimestamp());
+            assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset());
+        }
+
+        // Expected to pass when message is not modified
+        buff.flip();
+        for (int i = 0; i < messages.size() - 1; i++) {
+            DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+            assertTrue(request.isSuccess());
+        }
+        // Modify the properties of the last message and expect the verification to fail.
+        int idx = buff.limit() - (propertiesLen / 2);
+        buff.put(idx, (byte) (buff.get(idx) + 1));
+        DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, true, false);
+        assertFalse(request.isSuccess());
+    }
+}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 43ca38eb48..768029ca1a 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -108,7 +108,7 @@ public class BatchPutMessageTest {
short propertiesLength = (short) propertiesBytes.length;
final byte[] topicData =
msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
- msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength,
propertiesLength + batchPropLen + 1) + msgLengthArr[j - 1];
+ msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength,
propertiesLength + batchPropLen) + msgLengthArr[j - 1];
j++;
}
byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
new file mode 100644
index 0000000000..415dc38117
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageExtBrokerInnerTest { // Exercises MessageExtBrokerInner.deleteProperty on hand-built property strings.
+ @Test
+ public void testDeleteProperty() { // In the literals below, U+0001 sits between a key and its value and U+0002 follows a pair.
+ MessageExtBrokerInner messageExtBrokerInner = new
MessageExtBrokerInner();
+ String propertiesString = ""; // Case: an empty property string stays empty.
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+ // Case: the only pair, without a trailing U+0002, is removed and nothing is left.
+ propertiesString = "KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+ // Case: the only pair, with a trailing U+0002, is removed together with that separator.
+ propertiesString = "KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+ // Case: the same key appearing twice; both occurrences are expected to be removed.
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+ // Case: the same key twice, with a trailing U+0002; still nothing is left.
+ propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+ // Case: target key is last and followed by U+0002; the KeyB pair and its separator are kept.
+ propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+ // Case: target key sits between two KeyB pairs; both KeyB pairs are kept.
+ propertiesString =
"KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
+ // Case: target key is first; the following KeyB pair, including its trailing U+0002, is kept.
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+ // Case: target key is first and the remaining KeyB pair has no trailing U+0002.
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");
+ // Case: the text "KeyA" also occurs directly after "ValueB" with no U+0002 before it; that occurrence is expected to survive.
+ propertiesString =
"KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
+ // Case: as above, but the embedded "KeyA" is followed only by U+0001; it is expected to survive.
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");
+ // Case: as above, but the embedded "KeyA" is the very end of the string; it is expected to survive.
+ propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("KeyA");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
+ // Case: key followed by U+0001 and an empty value; the result is expected to be empty.
+ propertiesString = "__CRC32#\u0001";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("__CRC32#");
+ assertThat(messageExtBrokerInner.getPropertiesString()).isEmpty();
+ // Case: bare key text with no U+0001 after it; the string is expected to come back unchanged.
+ propertiesString = "__CRC32#";
+ messageExtBrokerInner.setPropertiesString(propertiesString);
+ messageExtBrokerInner.deleteProperty("__CRC32#");
+
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(propertiesString);
+ }
+
+}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index db5c5af4cd..7d659d2f6a 100644
---
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -465,7 +465,7 @@ public class AutoSwitchHATest {
// Step2: check flag SynchronizingSyncStateSet
Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
- Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570);
+ Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1580);
Set<Long> syncStateSet = masterHAService.getSyncStateSet();
Assert.assertEquals(syncStateSet.size(), 2);
Assert.assertTrue(syncStateSet.contains(1L));
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 2e028ada32..5884243048 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -74,7 +74,7 @@ public class CompositeQueueFlatFileTest {
ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
AppendResult result = flatFile.appendCommitLog(message);
Assert.assertEquals(AppendResult.SUCCESS, result);
- Assert.assertEquals(122L,
flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
+ Assert.assertEquals(123L,
flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
Assert.assertEquals(0L,
flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 1f38d4f6c5..a413f2113e 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -47,7 +47,7 @@ public class MessageBufferUtilTest {
+ 8 //Prepared Transaction Offset
+ 4 + 0 //BODY
+ 2 + 0 //TOPIC
- + 2 + 30 //properties
+ + 2 + 31 //properties
+ 0;
public static ByteBuffer buildMockedMessageBuffer() {