This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ef9e773ef454db3db4ed41de20858c2c0b2dadaf Author: Yunze Xu <[email protected]> AuthorDate: Fri Dec 24 10:56:17 2021 +0800 Fix semaphore and memory leak when chunks failed to enqueue (#13454) ### Motivation When a large message is sent in chunks, each chunk needs to acquire a permit from the semaphore. However, when acquiring a permit fails for one of the chunks, the memory already reserved from the memory limiter and the permits already acquired from the semaphore are not released. ### Modifications - Release the acquired semaphore permits and the reserved memory when `canEnqueueRequest` returns false for a chunk. - Add `testChunksEnqueueFailed` to cover this case. It sends a large message whose number of chunks is greater than `maxPendingMessages`, so that the initial `canEnqueueRequest` call returns true while one of the subsequent `canEnqueueRequest` calls for the remaining chunks returns false. (cherry picked from commit 2e2cd57f984b601a878dd11e1d27f4c169a84e5b) --- .../pulsar/client/impl/MessageChunkingTest.java | 43 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 2 + 2 files changed, 45 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index d4eab77..40191ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; @@ -27,17 +28,20 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Random; import java.util.Set; 
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -47,6 +51,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.MessageImpl.SchemaState; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -369,6 +374,44 @@ public class MessageChunkingTest extends ProducerConsumerBase { producer = null; // clean reference of mocked producer } + @Test + public void testChunksEnqueueFailed() throws Exception { + final String topicName = "persistent://my-property/my-ns/test-chunks-enqueue-failed"; + log.info("-- Starting {} test --", methodName); + this.conf.setMaxMessageSize(5); + + final MemoryLimitController controller = ((PulsarClientImpl) pulsarClient).getMemoryLimitController(); + assertEquals(controller.currentUsage(), 0); + + final int maxPendingMessages = 10; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .maxPendingMessages(maxPendingMessages) + .enableChunking(true) + .enableBatching(false) + .create(); + assertTrue(producer instanceof ProducerImpl); + Semaphore semaphore = ((ProducerImpl<byte[]>) producer).getSemaphore().orElse(null); + 
assertNotNull(semaphore); + assertEquals(semaphore.availablePermits(), maxPendingMessages); + producer.send(createMessagePayload(1).getBytes()); + try { + producer.send(createMessagePayload(100).getBytes(StandardCharsets.UTF_8)); + fail("It should fail with ProducerQueueIsFullError"); + } catch (PulsarClientException e) { + assertTrue(e instanceof PulsarClientException.ProducerQueueIsFullError); + assertEquals(controller.currentUsage(), 0); + assertEquals(semaphore.availablePermits(), maxPendingMessages); + } + } + + @Override + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + clientBuilder.memoryLimit(10000L, SizeUnit.BYTES); + } + private String createMessagePayload(int size) { StringBuilder str = new StringBuilder(); Random rand = new Random(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 3afd660..a5baba7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -456,6 +456,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // chunked message also sent individually so, try to acquire send-permits for (int i = 0; i < (totalChunks - 1); i++) { if (!canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) { + client.getMemoryLimitController().releaseMemory(uncompressedSize); + semaphoreRelease(i + 1); return; } }
