This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 99d06b94fa7 [fix][client] Fix producer thread block forever on memory
limit controller (#21790)
99d06b94fa7 is described below
commit 99d06b94fa715b3f1062c4a3f616d5cc725e47a4
Author: wenbingshen <[email protected]>
AuthorDate: Tue Dec 26 19:10:59 2023 +0800
[fix][client] Fix producer thread block forever on memory limit controller
(#21790)
---
.../client/impl/ProducerMemoryLimitTest.java | 42 ++++++++++++++++++++--
.../client/impl/BatchMessageContainerImpl.java | 6 ++--
2 files changed, 43 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 3ec784e248c..d776fdb0ed9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -23,7 +23,12 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -35,9 +40,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-
@Test(groups = "broker-impl")
public class ProducerMemoryLimitTest extends ProducerConsumerBase {
@@ -191,6 +193,40 @@ public class ProducerMemoryLimitTest extends
ProducerConsumerBase {
Assert.assertEquals(memoryLimitController.currentUsage(), 0);
}
+ @Test(timeOut = 10_000)
+ public void testProducerBlockReserveMemory() throws Exception {
+ replacePulsarClient(PulsarClient.builder().
+ serviceUrl(lookupUrl.toString())
+ .memoryLimit(1, SizeUnit.KILO_BYTES));
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic("testProducerMemoryLimit")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .compressionType(CompressionType.SNAPPY)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .maxPendingMessages(0)
+ .blockIfQueueFull(true)
+ .enableBatching(true)
+ .batchingMaxMessages(100)
+ .batchingMaxBytes(65536)
+ .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .create();
+ int msgCount = 5;
+ CountDownLatch cdl = new CountDownLatch(msgCount);
+ for (int i = 0; i < msgCount; i++) {
+
producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8)).whenComplete(((messageId,
throwable) -> {
+ cdl.countDown();
+ }));
+ }
+
+ cdl.await();
+
+ producer.close();
+ PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ final MemoryLimitController memoryLimitController =
clientImpl.getMemoryLimitController();
+ Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+ }
+
private void initClientWithMemoryLimit() throws PulsarClientException {
replacePulsarClient(PulsarClient.builder().
serviceUrl(lookupUrl.toString())
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 9be7210a387..dfcbc42bcc6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -324,9 +324,11 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
protected void updateAndReserveBatchAllocatedSize(int updatedSizeBytes) {
int delta = updatedSizeBytes - batchAllocatedSizeBytes;
batchAllocatedSizeBytes = updatedSizeBytes;
- if (delta != 0) {
- if (producer != null) {
+ if (producer != null) {
+ if (delta > 0) {
producer.client.getMemoryLimitController().forceReserveMemory(delta);
+ } else if (delta < 0) {
+
producer.client.getMemoryLimitController().releaseMemory(-delta);
}
}
}