This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c833ff6e9a [ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer callback
c833ff6e9a is described below
commit c833ff6e9a022704b86bee90729c6710bb5c37a6
Author: yuz10 <[email protected]>
AuthorDate: Fri Feb 2 12:34:13 2024 +0800
[ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer callback
---
.../rocketmq/client/impl/consumer/ProcessQueue.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index ebc208a8d8..33e698b00c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -198,10 +198,12 @@ public class ProcessQueue {
MessageExt prev =
msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
- msgSize.addAndGet(0 - msg.getBody().length);
+ msgSize.addAndGet(-msg.getBody().length);
}
}
- msgCount.addAndGet(removedCnt);
+ if (msgCount.addAndGet(removedCnt) == 0) {
+ msgSize.set(0);
+ }
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
@@ -264,9 +266,12 @@ public class ProcessQueue {
this.treeMapLock.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
- msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
- for (MessageExt msg :
this.consumingMsgOrderlyTreeMap.values()) {
- msgSize.addAndGet(0 - msg.getBody().length);
+ if
(msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) {
+ msgSize.set(0);
+ } else {
+ for (MessageExt msg :
this.consumingMsgOrderlyTreeMap.values()) {
+ msgSize.addAndGet(-msg.getBody().length);
+ }
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
@@ -426,8 +431,8 @@ public class ProcessQueue {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
info.setCachedMsgCount(this.msgTreeMap.size());
- info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 *
1024)));
}
+ info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 *
1024)));
if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());