This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 59d1495fef4 [fix][client] Fix failed to close consumer because of the
error: param memorySize is a negative value (#25805)
59d1495fef4 is described below
commit 59d1495fef46075986a92065bd64229fe0f354ea
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 25 20:30:23 2026 +0800
[fix][client] Fix failed to close consumer because of the error: param
memorySize is a negative value (#25805)
---
.../java/org/apache/pulsar/client/impl/ConsumerBase.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index fe9ec2d59c4..ddf0d1a219d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -991,13 +991,17 @@ public abstract class ConsumerBase<T> extends
HandlerState implements Consumer<T
// synchronize redeliverUnacknowledgedMessages().
incomingQueueLock.lock();
try {
- if (canEnqueueMessage(message) && incomingMessages.offer(message))
{
- // After we have enqueued the messages on `incomingMessages`
queue, we cannot touch the message
- // instance anymore, since for pooled messages, this instance
was possibly already been released
- // and recycled.
+ if (canEnqueueMessage(message)) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
- getMemoryLimitController().ifPresent(limiter ->
limiter.forceReserveMemory(messageSize));
- updateAutoScaleReceiverQueueHint();
+ if (incomingMessages.offer(message)) {
+ // After we have enqueued the messages on
`incomingMessages` queue, we cannot touch the message
+ // instance anymore, since for pooled messages, this
instance was possibly already been released
+ // and recycled.
+ getMemoryLimitController().ifPresent(limiter ->
limiter.forceReserveMemory(messageSize));
+ updateAutoScaleReceiverQueueHint();
+ } else {
+ INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this,
-messageSize);
+ }
}
} finally {
incomingQueueLock.unlock();