This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 1b7ca30a51e65e3d2c4d3b0ef8e64788788d6dca Author: gosonzhang <[email protected]> AuthorDate: Tue Jan 12 14:30:06 2021 +0800 [TUBEMQ-509] Adjust the packet length check when data is loaded --- .../apache/tubemq/server/broker/metadata/ClusterConfigHolder.java | 8 -------- .../apache/tubemq/server/broker/msgstore/disk/FileSegment.java | 5 ++--- .../apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java | 3 +-- .../org/apache/tubemq/server/broker/utils/DataStoreUtils.java | 7 +++---- 4 files changed, 6 insertions(+), 17 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java index c72427d..134e2ef 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/metadata/ClusterConfigHolder.java @@ -33,10 +33,6 @@ public class ClusterConfigHolder { + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE); private static AtomicInteger minMemCacheSize = new AtomicInteger(TBaseConstants.META_MIN_MEM_BUFFER_SIZE); - private static AtomicInteger maxMsgStoreLength = - new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE - + TBaseConstants.META_MAX_MESSAGE_HEADER_SIZE - + DataStoreUtils.STORE_DATA_HEADER_LEN); public ClusterConfigHolder() { @@ -58,7 +54,6 @@ public class ClusterConfigHolder { maxMsgSize.set(tmpMaxSize); minMemCacheSize.set(tmpMaxSize + (tmpMaxSize % 4 + 1) * TBaseConstants.META_MESSAGE_SIZE_ADJUST); - maxMsgStoreLength.set(tmpMaxSize + DataStoreUtils.STORE_DATA_HEADER_LEN); } } } @@ -76,7 +71,4 @@ public class ClusterConfigHolder { return minMemCacheSize.get(); } - public static int getMaxMsgStoreLength() { - return maxMsgStoreLength.get(); - } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java index 6fda352..949d149 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/FileSegment.java @@ -25,7 +25,6 @@ import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.tubemq.corebase.utils.CheckSum; -import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.utils.DataStoreUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -366,7 +365,7 @@ public class FileSegment implements Segment { itemNext = validBytes + DataStoreUtils.STORE_DATA_HEADER_LEN + itemMsglen; if ((itemMsgToken != DataStoreUtils.STORE_DATA_TOKER_BEGIN_VALUE) || (itemMsglen <= 0) - || (itemMsglen > ClusterConfigHolder.getMaxMsgSize()) + || (itemMsglen > DataStoreUtils.MAX_MSG_DATA_STORE_SIZE) || (itemNext > totalBytes)) { next = -1; break; @@ -438,7 +437,7 @@ public class FileSegment implements Segment { if ((itemMsgPartId < 0) || (itemMsgOffset < 0) || (itemMsglen <= 0) - || (itemMsglen > ClusterConfigHolder.getMaxMsgStoreLength()) + || (itemMsglen > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) || (itemNext > totalBytes)) { next = -1; break; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java index fe23dd3..400b666 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java @@ -35,7 +35,6 @@ import org.apache.tubemq.corebase.TErrCodeConstants; import org.apache.tubemq.corebase.protobuf.generated.ClientBroker; import org.apache.tubemq.corebase.utils.ServiceStatusHolder; import org.apache.tubemq.server.broker.BrokerConfig; -import org.apache.tubemq.server.broker.metadata.ClusterConfigHolder; import org.apache.tubemq.server.broker.msgstore.MessageStore; import org.apache.tubemq.server.broker.stats.CountItem; import org.apache.tubemq.server.broker.utils.DataStoreUtils; @@ -274,7 +273,7 @@ public class MsgFileStore implements Closeable { // skip when mismatch condition if (curIndexDataOffset < 0 || curIndexDataSize <= 0 - || curIndexDataSize > ClusterConfigHolder.getMaxMsgStoreLength() + || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN || curIndexDataOffset < curDataMinOffset) { readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN; continue; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java index 6f0fa13..6d79bf4 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/DataStoreUtils.java @@ -49,9 +49,6 @@ public class DataStoreUtils { // + data 0 // public static final int MAX_MSG_TRANSFER_SIZE = 1024 * 1024; - public static final int MAX_MSG_DATA_STORE_SIZE = - TBaseConstants.META_MAX_MESSAGE_DATA_SIZE * 2; - public static final int MAX_READ_BUFFER_ADJUST = MAX_MSG_DATA_STORE_SIZE * 10; public static final int STORE_DATA_PREFX_LEN = 48; public static final int STORE_DATA_HEADER_LEN = STORE_DATA_PREFX_LEN + 4; @@ -85,7 +82,9 @@ public class DataStoreUtils { public static final int INDEX_POS_KEY_CODE = 16; public static final int INDEX_POS_TIME_RECV = 20; - + public static final int MAX_MSG_DATA_STORE_SIZE = + TBaseConstants.META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT + + TBaseConstants.META_MB_UNIT_SIZE * 8; public static final int STORE_MAX_MESSAGE_STORE_LEN = STORE_DATA_HEADER_LEN + MAX_MSG_DATA_STORE_SIZE;
