This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9610640e26b634df2061f9f6ff46c028ca75bd8e Author: lixinyang <[email protected]> AuthorDate: Sat Jul 30 19:31:07 2022 +0800 [fix][client]Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl (#16837) ### Motivation Fix the client memory limit `currentUsage` leak in `ProducerImpl`. When our Pulsar cluster encountered errors, the producer failed to send messages and we found that `currentUsage` stayed at a high value, as if memory had leaked, which slowed down the producer's send rate. We also found that the producer semaphore is released twice when `createOpSendMsg` throws an exception. The first point below releases only the message-count semaphore, but does not release the memory limit. **memory limit currentUsage leak point** https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033 **producer semaphore release duplicated** https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120 ``` After the exception below, the memory limit leak occurred. 
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1 at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826) at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889) at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369) at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848) at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) ``` ### Modifications 1. Add the `MemoryLimitController` release when pending batch messages are discarded. 2. Remove the extra `semaphoreRelease` calls from the `catch` blocks that handle a failure to create `OpSendMsg` from the batch message container, so the semaphore is no longer released twice. 
### Documentation - [X] `doc-not-needed` (cherry picked from commit 955dcd10ce28b996811e194c9ad852b06ab30aee) --- .../client/impl/ProducerMemoryLimitTest.java | 29 ++++++++++++++++++ .../pulsar/client/impl/ProducerSemaphoreTest.java | 35 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 5 ++-- 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index 264ec306413..e49c4f3b70c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -69,6 +69,35 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase { } + @Test(timeOut = 10_000) + public void testProducerBatchSendTimeoutMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(true) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxBytes(12) + .create(); + this.stopBroker(); + try { + producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync(); + try { + producer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.TimeoutException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + } + @Test(timeOut = 10_000) public void 
testProducerCloseMemoryRelease() throws Exception { initClientWithMemoryLimit(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index d325ed67852..7ea63aa674f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.client.impl; +import static org.mockito.ArgumentMatchers.any; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -206,4 +210,35 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); Assert.assertFalse(producer.isErrorStat()); } + + @Test(timeOut = 10_000) + public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception { + final int pendingQueueSize = 10; + @Cleanup + ProducerImpl<byte[]> producer = + (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerSemaphoreRelease") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(pendingQueueSize) + .enableBatching(true) + .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS) + .batchingMaxBytes(12) + .create(); + this.stopBroker(); + try { + ProducerImpl<byte[]> spyProducer = Mockito.spy(producer); + Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer) + .encryptMessage(any(),any()); + + Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer"); + batchMessageContainerField.setAccessible(true); + BatchMessageContainerImpl batchMessageContainer = (BatchMessageContainerImpl) 
batchMessageContainerField.get(spyProducer); + batchMessageContainer.setProducer(spyProducer); + spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.TimeoutException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9dd2e01c375..bf8fb97cb21 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1787,8 +1787,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return; } final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch(); + final long currentBatchSize = batchMessageContainer.getCurrentBatchSize(); batchMessageContainer.discard(ex); semaphoreRelease(numMessagesInBatch); + client.getMemoryLimitController().releaseMemory(currentBatchSize); } @Override @@ -1830,10 +1832,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne for (OpSendMsg opSendMsg : opSendMsgs) { processOpSendMsg(opSendMsg); } - } catch (PulsarClientException e) { - semaphoreRelease(batchMessageContainer.getNumMessagesInBatch()); } catch (Throwable t) { - semaphoreRelease(batchMessageContainer.getNumMessagesInBatch()); log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t); } }
