This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 58946da1f4e09bfec4e67ca39416123beefcc700 Author: lixinyang <[email protected]> AuthorDate: Mon Sep 5 10:35:30 2022 +0800 [fix][broker]Fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message exception (#17276) * fix memoryLimitController currentUsage and MaxQueueSize semaphore leak when batchMessageContainer add message * fix unit test Co-authored-by: nicklixinyang <[email protected]> --- .../client/impl/ProducerMemoryLimitTest.java | 40 ++++++++++++++++++++++ .../pulsar/client/impl/ProducerSemaphoreTest.java | 38 ++++++++++++++++++++ .../client/impl/BatchMessageContainerImpl.java | 2 ++ .../client/impl/BatchMessageContainerImplTest.java | 13 +++++++ 4 files changed, 93 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java index 8bbfa19c509..2a981fcb6df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.client.impl; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import io.netty.buffer.ByteBufAllocator; +import java.lang.reflect.Field; import lombok.Cleanup; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; @@ -125,6 +130,41 @@ public class ProducerMemoryLimitTest extends ProducerConsumerBase { } } + @Test(timeOut = 10_000) + public void testBatchMessageOOMMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(true) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .batchingMaxBytes(12) + .create(); + this.stopBroker(); + + try { + ProducerImpl<byte[]> spyProducer = Mockito.spy(producer); + final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class); + doAnswer((ignore) -> { + throw new OutOfMemoryError("memory-test"); + }).when(mockAllocator).buffer(anyInt()); + + final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator); + Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer"); + batchMessageContainerField.setAccessible(true); + batchMessageContainerField.set(spyProducer, batchMessageContainer); + + spyProducer.send("memory-test".getBytes(StandardCharsets.UTF_8)); + Assert.fail("can not reach here"); + } catch (PulsarClientException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + } + @Test(timeOut = 10_000) public void testProducerCloseMemoryRelease() throws Exception { initClientWithMemoryLimit(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index 2bc81c48f9b..f04f94d11b1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -19,6 +19,10 @@ package org.apache.pulsar.client.impl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import io.netty.buffer.ByteBufAllocator; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @@ -274,4 +278,38 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10); } } + + @Test(timeOut = 10_000) + public void testBatchMessageOOMProducerSemaphoreRelease() throws Exception { + final int pendingQueueSize = 10; + @Cleanup + ProducerImpl<byte[]> producer = + (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerSemaphoreRelease") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(pendingQueueSize) + .enableBatching(true) + .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS) + .batchingMaxBytes(12) + .create(); + this.stopBroker(); + + try { + ProducerImpl<byte[]> spyProducer = Mockito.spy(producer); + final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class); + doAnswer((ignore) -> { + throw new OutOfMemoryError("semaphore-test"); + }).when(mockAllocator).buffer(anyInt()); + + final BatchMessageContainerImpl batchMessageContainer = new BatchMessageContainerImpl(mockAllocator); + Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer"); + batchMessageContainerField.setAccessible(true); + batchMessageContainerField.set(spyProducer, batchMessageContainer); + + spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + Assert.fail("can not reach here"); + } catch (PulsarClientException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 44fad489dac..43c229d6e81 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -108,6 +108,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { } } catch (Throwable e) { log.error("construct first message failed, exception is ", e); + producer.semaphoreRelease(getNumMessagesInBatch()); + producer.client.getMemoryLimitController().releaseMemory(msg.getUncompressedSize()); discard(new PulsarClientException(e)); return false; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java index a4f0205c2cf..1e640301f89 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertTrue; import io.netty.buffer.ByteBufAllocator; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; @@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.junit.Assert; import org.testng.annotations.Test; public class BatchMessageContainerImplTest { @@ -41,6 +43,17 @@ public class BatchMessageContainerImplTest { final ProducerImpl<?> producer = mock(ProducerImpl.class); final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData(); producerConfigurationData.setCompressionType(CompressionType.NONE); + PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); + MemoryLimitController memoryLimitController = mock(MemoryLimitController.class); + when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController); + try { + Field clientFiled = HandlerState.class.getDeclaredField("client"); + clientFiled.setAccessible(true); + clientFiled.set(producer, pulsarClient); + } catch (Exception e){ + Assert.fail(e.getMessage()); + } + when(producer.getConfiguration()).thenReturn(producerConfigurationData); final ByteBufAllocator mockAllocator = mock(ByteBufAllocator.class); doAnswer((ignore) -> {
