This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new fc6c899abc [ISSUE #7538] fix wrong cachedMsgSize if msg body is
changed in consumer callback (#7820)
fc6c899abc is described below
commit fc6c899abc8262251c7838af92e7c5d4267bcf87
Author: yuz10 <[email protected]>
AuthorDate: Tue Feb 6 09:04:56 2024 +0800
[ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer
callback (#7820)
---
.../rocketmq/client/impl/consumer/ProcessQueue.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index ba00aaef99..e0a3cd5ad2 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -202,10 +202,12 @@ public class ProcessQueue {
MessageExt prev =
msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
- msgSize.addAndGet(0 - msg.getBody().length);
+ msgSize.addAndGet(-msg.getBody().length);
}
}
- msgCount.addAndGet(removedCnt);
+ if (msgCount.addAndGet(removedCnt) == 0) {
+ msgSize.set(0);
+ }
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
@@ -268,9 +270,12 @@ public class ProcessQueue {
this.treeMapLock.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
- msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
- for (MessageExt msg :
this.consumingMsgOrderlyTreeMap.values()) {
- msgSize.addAndGet(0 - msg.getBody().length);
+ if
(msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) {
+ msgSize.set(0);
+ } else {
+ for (MessageExt msg :
this.consumingMsgOrderlyTreeMap.values()) {
+ msgSize.addAndGet(-msg.getBody().length);
+ }
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
@@ -409,8 +414,8 @@ public class ProcessQueue {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
info.setCachedMsgCount(this.msgTreeMap.size());
- info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 *
1024)));
}
+ info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 *
1024)));
if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());