This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5a369be528b296daf3afc6362d32c2a5dbaa4675 Author: Shoothzj <[email protected]> AuthorDate: Tue Sep 7 00:14:38 2021 +0800 Forget to update memory usage on producer close (#11906) (cherry picked from commit ad9efae1abf9675f830052ab1d4697330b23a750) --- .../pulsar/client/impl/ProducerMemoryLimitTest.java | 20 +++++++++++++++++++- .../org/apache/pulsar/client/impl/ProducerImpl.java | 2 ++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index b6ec6a5..264ec30 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -59,7 +59,7 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase { .create(); this.stopBroker(); try { - producer.send("memroy-test".getBytes(StandardCharsets.UTF_8)); + producer.send("memory-test".getBytes(StandardCharsets.UTF_8)); throw new IllegalStateException("can not reach here"); } catch (PulsarClientException.TimeoutException ex) { PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; @@ -69,6 +69,24 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase { } + @Test(timeOut = 10_000) + public void testProducerCloseMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(false) + .create(); + this.stopBroker(); + producer.sendAsync("memory-test".getBytes(StandardCharsets.UTF_8)); + producer.close(); + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + private void initClientWithMemoryLimit() throws PulsarClientException { pulsarClient = PulsarClient.builder(). serviceUrl(lookupUrl.toString()) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 564682c..e531345 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -875,6 +875,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne format("The producer %s of the topic %s was already closed when closing the producers", producerName, topic)); pendingMessages.forEach(msg -> { + client.getMemoryLimitController().releaseMemory(msg.uncompressedSize); msg.sendComplete(ex); msg.cmd.release(); msg.recycle(); @@ -898,6 +899,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne log.info("[{}] [{}] Closed Producer", topic, producerName); setState(State.Closed); pendingMessages.forEach(msg -> { + client.getMemoryLimitController().releaseMemory(msg.uncompressedSize); msg.cmd.release(); msg.recycle(); });
