This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e3b8178871 [ISSUE #6570] Fix the issue that expectLogicOffset is 
greater than currentLogicOffset in consumeQueue build when the message is 
illegal  (#6641)
e3b8178871 is described below

commit e3b8178871158e1a981915ac40c11e3a5e451922
Author: rongtong <[email protected]>
AuthorDate: Wed Apr 26 11:24:47 2023 +0800

    [ISSUE #6570] Fix the issue that expectLogicOffset is greater than 
currentLogicOffset in consumeQueue build when the message is illegal  (#6641)
    
    * Fix the issue that expectLogicOffset is greater than currentLogicOffset 
in consumeQueue build when the message is illegal
    
    * Add new UT
    
    * Fix bug that UT can not pass
    
    * Polish the variable name
    
    * Polish the comment
    
    * Add more comments
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 13 ++++-
 .../org/apache/rocketmq/store/ConsumeQueue.java    | 59 ++++++++++++++++------
 .../apache/rocketmq/store/DefaultMessageStore.java | 14 ++++-
 .../org/apache/rocketmq/store/MessageStore.java    | 11 +++-
 .../rocketmq/store/dledger/DLedgerCommitLog.java   |  9 +++-
 .../store/plugin/AbstractPluginMessageStore.java   |  9 +++-
 .../rocketmq/store/queue/BatchConsumeQueue.java    | 29 +++++++----
 .../store/queue/ConsumeQueueInterface.java         | 10 +++-
 .../rocketmq/store/queue/ConsumeQueueStore.java    | 27 ++++++----
 ...ffsetAssigner.java => QueueOffsetOperator.java} | 40 ++++++++-------
 .../rocketmq/store/DefaultMessageStoreTest.java    | 33 ++++++++++++
 .../apache/rocketmq/store/MultiDispatchTest.java   |  2 +-
 .../command/broker/GetBrokerConfigCommand.java     |  2 +-
 .../consumer/GetConsumerConfigSubCommand.java      |  2 +-
 14 files changed, 192 insertions(+), 68 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d7e141d31c..75b4042dc3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -824,7 +824,7 @@ public class CommitLog implements Swappable {
                 needAssignOffset = false;
             }
             if (needAssignOffset) {
-                defaultMessageStore.assignOffset(msg, getMessageNum(msg));
+                defaultMessageStore.assignOffset(msg);
             }
 
             PutMessageResult encodeResult = 
putMessageThreadLocal.getEncoder().encode(msg);
@@ -892,6 +892,10 @@ public class CommitLog implements Swappable {
             } finally {
                 putMessageLock.unlock();
             }
+            // Increase queue offset when messages are successfully written
+            if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
+                this.defaultMessageStore.increaseOffset(msg, 
getMessageNum(msg));
+            }
         } finally {
             topicQueueLock.unlock(topicQueueKey);
         }
@@ -990,7 +994,7 @@ public class CommitLog implements Swappable {
 
         topicQueueLock.lock(topicQueueKey);
         try {
-            defaultMessageStore.assignOffset(messageExtBatch, (short) 
putMessageContext.getBatchSize());
+            defaultMessageStore.assignOffset(messageExtBatch);
 
             putMessageLock.lock();
             try {
@@ -1041,6 +1045,11 @@ public class CommitLog implements Swappable {
             } finally {
                 putMessageLock.unlock();
             }
+
+            // Increase queue offset when messages are successfully written
+            if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
+                this.defaultMessageStore.increaseOffset(messageExtBatch, 
(short) putMessageContext.getBatchSize());
+            }
         } finally {
             topicQueueLock.unlock(topicQueueKey);
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index d1c24ee35f..78d083e2cb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
-import org.apache.rocketmq.store.queue.QueueOffsetAssigner;
+import org.apache.rocketmq.store.queue.QueueOffsetOperator;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 
 public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle 
{
@@ -54,8 +54,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
      * │                                     Store Unit                        
            │
      * │                                                                       
            │
      * </pre>
-     * ConsumeQueue's store unit. Size:
-     * CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
+     * ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body 
Size(4) + Tag HashCode(8) = 20 Bytes
      */
     public static final int CQ_STORE_UNIT_SIZE = 20;
     public static final int MSG_TAG_OFFSET_INDEX = 12;
@@ -785,13 +784,15 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
     }
 
     @Override
-    public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, 
MessageExtBrokerInner msg,
-        short messageNum) {
+    public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, 
MessageExtBrokerInner msg) {
         String topicQueueKey = getTopic() + "-" + getQueueId();
-        long queueOffset = 
queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
+        long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
         msg.setQueueOffset(queueOffset);
-        // For LMQ
-        if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() || 
msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+
+
+        // Handling the multi dispatch message. In the context of a light 
message queue (as defined in RIP-28),
+        // light message queues are constructed based on message properties, 
which requires special handling of queue offset of the light message queue.
+        if (!isNeedHandleMultiDispatch(msg)) {
             return;
         }
         String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
@@ -803,7 +804,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
         for (int i = 0; i < queues.length; i++) {
             String key = queueKey(queues[i], msg);
             if (messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(key)) {
-                queueOffsets[i] = queueOffsetAssigner.assignLmqOffset(key, 
(short) 1);
+                queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
             }
         }
         MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
@@ -811,6 +812,34 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
         removeWaitStorePropertyString(msg);
     }
 
+    @Override
+    public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, 
MessageExtBrokerInner msg,
+        short messageNum) {
+        String topicQueueKey = getTopic() + "-" + getQueueId();
+        queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
+
+        // Handling the multi dispatch message. In the context of a light 
message queue (as defined in RIP-28),
+        // light message queues are constructed based on message properties, 
which requires special handling of queue offset of the light message queue.
+        if (!isNeedHandleMultiDispatch(msg)) {
+            return;
+        }
+        String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+        if (StringUtils.isBlank(multiDispatchQueue)) {
+            return;
+        }
+        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        for (int i = 0; i < queues.length; i++) {
+            String key = queueKey(queues[i], msg);
+            if (messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(key)) {
+                queueOffsetOperator.increaseLmqOffset(key, (short) 1);
+            }
+        }
+    }
+
+    public boolean isNeedHandleMultiDispatch(MessageExtBrokerInner msg) {
+        return messageStore.getMessageStoreConfig().isEnableMultiDispatch() && 
!msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+    }
+
     public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
         StringBuilder keyBuilder = new StringBuilder();
         keyBuilder.append(queueName);
@@ -968,7 +997,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
         private int relativePos = 0;
 
         public ConsumeQueueIterator(SelectMappedBufferResult sbr) {
-            this.sbr =  sbr;
+            this.sbr = sbr;
             if (sbr != null && sbr.getByteBuffer() != null) {
                 relativePos = sbr.getByteBuffer().position();
             }
@@ -988,11 +1017,11 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
             if (!hasNext()) {
                 return null;
             }
-            long queueOffset = (sbr.getStartOffset() + 
sbr.getByteBuffer().position() -  relativePos) / CQ_STORE_UNIT_SIZE;
+            long queueOffset = (sbr.getStartOffset() + 
sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE;
             CqUnit cqUnit = new CqUnit(queueOffset,
-                    sbr.getByteBuffer().getLong(),
-                    sbr.getByteBuffer().getInt(),
-                    sbr.getByteBuffer().getLong());
+                sbr.getByteBuffer().getLong(),
+                sbr.getByteBuffer().getInt(),
+                sbr.getByteBuffer().getLong());
 
             if (isExtAddr(cqUnit.getTagsCode())) {
                 ConsumeQueueExt.CqExtUnit cqExtUnit = new 
ConsumeQueueExt.CqExtUnit();
@@ -1003,7 +1032,7 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
                 } else {
                     // can't find ext content.Client will filter messages by 
tag also.
                     log.error("[BUG] can't find consume queue extend file 
content! addr={}, offsetPy={}, sizePy={}, topic={}",
-                            cqUnit.getTagsCode(), cqUnit.getPos(), 
cqUnit.getPos(), getTopic());
+                        cqUnit.getTagsCode(), cqUnit.getPos(), 
cqUnit.getPos(), getTopic());
                 }
             }
             return cqUnit;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 434aca5430..e1bdc6e711 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2053,11 +2053,21 @@ public class DefaultMessageStore implements 
MessageStore {
     }
 
     @Override
-    public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
+    public void assignOffset(MessageExtBrokerInner msg) {
         final int tranType = 
MessageSysFlag.getTransactionValue(msg.getSysFlag());
 
         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == 
MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
-            this.consumeQueueStore.assignQueueOffset(msg, messageNum);
+            this.consumeQueueStore.assignQueueOffset(msg);
+        }
+    }
+
+
+    @Override
+    public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
+        final int tranType = 
MessageSysFlag.getTransactionValue(msg.getSysFlag());
+
+        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == 
MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+            this.consumeQueueStore.increaseQueueOffset(msg, messageNum);
         }
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index a7da245551..3db0c18f7f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -725,13 +725,20 @@ public interface MessageStore {
     boolean isSyncMaster();
 
     /**
-     * Assign an queue offset and increase it. If there is a race condition, 
you need to lock/unlock this method
+     * Assign a queue offset to the message. If there is a race condition, you 
need to lock/unlock this method
      * yourself.
      *
      * @param msg        message
+     */
+    void assignOffset(MessageExtBrokerInner msg);
+
+    /**
+     * Increase queue offset in memory table. If there is a race condition, 
you need to lock/unlock this method yourself.
+     *
+     * @param msg        message
      * @param messageNum message num
      */
-    void assignOffset(MessageExtBrokerInner msg, short messageNum);
+    void increaseOffset(MessageExtBrokerInner msg, short messageNum);
 
     /**
      * Get master broker message store in process in broker container
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 39906eae09..ec5e86d704 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -435,7 +435,7 @@ public class DLedgerCommitLog extends CommitLog {
         String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId();
         topicQueueLock.lock(topicQueueKey);
         try {
-            defaultMessageStore.assignOffset(msg, getMessageNum(msg));
+            defaultMessageStore.assignOffset(msg);
 
             encodeResult = this.messageSerializer.serialize(msg);
             if (encodeResult.status != AppendMessageStatus.PUT_OK) {
@@ -475,6 +475,8 @@ public class DLedgerCommitLog extends CommitLog {
             if (elapsedTimeInLock > 500) {
                 log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, 
appendResult);
             }
+
+            defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
         } finally {
             topicQueueLock.unlock(topicQueueKey);
         }
@@ -556,7 +558,7 @@ public class DLedgerCommitLog extends CommitLog {
         int batchNum = encodeResult.batchData.size();
         topicQueueLock.lock(encodeResult.queueOffsetKey);
         try {
-            defaultMessageStore.assignOffset(messageExtBatch, (short) 
batchNum);
+            defaultMessageStore.assignOffset(messageExtBatch);
 
             putMessageLock.lock(); //spin or ReentrantLock ,depending on store 
config
             msgIdBuilder.setLength(0);
@@ -616,6 +618,9 @@ public class DLedgerCommitLog extends CommitLog {
                 log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, 
bodyLength={} AppendMessageResult={}",
                     elapsedTimeInLock, messageExtBatch.getBody().length, 
appendResult);
             }
+
+            defaultMessageStore.increaseOffset(messageExtBatch, (short) 
batchNum);
+
         } finally {
             topicQueueLock.unlock(encodeResult.queueOffsetKey);
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 89c3e53b6b..25e947512f 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -588,8 +588,13 @@ public abstract class AbstractPluginMessageStore 
implements MessageStore {
     }
 
     @Override
-    public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
-        next.assignOffset(msg, messageNum);
+    public void assignOffset(MessageExtBrokerInner msg) {
+        next.assignOffset(msg);
+    }
+
+    @Override
+    public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
+        next.increaseOffset(msg, messageNum);
     }
 
     @Override
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index ba9b22ae8b..8fec1bf7b0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -353,7 +353,7 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
     @Override
     public void truncateDirtyLogicFiles(long phyOffset) {
 
-        long oldMinOffset =  minOffsetInQueue;
+        long oldMinOffset = minOffsetInQueue;
         long oldMaxOffset = maxOffsetInQueue;
 
         int logicFileSize = this.mappedFileSize;
@@ -515,10 +515,10 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
     }
 
     @Override
-    public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, 
MessageExtBrokerInner msg, short messageNum) {
+    public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator, 
MessageExtBrokerInner msg) {
         String topicQueueKey = getTopic() + "-" + getQueueId();
 
-        long queueOffset = 
queueOffsetAssigner.assignBatchQueueOffset(topicQueueKey, messageNum);
+        long queueOffset = 
queueOffsetOperator.getBatchQueueOffset(topicQueueKey);
 
         if (MessageSysFlag.check(msg.getSysFlag(), 
MessageSysFlag.INNER_BATCH_FLAG)) {
             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, 
String.valueOf(queueOffset));
@@ -527,7 +527,15 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
         msg.setQueueOffset(queueOffset);
     }
 
-    public boolean putBatchMessagePositionInfo(final long offset, final int 
size, final long tagsCode, final long storeTime,
+    @Override
+    public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, 
MessageExtBrokerInner msg,
+        short messageNum) {
+        String topicQueueKey = getTopic() + "-" + getQueueId();
+        queueOffsetOperator.increaseBatchQueueOffset(topicQueueKey, 
messageNum);
+    }
+
+    public boolean putBatchMessagePositionInfo(final long offset, final int 
size, final long tagsCode,
+        final long storeTime,
         final long msgBaseOffset, final short batchSize) {
 
         if (offset <= this.maxMsgPhyOffsetInCommitLog) {
@@ -696,6 +704,7 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
 
     /**
      * Find the message whose timestamp is the smallest, greater than or equal 
to the given time.
+     *
      * @param timestamp
      * @return
      */
@@ -794,8 +803,8 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
                 }
             } else {
                 //The max timestamp of this file is smaller than the given 
timestamp, so double check the previous file
-                if (i + 1 <=  mappedFileNum - 1) {
-                    mappedFile =  mappedFileQueue.getMappedFiles().get(i + 1);
+                if (i + 1 <= mappedFileNum - 1) {
+                    mappedFile = mappedFileQueue.getMappedFiles().get(i + 1);
                     targetBcq = mappedFile;
                     break;
                 } else {
@@ -812,7 +821,8 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
      * Find the offset of which the value is equal or larger than the given 
targetValue.
      * If there are many values equal to the target, then find the earliest 
one.
      */
-    public static int binarySearchRight(ByteBuffer byteBuffer, int left, int 
right, final int unitSize, final int unitShift,
+    public static int binarySearchRight(ByteBuffer byteBuffer, int left, int 
right, final int unitSize,
+        final int unitShift,
         long targetValue) {
         int mid = -1;
         while (left <= right) {
@@ -830,7 +840,7 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
                 if (tmpValue >= targetValue) {
                     return mid;
                 } else {
-                    left =  mid + unitSize;
+                    left = mid + unitSize;
                 }
             } else {
                 //mid is actually in the mid
@@ -846,7 +856,7 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
 
     /**
      * Here is vulnerable, the min value of the bytebuffer must be smaller or 
equal then the given value.
-     * Otherwise it may get -1
+     * Otherwise, it may get -1
      */
     protected int binarySearch(ByteBuffer byteBuffer, int left, int right, 
final int unitSize, final int unitShift,
         long targetValue) {
@@ -989,6 +999,7 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
 
     /**
      * Batch msg offset (deep logic offset)
+     *
      * @return max deep offset
      */
     @Override
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index 7931dc45a9..d7213fa37a 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -143,9 +143,17 @@ public interface ConsumeQueueInterface extends 
FileQueueLifeCycle {
      * Assign queue offset.
      * @param queueOffsetAssigner the delegated queue offset assigner
      * @param msg message itself
+     */
+    void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, 
MessageExtBrokerInner msg);
+
+
+    /**
+     * Increase queue offset.
+     * @param queueOffsetAssigner the delegated queue offset assigner
+     * @param msg message itself
      * @param messageNum message number
      */
-    void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, 
MessageExtBrokerInner msg, short messageNum);
+    void increaseQueueOffset(QueueOffsetOperator queueOffsetAssigner, 
MessageExtBrokerInner msg, short messageNum);
 
     /**
      * Estimate number of records matching given filter.
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 7d7878f129..8d38503b37 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -61,7 +61,7 @@ public class ConsumeQueueStore {
 
     protected final DefaultMessageStore messageStore;
     protected final MessageStoreConfig messageStoreConfig;
-    protected final QueueOffsetAssigner queueOffsetAssigner = new 
QueueOffsetAssigner();
+    protected final QueueOffsetOperator queueOffsetOperator = new 
QueueOffsetOperator();
     protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* 
queueId */, ConsumeQueueInterface>> consumeQueueTable;
 
     public ConsumeQueueStore(DefaultMessageStore messageStore, 
MessageStoreConfig messageStoreConfig) {
@@ -87,7 +87,7 @@ public class ConsumeQueueStore {
      * Apply the dispatched request and build the consume queue. This function 
should be idempotent.
      *
      * @param consumeQueue consume queue
-     * @param request      dispatch request
+     * @param request dispatch request
      */
     public void putMessagePositionInfoWrapper(ConsumeQueueInterface 
consumeQueue, DispatchRequest request) {
         consumeQueue.putMessagePositionInfoWrapper(request);
@@ -362,34 +362,39 @@ public class ConsumeQueueStore {
     }
 
     public Long getMaxOffset(String topic, int queueId) {
-        return this.queueOffsetAssigner.currentQueueOffset(topic + "-" + 
queueId);
+        return this.queueOffsetOperator.currentQueueOffset(topic + "-" + 
queueId);
     }
 
     public void setTopicQueueTable(ConcurrentMap<String, Long> 
topicQueueTable) {
-        this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable);
-        this.queueOffsetAssigner.setLmqTopicQueueTable(topicQueueTable);
+        this.queueOffsetOperator.setTopicQueueTable(topicQueueTable);
+        this.queueOffsetOperator.setLmqTopicQueueTable(topicQueueTable);
     }
 
     public ConcurrentMap getTopicQueueTable() {
-        return this.queueOffsetAssigner.getTopicQueueTable();
+        return this.queueOffsetOperator.getTopicQueueTable();
     }
 
     public void setBatchTopicQueueTable(ConcurrentMap<String, Long> 
batchTopicQueueTable) {
-        this.queueOffsetAssigner.setBatchTopicQueueTable(batchTopicQueueTable);
+        this.queueOffsetOperator.setBatchTopicQueueTable(batchTopicQueueTable);
     }
 
-    public void assignQueueOffset(MessageExtBrokerInner msg, short messageNum) 
{
+    public void assignQueueOffset(MessageExtBrokerInner msg) {
         ConsumeQueueInterface consumeQueue = 
findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());
-        consumeQueue.assignQueueOffset(this.queueOffsetAssigner, msg, 
messageNum);
+        consumeQueue.assignQueueOffset(this.queueOffsetOperator, msg);
+    }
+
+    public void increaseQueueOffset(MessageExtBrokerInner msg, short 
messageNum) {
+        ConsumeQueueInterface consumeQueue = 
findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());
+        consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, 
messageNum);
     }
 
     public void updateQueueOffset(String topic, int queueId, long offset) {
         String topicQueueKey = topic + "-" + queueId;
-        this.queueOffsetAssigner.updateQueueOffset(topicQueueKey, offset);
+        this.queueOffsetOperator.updateQueueOffset(topicQueueKey, offset);
     }
 
     public void removeTopicQueueTable(String topic, Integer queueId) {
-        this.queueOffsetAssigner.remove(topic, queueId);
+        this.queueOffsetOperator.remove(topic, queueId);
     }
 
     public ConcurrentMap<String, ConcurrentMap<Integer, 
ConsumeQueueInterface>> getConsumeQueueTable() {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
similarity index 69%
rename from 
store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
rename to 
store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
index fe8586f6dd..2545bbf523 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
@@ -28,47 +28,49 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 /**
- * QueueOffsetAssigner is a component for assigning offsets for queues.
+ * QueueOffsetOperator is a component for operating offsets for queues.
  */
-public class QueueOffsetAssigner {
+public class QueueOffsetOperator {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
     private ConcurrentMap<String, Long> topicQueueTable = new 
ConcurrentHashMap<>(1024);
     private ConcurrentMap<String, Long> batchTopicQueueTable = new 
ConcurrentHashMap<>(1024);
     private ConcurrentMap<String/* topic-queueid */, Long/* offset */> 
lmqTopicQueueTable = new ConcurrentHashMap<>(1024);
 
-    public long assignQueueOffset(String topicQueueKey, short messageNum) {
+    public long getQueueOffset(String topicQueueKey) {
+        return ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, 
topicQueueKey, k -> 0L);
+    }
+
+    public void increaseQueueOffset(String topicQueueKey, short messageNum) {
         Long queueOffset = 
ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k 
-> 0L);
-        this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
-        return queueOffset;
+        topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
     }
 
     public void updateQueueOffset(String topicQueueKey, long offset) {
         this.topicQueueTable.put(topicQueueKey, offset);
     }
 
-    public long assignBatchQueueOffset(String topicQueueKey, short messageNum) 
{
-        Long topicOffset = 
ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, 
topicQueueKey, k -> 0L);
-        this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
-        return topicOffset;
+    public long getBatchQueueOffset(String topicQueueKey) {
+        return 
ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, 
topicQueueKey, k -> 0L);
     }
 
-    public long assignLmqOffset(String topicQueueKey, short messageNum) {
-        Long topicOffset = 
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, 
k -> 0L);
-        this.lmqTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
-        return topicOffset;
+    public void increaseBatchQueueOffset(String topicQueueKey, short 
messageNum) {
+        Long batchQueueOffset = 
ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, 
topicQueueKey, k -> 0L);
+        this.batchTopicQueueTable.put(topicQueueKey, batchQueueOffset + 
messageNum);
     }
 
-    public long currentQueueOffset(String topicQueueKey) {
-        return this.topicQueueTable.get(topicQueueKey);
+    public long getLmqOffset(String topicQueueKey) {
+        return ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, 
topicQueueKey, k -> 0L);
     }
 
-    public long currentBatchQueueOffset(String topicQueueKey) {
-        return this.batchTopicQueueTable.get(topicQueueKey);
+    public void increaseLmqOffset(String topicQueueKey, short messageNum) {
+        Long lmqOffset = 
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, 
k -> 0L);
+        this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum);
     }
 
-    public long currentLmqOffset(String topicQueueKey) {
-        return this.lmqTopicQueueTable.get(topicQueueKey);
+    public long currentQueueOffset(String topicQueueKey) {
+        Long currentQueueOffset = this.topicQueueTable.get(topicQueueKey);
+        return currentQueueOffset == null ? 0L : currentQueueOffset;
     }
 
     public synchronized void remove(String topic, Integer queueId) {
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java 
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 2f22de4d11..151bfa8f04 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -29,6 +29,7 @@ import java.net.UnknownHostException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -63,6 +64,7 @@ import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -369,6 +371,17 @@ public class DefaultMessageStoreTest {
         assertThat(storeTime).isEqualTo(-1);
     }
 
+    @Test
+    public void testPutMessage_whenMessagePropertyIsTooLong() {
+        String topicName = "messagePropertyIsTooLongTest";
+        MessageExtBrokerInner illegalMessage = buildSpecifyLengthPropertyMessage("123".getBytes(StandardCharsets.UTF_8), topicName, Short.MAX_VALUE + 1);
+        assertEquals(messageStore.putMessage(illegalMessage).getPutMessageStatus(), PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+        assertEquals(0L, messageStore.getQueueStore().getMaxOffset(topicName, 0).longValue());
+        MessageExtBrokerInner normalMessage = buildSpecifyLengthPropertyMessage("123".getBytes(StandardCharsets.UTF_8), topicName, 100);
+        assertEquals(messageStore.putMessage(normalMessage).getPutMessageStatus(), PutMessageStatus.PUT_OK);
+        assertEquals(1L, messageStore.getQueueStore().getMaxOffset(topicName, 0).longValue());
+    }
+
     private DefaultMessageStore getDefaultMessageStore() {
         return (DefaultMessageStore) this.messageStore;
     }
@@ -437,6 +450,26 @@ public class DefaultMessageStoreTest {
         return msg;
     }
 
+    private MessageExtBrokerInner buildSpecifyLengthPropertyMessage(byte[] messageBody, String topic, int length) {
+        StringBuilder stringBuilder = new StringBuilder();
+        Random random = new Random();
+        for (int i = 0; i < length; i++) {
+            stringBuilder.append(random.nextInt(10));
+        }
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.putUserProperty("test", stringBuilder.toString());
+        msg.setTopic(topic);
+        msg.setTags("TAG1");
+        msg.setKeys("Hello");
+        msg.setBody(messageBody);
+        msg.setQueueId(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(storeHost);
+        msg.setBornHost(bornHost);
+        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        return msg;
+    }
+
     private MessageExtBrokerInner buildIPv6HostMessage(byte[] messageBody, String topic) {
         MessageExtBrokerInner msg = new MessageExtBrokerInner();
         msg.setTopic(topic);
diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
index 3ae4b2be56..85626a332e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
@@ -79,7 +79,7 @@ public class MultiDispatchTest {
     @Test
     public void wrapMultiDispatch() {
         MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue();
-        messageStore.assignOffset(messageExtBrokerInner, (short) 1);
+        messageStore.assignOffset(messageExtBrokerInner);
         assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET), "0,0");
     }
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
index b9cfdf9b65..5d86c10e45 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -45,7 +45,7 @@ public class GetBrokerConfigCommand implements SubCommand {
 
     @Override
     public String commandDesc() {
-        return "Get broker config by cluster or special broker!";
+        return "Get broker config by cluster or special broker";
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
index 3392ae1fb0..6095e76685 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
@@ -43,7 +43,7 @@ public class GetConsumerConfigSubCommand implements SubCommand {
 
     @Override
     public String commandDesc() {
-        return "Get consumer config by subscription group name!";
+        return "Get consumer config by subscription group name";
     }
 
     @Override


Reply via email to