This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 99d06b94fa7 [fix][client] Fix producer thread block forever on memory limit controller (#21790)
99d06b94fa7 is described below

commit 99d06b94fa715b3f1062c4a3f616d5cc725e47a4
Author: wenbingshen <[email protected]>
AuthorDate: Tue Dec 26 19:10:59 2023 +0800

    [fix][client] Fix producer thread block forever on memory limit controller (#21790)
---
 .../client/impl/ProducerMemoryLimitTest.java       | 42 ++++++++++++++++++++--
 .../client/impl/BatchMessageContainerImpl.java     |  6 ++--
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 3ec784e248c..d776fdb0ed9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -23,7 +23,12 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import io.netty.buffer.ByteBufAllocator;
 import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -35,9 +40,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
 @Test(groups = "broker-impl")
 public class ProducerMemoryLimitTest extends ProducerConsumerBase {
 
@@ -191,6 +193,40 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase {
         Assert.assertEquals(memoryLimitController.currentUsage(), 0);
     }
 
+    @Test(timeOut = 10_000)
+    public void testProducerBlockReserveMemory() throws Exception {
+        replacePulsarClient(PulsarClient.builder().
+                serviceUrl(lookupUrl.toString())
+                .memoryLimit(1, SizeUnit.KILO_BYTES));
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic("testProducerMemoryLimit")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .compressionType(CompressionType.SNAPPY)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .maxPendingMessages(0)
+                .blockIfQueueFull(true)
+                .enableBatching(true)
+                .batchingMaxMessages(100)
+                .batchingMaxBytes(65536)
+                .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                .create();
+        int msgCount = 5;
+        CountDownLatch cdl = new CountDownLatch(msgCount);
+        for (int i = 0; i < msgCount; i++) {
+            producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8)).whenComplete(((messageId, throwable) -> {
+                cdl.countDown();
+            }));
+        }
+
+        cdl.await();
+
+        producer.close();
+        PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+        final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController();
+        Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+    }
+
     private void initClientWithMemoryLimit() throws PulsarClientException {
         replacePulsarClient(PulsarClient.builder().
                 serviceUrl(lookupUrl.toString())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 9be7210a387..dfcbc42bcc6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -324,9 +324,11 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
     protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
         int delta = updatedSizeBytes - batchAllocatedSizeBytes;
         batchAllocatedSizeBytes = updatedSizeBytes;
-        if (delta != 0) {
-            if (producer != null) {
+        if (producer != null) {
+            if (delta > 0) {
                 producer.client.getMemoryLimitController().forceReserveMemory(delta);
+            } else if (delta < 0) {
+                producer.client.getMemoryLimitController().releaseMemory(-delta);
             }
         }
     }

Reply via email to