This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ad9efae Forget to update memory usage on producer close (#11906)
ad9efae is described below
commit ad9efae1abf9675f830052ab1d4697330b23a750
Author: Shoothzj <[email protected]>
AuthorDate: Tue Sep 7 00:14:38 2021 +0800
Forget to update memory usage on producer close (#11906)
---
.../pulsar/client/impl/ProducerMemoryLimitTest.java | 20 +++++++++++++++++++-
.../org/apache/pulsar/client/impl/ProducerImpl.java | 2 ++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index b6ec6a5..264ec30 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -59,7 +59,7 @@ public class ProducerMemoryLimitTest extends
ProducerConsumerBase {
.create();
this.stopBroker();
try {
- producer.send("memroy-test".getBytes(StandardCharsets.UTF_8));
+ producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.TimeoutException ex) {
PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
@@ -69,6 +69,24 @@ public class ProducerMemoryLimitTest extends
ProducerConsumerBase {
}
+ @Test(timeOut = 10_000)
+ public void testProducerCloseMemoryRelease() throws Exception {
+ initClientWithMemoryLimit();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic("testProducerMemoryLimit")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .maxPendingMessages(0)
+ .enableBatching(false)
+ .create();
+ this.stopBroker();
+ producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8));
+ producer.close();
+ PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ final MemoryLimitController memoryLimitController =
clientImpl.getMemoryLimitController();
+ Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+ }
+
private void initClientWithMemoryLimit() throws PulsarClientException {
pulsarClient = PulsarClient.builder().
serviceUrl(lookupUrl.toString())
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 439ceff..50f2d5d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -884,6 +884,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
format("The producer %s of the topic %s was already closed
when closing the producers",
producerName, topic));
pendingMessages.forEach(msg -> {
+
client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
msg.sendComplete(ex);
msg.cmd.release();
msg.recycle();
@@ -907,6 +908,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
log.info("[{}] [{}] Closed Producer", topic, producerName);
setState(State.Closed);
pendingMessages.forEach(msg -> {
+
client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
msg.cmd.release();
msg.recycle();
});