This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2620871584e [fix][client] Fix memory leak when message size exceeds
max message size and batching is enabled (#23967)
2620871584e is described below
commit 2620871584ef7c76f8f90fc6c8a341ecc52ecb61
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 13 01:33:55 2025 +0200
[fix][client] Fix memory leak when message size exceeds max message size
and batching is enabled (#23967)
---
.../java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java | 2 ++
1 file changed, 2 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index a3cd84981fb..e112e1b866c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -273,6 +273,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
// handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded`
if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
+ cmd.release();
producer.semaphoreRelease(1);
producer.client.getMemoryLimitController().releaseMemory(
messages.get(0).getUncompressedSize() +
batchAllocatedSizeBytes);
@@ -286,6 +287,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload());
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
+ encryptedPayload.release();
producer.semaphoreRelease(messages.size());
messages.forEach(msg -> producer.client.getMemoryLimitController()
.releaseMemory(msg.getUncompressedSize()));