This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 4f708bd9c76 Forget to update memory usage when invalid message (#16835)
4f708bd9c76 is described below

commit 4f708bd9c76b0ae8ade9679688f998e342b9465f
Author: ZhangJian He <[email protected]>
AuthorDate: Fri Jul 29 10:43:50 2022 +0800

    Forget to update memory usage when invalid message (#16835)
    
    Release memory usage when a message is invalid.
    Only the memory usage needs to be released here; there is no need to release the semaphore. Both 
add testcases.
    
    coauthored by @pengxiangrui127.
    
    - add unit tests for this change
    
    Check the box below or label this PR directly.
    
    Need to update docs?
    
    - [x] `doc-not-needed`
    Bug fix; no documentation needed.
    
    (cherry picked from commit 57b008a411463bce4c26350177dae4346f7b84d2)
---
 .../client/impl/ProducerMemoryLimitTest.java       | 27 +++++++++++++++++++++-
 .../client/impl/BatchMessageContainerImpl.java     |  2 ++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 77e3ee811a7..741262bafe2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -27,7 +27,7 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
@@ -47,6 +47,31 @@ public class ProducerMemoryLimitTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Test(timeOut = 10_000)
+    public void testProducerInvalidMessageMemoryRelease() throws Exception {
+        initClientWithMemoryLimit();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+                .topic("testProducerMemoryLimit")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                .batchingMaxBytes(10240)
+                .enableBatching(true)
+                .create();
+        this.stopBroker();
+        try {
+            Field field = ClientCnx.class.getDeclaredField("maxMessageSize");
+            field.setAccessible(true);
+            field.set(producer.getClientCnx(), 8);
+            producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
+            throw new IllegalStateException("can not reach here");
+        } catch (PulsarClientException.InvalidMessageException ex) {
+            PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+            final MemoryLimitController memoryLimitController = 
clientImpl.getMemoryLimitController();
+            Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+        }
+    }
+
     @Test(timeOut = 10_000)
     public void testProducerTimeoutMemoryRelease() throws Exception {
         initClientWithMemoryLimit();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index e0ab2d942ca..b5acc2c30e9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -200,6 +200,8 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
         if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
             producer.semaphoreRelease(messages.size());
+            messages.forEach(msg -> producer.client.getMemoryLimitController()
+                    .releaseMemory(msg.getUncompressedSize()));
             discard(new PulsarClientException.InvalidMessageException(
                     "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
             return null;

Reply via email to