This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 368d12516 [ISSUE #5162] Fix bug about DefaultMessageStore
maxFilterMessageCount calculating (#5171)
368d12516 is described below
commit 368d12516d102fa0161e2f2c4248c73d2d221024
Author: Nocturne <[email protected]>
AuthorDate: Thu Sep 29 21:03:10 2022 +0800
[ISSUE #5162] Fix bug about DefaultMessageStore maxFilterMessageCount
calculating (#5171)
* fix bug for comparing size with count
* add getUnitSize function
---
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | 5 +++++
.../main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 4 ++--
.../java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java | 5 +++++
.../java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java | 6 ++++++
4 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index a40cbcd14..59231dcf4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -184,6 +184,11 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
return totalSize;
}
+ @Override
+ public int getUnitSize() {
+ return CQ_STORE_UNIT_SIZE;
+ }
+
@Override
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile =
this.mappedFileQueue.getMappedFileByTime(timestamp);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 72e6abb25..628444331 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -729,7 +729,7 @@ public class DefaultMessageStore implements MessageStore {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
} else {
- final int maxFilterMessageCount = Math.max(16000, maxMsgNums *
ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ final int maxFilterMessageSize = Math.max(16000, maxMsgNums *
consumeQueue.getUnitSize());
final boolean diskFallRecorded =
this.messageStoreConfig.isDiskFallRecorded();
long maxPullSize = Math.max(maxTotalMsgSize, 100);
@@ -764,7 +764,7 @@ public class DefaultMessageStore implements MessageStore {
boolean isInDisk =
checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
- if (cqUnit.getQueueOffset() - offset >
maxFilterMessageCount) {
+ if ((cqUnit.getQueueOffset() - offset) *
consumeQueue.getUnitSize() > maxFilterMessageSize) {
break;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 4cd0088b1..99bfa552c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -926,6 +926,11 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCy
return this.mappedFileQueue.getTotalFileSize();
}
+ @Override
+ public int getUnitSize() {
+ return CQ_STORE_UNIT_SIZE;
+ }
+
@Override
public void destroy() {
this.maxMsgPhyOffsetInCommitLog = -1;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index c45592542..f36dda094 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -120,6 +120,12 @@ public interface ConsumeQueueInterface {
*/
long getTotalSize();
+ /**
+ * Get the unit size of this CQ which is different in different CQ impl
+ * @return cq unit size
+ */
+ int getUnitSize();
+
/**
* Correct min offset by min commit log offset.
* @param minCommitLogOffset min commit log offset