This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 7e915efe2a5 [fix][client] Fix memory leak when message size exceeds 
max message size and batching is enabled (#23967)
7e915efe2a5 is described below

commit 7e915efe2a5bd2b6d0da21e34fa49b5d60276895
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 13 01:33:55 2025 +0200

    [fix][client] Fix memory leak when message size exceeds max message size 
and batching is enabled (#23967)
---
 .../java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 6d7827c87ec..2011e22581c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -273,6 +273,7 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
 
             // handle mgs size check as non-batched in 
`ProducerImpl.isMessageSizeExceeded`
             if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
+                cmd.release();
                 producer.semaphoreRelease(1);
                 producer.client.getMemoryLimitController().releaseMemory(
                         messages.get(0).getUncompressedSize() + 
batchAllocatedSizeBytes);
@@ -286,6 +287,7 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
         updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
         if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
+            encryptedPayload.release();
             producer.semaphoreRelease(messages.size());
             messages.forEach(msg -> producer.client.getMemoryLimitController()
                     .releaseMemory(msg.getUncompressedSize()));

Reply via email to