This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e3b8178871 [ISSUE #6570] Fix the issue that expectLogicOffset is
greater than currentLogicOffset in consumeQueue build when the message is
illegal (#6641)
e3b8178871 is described below
commit e3b8178871158e1a981915ac40c11e3a5e451922
Author: rongtong <[email protected]>
AuthorDate: Wed Apr 26 11:24:47 2023 +0800
[ISSUE #6570] Fix the issue that expectLogicOffset is greater than
currentLogicOffset in consumeQueue build when the message is illegal (#6641)
* Fix the issue that expectLogicOffset is greater than currentLogicOffset
in consumeQueue build when the message is illegal
* Add new UT
* Fix bug that UT can not pass
* Polish the variable name
* Polish the comment
* Add more comments
---
.../java/org/apache/rocketmq/store/CommitLog.java | 13 ++++-
.../org/apache/rocketmq/store/ConsumeQueue.java | 59 ++++++++++++++++------
.../apache/rocketmq/store/DefaultMessageStore.java | 14 ++++-
.../org/apache/rocketmq/store/MessageStore.java | 11 +++-
.../rocketmq/store/dledger/DLedgerCommitLog.java | 9 +++-
.../store/plugin/AbstractPluginMessageStore.java | 9 +++-
.../rocketmq/store/queue/BatchConsumeQueue.java | 29 +++++++----
.../store/queue/ConsumeQueueInterface.java | 10 +++-
.../rocketmq/store/queue/ConsumeQueueStore.java | 27 ++++++----
...ffsetAssigner.java => QueueOffsetOperator.java} | 40 ++++++++-------
.../rocketmq/store/DefaultMessageStoreTest.java | 33 ++++++++++++
.../apache/rocketmq/store/MultiDispatchTest.java | 2 +-
.../command/broker/GetBrokerConfigCommand.java | 2 +-
.../consumer/GetConsumerConfigSubCommand.java | 2 +-
14 files changed, 192 insertions(+), 68 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d7e141d31c..75b4042dc3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -824,7 +824,7 @@ public class CommitLog implements Swappable {
needAssignOffset = false;
}
if (needAssignOffset) {
- defaultMessageStore.assignOffset(msg, getMessageNum(msg));
+ defaultMessageStore.assignOffset(msg);
}
PutMessageResult encodeResult =
putMessageThreadLocal.getEncoder().encode(msg);
@@ -892,6 +892,10 @@ public class CommitLog implements Swappable {
} finally {
putMessageLock.unlock();
}
+ // Increase queue offset when messages are successfully written
+ if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
+ this.defaultMessageStore.increaseOffset(msg,
getMessageNum(msg));
+ }
} finally {
topicQueueLock.unlock(topicQueueKey);
}
@@ -990,7 +994,7 @@ public class CommitLog implements Swappable {
topicQueueLock.lock(topicQueueKey);
try {
- defaultMessageStore.assignOffset(messageExtBatch, (short)
putMessageContext.getBatchSize());
+ defaultMessageStore.assignOffset(messageExtBatch);
putMessageLock.lock();
try {
@@ -1041,6 +1045,11 @@ public class CommitLog implements Swappable {
} finally {
putMessageLock.unlock();
}
+
+ // Increase queue offset when messages are successfully written
+ if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
+ this.defaultMessageStore.increaseOffset(messageExtBatch,
(short) putMessageContext.getBatchSize());
+ }
} finally {
topicQueueLock.unlock(topicQueueKey);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index d1c24ee35f..78d083e2cb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
-import org.apache.rocketmq.store.queue.QueueOffsetAssigner;
+import org.apache.rocketmq.store.queue.QueueOffsetOperator;
import org.apache.rocketmq.store.queue.ReferredIterator;
public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle
{
@@ -54,8 +54,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
* │ Store Unit
│
* │
│
* </pre>
- * ConsumeQueue's store unit. Size:
- * CommitLog Physical Offset(8) + Body Size(4) + Tag HashCode(8) = 20 Bytes
+ * ConsumeQueue's store unit. Size: CommitLog Physical Offset(8) + Body
Size(4) + Tag HashCode(8) = 20 Bytes
*/
public static final int CQ_STORE_UNIT_SIZE = 20;
public static final int MSG_TAG_OFFSET_INDEX = 12;
@@ -785,13 +784,15 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
}
@Override
- public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner,
MessageExtBrokerInner msg,
- short messageNum) {
+ public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator,
MessageExtBrokerInner msg) {
String topicQueueKey = getTopic() + "-" + getQueueId();
- long queueOffset =
queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
+ long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
msg.setQueueOffset(queueOffset);
- // For LMQ
- if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch() ||
msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+
+
+ // Handling the multi dispatch message. In the context of a light
message queue (as defined in RIP-28),
+ // light message queues are constructed based on message properties,
which requires special handling of queue offset of the light message queue.
+ if (!isNeedHandleMultiDispatch(msg)) {
return;
}
String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
@@ -803,7 +804,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
for (int i = 0; i < queues.length; i++) {
String key = queueKey(queues[i], msg);
if (messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(key)) {
- queueOffsets[i] = queueOffsetAssigner.assignLmqOffset(key,
(short) 1);
+ queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
}
}
MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
@@ -811,6 +812,34 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
removeWaitStorePropertyString(msg);
}
+ @Override
+ public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator,
MessageExtBrokerInner msg,
+ short messageNum) {
+ String topicQueueKey = getTopic() + "-" + getQueueId();
+ queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
+
+ // Handling the multi dispatch message. In the context of a light
message queue (as defined in RIP-28),
+ // light message queues are constructed based on message properties,
which requires special handling of queue offset of the light message queue.
+ if (!isNeedHandleMultiDispatch(msg)) {
+ return;
+ }
+ String multiDispatchQueue =
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ if (StringUtils.isBlank(multiDispatchQueue)) {
+ return;
+ }
+ String[] queues =
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ for (int i = 0; i < queues.length; i++) {
+ String key = queueKey(queues[i], msg);
+ if (messageStore.getMessageStoreConfig().isEnableLmq() &&
MixAll.isLmq(key)) {
+ queueOffsetOperator.increaseLmqOffset(key, (short) 1);
+ }
+ }
+ }
+
+ public boolean isNeedHandleMultiDispatch(MessageExtBrokerInner msg) {
+ return messageStore.getMessageStoreConfig().isEnableMultiDispatch() &&
!msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+ }
+
public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
StringBuilder keyBuilder = new StringBuilder();
keyBuilder.append(queueName);
@@ -968,7 +997,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
private int relativePos = 0;
public ConsumeQueueIterator(SelectMappedBufferResult sbr) {
- this.sbr = sbr;
+ this.sbr = sbr;
if (sbr != null && sbr.getByteBuffer() != null) {
relativePos = sbr.getByteBuffer().position();
}
@@ -988,11 +1017,11 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
if (!hasNext()) {
return null;
}
- long queueOffset = (sbr.getStartOffset() +
sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE;
+ long queueOffset = (sbr.getStartOffset() +
sbr.getByteBuffer().position() - relativePos) / CQ_STORE_UNIT_SIZE;
CqUnit cqUnit = new CqUnit(queueOffset,
- sbr.getByteBuffer().getLong(),
- sbr.getByteBuffer().getInt(),
- sbr.getByteBuffer().getLong());
+ sbr.getByteBuffer().getLong(),
+ sbr.getByteBuffer().getInt(),
+ sbr.getByteBuffer().getLong());
if (isExtAddr(cqUnit.getTagsCode())) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new
ConsumeQueueExt.CqExtUnit();
@@ -1003,7 +1032,7 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
} else {
// can't find ext content.Client will filter messages by
tag also.
log.error("[BUG] can't find consume queue extend file
content! addr={}, offsetPy={}, sizePy={}, topic={}",
- cqUnit.getTagsCode(), cqUnit.getPos(),
cqUnit.getPos(), getTopic());
+ cqUnit.getTagsCode(), cqUnit.getPos(),
cqUnit.getPos(), getTopic());
}
}
return cqUnit;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 434aca5430..e1bdc6e711 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2053,11 +2053,21 @@ public class DefaultMessageStore implements
MessageStore {
}
@Override
- public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
+ public void assignOffset(MessageExtBrokerInner msg) {
final int tranType =
MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType ==
MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- this.consumeQueueStore.assignQueueOffset(msg, messageNum);
+ this.consumeQueueStore.assignQueueOffset(msg);
+ }
+ }
+
+
+ @Override
+ public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
+ final int tranType =
MessageSysFlag.getTransactionValue(msg.getSysFlag());
+
+ if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType ==
MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
+ this.consumeQueueStore.increaseQueueOffset(msg, messageNum);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index a7da245551..3db0c18f7f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -725,13 +725,20 @@ public interface MessageStore {
boolean isSyncMaster();
/**
- * Assign an queue offset and increase it. If there is a race condition,
you need to lock/unlock this method
+ * Assign a message to queue offset. If there is a race condition, you
need to lock/unlock this method
* yourself.
*
* @param msg message
+ */
+ void assignOffset(MessageExtBrokerInner msg);
+
+ /**
+ * Increase queue offset in memory table. If there is a race condition,
you need to lock/unlock this method
+ *
+ * @param msg message
* @param messageNum message num
*/
- void assignOffset(MessageExtBrokerInner msg, short messageNum);
+ void increaseOffset(MessageExtBrokerInner msg, short messageNum);
/**
* Get master broker message store in process in broker container
diff --git
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 39906eae09..ec5e86d704 100644
---
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -435,7 +435,7 @@ public class DLedgerCommitLog extends CommitLog {
String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId();
topicQueueLock.lock(topicQueueKey);
try {
- defaultMessageStore.assignOffset(msg, getMessageNum(msg));
+ defaultMessageStore.assignOffset(msg);
encodeResult = this.messageSerializer.serialize(msg);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
@@ -475,6 +475,8 @@ public class DLedgerCommitLog extends CommitLog {
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={},
bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length,
appendResult);
}
+
+ defaultMessageStore.increaseOffset(msg, getMessageNum(msg));
} finally {
topicQueueLock.unlock(topicQueueKey);
}
@@ -556,7 +558,7 @@ public class DLedgerCommitLog extends CommitLog {
int batchNum = encodeResult.batchData.size();
topicQueueLock.lock(encodeResult.queueOffsetKey);
try {
- defaultMessageStore.assignOffset(messageExtBatch, (short)
batchNum);
+ defaultMessageStore.assignOffset(messageExtBatch);
putMessageLock.lock(); //spin or ReentrantLock ,depending on store
config
msgIdBuilder.setLength(0);
@@ -616,6 +618,9 @@ public class DLedgerCommitLog extends CommitLog {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={},
bodyLength={} AppendMessageResult={}",
elapsedTimeInLock, messageExtBatch.getBody().length,
appendResult);
}
+
+ defaultMessageStore.increaseOffset(messageExtBatch, (short)
batchNum);
+
} finally {
topicQueueLock.unlock(encodeResult.queueOffsetKey);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
index 89c3e53b6b..25e947512f 100644
---
a/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java
@@ -588,8 +588,13 @@ public abstract class AbstractPluginMessageStore
implements MessageStore {
}
@Override
- public void assignOffset(MessageExtBrokerInner msg, short messageNum) {
- next.assignOffset(msg, messageNum);
+ public void assignOffset(MessageExtBrokerInner msg) {
+ next.assignOffset(msg);
+ }
+
+ @Override
+ public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
+ next.increaseOffset(msg, messageNum);
}
@Override
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index ba9b22ae8b..8fec1bf7b0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -353,7 +353,7 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
@Override
public void truncateDirtyLogicFiles(long phyOffset) {
- long oldMinOffset = minOffsetInQueue;
+ long oldMinOffset = minOffsetInQueue;
long oldMaxOffset = maxOffsetInQueue;
int logicFileSize = this.mappedFileSize;
@@ -515,10 +515,10 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
}
@Override
- public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner,
MessageExtBrokerInner msg, short messageNum) {
+ public void assignQueueOffset(QueueOffsetOperator queueOffsetOperator,
MessageExtBrokerInner msg) {
String topicQueueKey = getTopic() + "-" + getQueueId();
- long queueOffset =
queueOffsetAssigner.assignBatchQueueOffset(topicQueueKey, messageNum);
+ long queueOffset =
queueOffsetOperator.getBatchQueueOffset(topicQueueKey);
if (MessageSysFlag.check(msg.getSysFlag(),
MessageSysFlag.INNER_BATCH_FLAG)) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE,
String.valueOf(queueOffset));
@@ -527,7 +527,15 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
msg.setQueueOffset(queueOffset);
}
- public boolean putBatchMessagePositionInfo(final long offset, final int
size, final long tagsCode, final long storeTime,
+ @Override
+ public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator,
MessageExtBrokerInner msg,
+ short messageNum) {
+ String topicQueueKey = getTopic() + "-" + getQueueId();
+ queueOffsetOperator.increaseBatchQueueOffset(topicQueueKey,
messageNum);
+ }
+
+ public boolean putBatchMessagePositionInfo(final long offset, final int
size, final long tagsCode,
+ final long storeTime,
final long msgBaseOffset, final short batchSize) {
if (offset <= this.maxMsgPhyOffsetInCommitLog) {
@@ -696,6 +704,7 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
/**
* Find the message whose timestamp is the smallest, greater than or equal
to the given time.
+ *
* @param timestamp
* @return
*/
@@ -794,8 +803,8 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
}
} else {
//The max timestamp of this file is smaller than the given
timestamp, so double check the previous file
- if (i + 1 <= mappedFileNum - 1) {
- mappedFile = mappedFileQueue.getMappedFiles().get(i + 1);
+ if (i + 1 <= mappedFileNum - 1) {
+ mappedFile = mappedFileQueue.getMappedFiles().get(i + 1);
targetBcq = mappedFile;
break;
} else {
@@ -812,7 +821,8 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
* Find the offset of which the value is equal or larger than the given
targetValue.
* If there are many values equal to the target, then find the earliest
one.
*/
- public static int binarySearchRight(ByteBuffer byteBuffer, int left, int
right, final int unitSize, final int unitShift,
+ public static int binarySearchRight(ByteBuffer byteBuffer, int left, int
right, final int unitSize,
+ final int unitShift,
long targetValue) {
int mid = -1;
while (left <= right) {
@@ -830,7 +840,7 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
if (tmpValue >= targetValue) {
return mid;
} else {
- left = mid + unitSize;
+ left = mid + unitSize;
}
} else {
//mid is actually in the mid
@@ -846,7 +856,7 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
/**
* Here is vulnerable, the min value of the bytebuffer must be smaller or
equal then the given value.
- * Otherwise it may get -1
+ * Otherwise, it may get -1
*/
protected int binarySearch(ByteBuffer byteBuffer, int left, int right,
final int unitSize, final int unitShift,
long targetValue) {
@@ -989,6 +999,7 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
/**
* Batch msg offset (deep logic offset)
+ *
* @return max deep offset
*/
@Override
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index 7931dc45a9..d7213fa37a 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -143,9 +143,17 @@ public interface ConsumeQueueInterface extends
FileQueueLifeCycle {
* Assign queue offset.
* @param queueOffsetAssigner the delegated queue offset assigner
* @param msg message itself
+ */
+ void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner,
MessageExtBrokerInner msg);
+
+
+ /**
+ * Increase queue offset.
+ * @param queueOffsetAssigner the delegated queue offset assigner
+ * @param msg message itself
* @param messageNum message number
*/
- void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner,
MessageExtBrokerInner msg, short messageNum);
+ void increaseQueueOffset(QueueOffsetOperator queueOffsetAssigner,
MessageExtBrokerInner msg, short messageNum);
/**
* Estimate number of records matching given filter.
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 7d7878f129..8d38503b37 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -61,7 +61,7 @@ public class ConsumeQueueStore {
protected final DefaultMessageStore messageStore;
protected final MessageStoreConfig messageStoreConfig;
- protected final QueueOffsetAssigner queueOffsetAssigner = new
QueueOffsetAssigner();
+ protected final QueueOffsetOperator queueOffsetOperator = new
QueueOffsetOperator();
protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/*
queueId */, ConsumeQueueInterface>> consumeQueueTable;
public ConsumeQueueStore(DefaultMessageStore messageStore,
MessageStoreConfig messageStoreConfig) {
@@ -87,7 +87,7 @@ public class ConsumeQueueStore {
* Apply the dispatched request and build the consume queue. This function
should be idempotent.
*
* @param consumeQueue consume queue
- * @param request dispatch request
+ * @param request dispatch request
*/
public void putMessagePositionInfoWrapper(ConsumeQueueInterface
consumeQueue, DispatchRequest request) {
consumeQueue.putMessagePositionInfoWrapper(request);
@@ -362,34 +362,39 @@ public class ConsumeQueueStore {
}
public Long getMaxOffset(String topic, int queueId) {
- return this.queueOffsetAssigner.currentQueueOffset(topic + "-" +
queueId);
+ return this.queueOffsetOperator.currentQueueOffset(topic + "-" +
queueId);
}
public void setTopicQueueTable(ConcurrentMap<String, Long>
topicQueueTable) {
- this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable);
- this.queueOffsetAssigner.setLmqTopicQueueTable(topicQueueTable);
+ this.queueOffsetOperator.setTopicQueueTable(topicQueueTable);
+ this.queueOffsetOperator.setLmqTopicQueueTable(topicQueueTable);
}
public ConcurrentMap getTopicQueueTable() {
- return this.queueOffsetAssigner.getTopicQueueTable();
+ return this.queueOffsetOperator.getTopicQueueTable();
}
public void setBatchTopicQueueTable(ConcurrentMap<String, Long>
batchTopicQueueTable) {
- this.queueOffsetAssigner.setBatchTopicQueueTable(batchTopicQueueTable);
+ this.queueOffsetOperator.setBatchTopicQueueTable(batchTopicQueueTable);
}
- public void assignQueueOffset(MessageExtBrokerInner msg, short messageNum)
{
+ public void assignQueueOffset(MessageExtBrokerInner msg) {
ConsumeQueueInterface consumeQueue =
findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());
- consumeQueue.assignQueueOffset(this.queueOffsetAssigner, msg,
messageNum);
+ consumeQueue.assignQueueOffset(this.queueOffsetOperator, msg);
+ }
+
+ public void increaseQueueOffset(MessageExtBrokerInner msg, short
messageNum) {
+ ConsumeQueueInterface consumeQueue =
findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());
+ consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg,
messageNum);
}
public void updateQueueOffset(String topic, int queueId, long offset) {
String topicQueueKey = topic + "-" + queueId;
- this.queueOffsetAssigner.updateQueueOffset(topicQueueKey, offset);
+ this.queueOffsetOperator.updateQueueOffset(topicQueueKey, offset);
}
public void removeTopicQueueTable(String topic, Integer queueId) {
- this.queueOffsetAssigner.remove(topic, queueId);
+ this.queueOffsetOperator.remove(topic, queueId);
}
public ConcurrentMap<String, ConcurrentMap<Integer,
ConsumeQueueInterface>> getConsumeQueueTable() {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
similarity index 69%
rename from
store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
rename to
store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
index fe8586f6dd..2545bbf523 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
@@ -28,47 +28,49 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
/**
- * QueueOffsetAssigner is a component for assigning offsets for queues.
+ * QueueOffsetOperator is a component for operating offsets for queues.
*/
-public class QueueOffsetAssigner {
+public class QueueOffsetOperator {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private ConcurrentMap<String, Long> topicQueueTable = new
ConcurrentHashMap<>(1024);
private ConcurrentMap<String, Long> batchTopicQueueTable = new
ConcurrentHashMap<>(1024);
private ConcurrentMap<String/* topic-queueid */, Long/* offset */>
lmqTopicQueueTable = new ConcurrentHashMap<>(1024);
- public long assignQueueOffset(String topicQueueKey, short messageNum) {
+ public long getQueueOffset(String topicQueueKey) {
+ return ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable,
topicQueueKey, k -> 0L);
+ }
+
+ public void increaseQueueOffset(String topicQueueKey, short messageNum) {
Long queueOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k
-> 0L);
- this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
- return queueOffset;
+ topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
}
public void updateQueueOffset(String topicQueueKey, long offset) {
this.topicQueueTable.put(topicQueueKey, offset);
}
- public long assignBatchQueueOffset(String topicQueueKey, short messageNum)
{
- Long topicOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable,
topicQueueKey, k -> 0L);
- this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
- return topicOffset;
+ public long getBatchQueueOffset(String topicQueueKey) {
+ return
ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable,
topicQueueKey, k -> 0L);
}
- public long assignLmqOffset(String topicQueueKey, short messageNum) {
- Long topicOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey,
k -> 0L);
- this.lmqTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
- return topicOffset;
+ public void increaseBatchQueueOffset(String topicQueueKey, short
messageNum) {
+ Long batchQueueOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable,
topicQueueKey, k -> 0L);
+ this.batchTopicQueueTable.put(topicQueueKey, batchQueueOffset +
messageNum);
}
- public long currentQueueOffset(String topicQueueKey) {
- return this.topicQueueTable.get(topicQueueKey);
+ public long getLmqOffset(String topicQueueKey) {
+ return ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable,
topicQueueKey, k -> 0L);
}
- public long currentBatchQueueOffset(String topicQueueKey) {
- return this.batchTopicQueueTable.get(topicQueueKey);
+ public void increaseLmqOffset(String topicQueueKey, short messageNum) {
+ Long lmqOffset =
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey,
k -> 0L);
+ this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum);
}
- public long currentLmqOffset(String topicQueueKey) {
- return this.lmqTopicQueueTable.get(topicQueueKey);
+ public long currentQueueOffset(String topicQueueKey) {
+ Long currentQueueOffset = this.topicQueueTable.get(topicQueueKey);
+ return currentQueueOffset == null ? 0L : currentQueueOffset;
}
public synchronized void remove(String topic, Integer queueId) {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 2f22de4d11..151bfa8f04 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -29,6 +29,7 @@ import java.net.UnknownHostException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -63,6 +64,7 @@ import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@RunWith(MockitoJUnitRunner.class)
@@ -369,6 +371,17 @@ public class DefaultMessageStoreTest {
assertThat(storeTime).isEqualTo(-1);
}
+ @Test
+ public void testPutMessage_whenMessagePropertyIsTooLong() {
+ String topicName = "messagePropertyIsTooLongTest";
+ MessageExtBrokerInner illegalMessage =
buildSpecifyLengthPropertyMessage("123".getBytes(StandardCharsets.UTF_8),
topicName, Short.MAX_VALUE + 1);
+
assertEquals(messageStore.putMessage(illegalMessage).getPutMessageStatus(),
PutMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+ assertEquals(0L, messageStore.getQueueStore().getMaxOffset(topicName,
0).longValue());
+ MessageExtBrokerInner normalMessage =
buildSpecifyLengthPropertyMessage("123".getBytes(StandardCharsets.UTF_8),
topicName, 100);
+
assertEquals(messageStore.putMessage(normalMessage).getPutMessageStatus(),
PutMessageStatus.PUT_OK);
+ assertEquals(1L, messageStore.getQueueStore().getMaxOffset(topicName,
0).longValue());
+ }
+
private DefaultMessageStore getDefaultMessageStore() {
return (DefaultMessageStore) this.messageStore;
}
@@ -437,6 +450,26 @@ public class DefaultMessageStoreTest {
return msg;
}
+ private MessageExtBrokerInner buildSpecifyLengthPropertyMessage(byte[]
messageBody, String topic, int length) {
+ StringBuilder stringBuilder = new StringBuilder();
+ Random random = new Random();
+ for (int i = 0; i < length; i++) {
+ stringBuilder.append(random.nextInt(10));
+ }
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.putUserProperty("test", stringBuilder.toString());
+ msg.setTopic(topic);
+ msg.setTags("TAG1");
+ msg.setKeys("Hello");
+ msg.setBody(messageBody);
+ msg.setQueueId(0);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setStoreHost(storeHost);
+ msg.setBornHost(bornHost);
+
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+ return msg;
+ }
+
private MessageExtBrokerInner buildIPv6HostMessage(byte[] messageBody,
String topic) {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
index 3ae4b2be56..85626a332e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
@@ -79,7 +79,7 @@ public class MultiDispatchTest {
@Test
public void wrapMultiDispatch() {
MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue();
- messageStore.assignOffset(messageExtBrokerInner, (short) 1);
+ messageStore.assignOffset(messageExtBrokerInner);
assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET),
"0,0");
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
index b9cfdf9b65..5d86c10e45 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -45,7 +45,7 @@ public class GetBrokerConfigCommand implements SubCommand {
@Override
public String commandDesc() {
- return "Get broker config by cluster or special broker!";
+ return "Get broker config by cluster or special broker";
}
@Override
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
index 3392ae1fb0..6095e76685 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommand.java
@@ -43,7 +43,7 @@ public class GetConsumerConfigSubCommand implements
SubCommand {
@Override
public String commandDesc() {
- return "Get consumer config by subscription group name!";
+ return "Get consumer config by subscription group name";
}
@Override