This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 59339c42b54856b54a877489af1e8b8418095045 Author: lixinyang <[email protected]> AuthorDate: Wed Aug 3 12:09:13 2022 +0800 [fix][client]Fix MaxQueueSize semaphore release leak in createOpSendMsg (#16915) (cherry picked from commit d95f6cf366f66bc1e38711bc59cd456c8f53f888) --- .../apache/pulsar/client/impl/ProducerSemaphoreTest.java | 13 ++++++++++++- .../pulsar/client/impl/BatchMessageContainerImpl.java | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index b095b1b90a9..8e2876d0623 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -66,7 +66,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() .topic("testProducerSemaphoreAcquire") .maxPendingMessages(pendingQueueSize) - .enableBatching(false) + .enableBatching(true) .create(); this.stopBroker(); @@ -79,6 +79,17 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { } catch (PulsarClientException.InvalidMessageException ex) { Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); } + + producer.conf.setBatchingEnabled(false); + try { + try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) { + mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2); + producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8)); + } + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.InvalidMessageException ex) { + Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize); + } } @Test(timeOut = 30000) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 2d08e098de4..bf03d80ce0e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -200,6 +200,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { public OpSendMsg createOpSendMsg() throws IOException { ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) { + producer.semaphoreRelease(messages.size()); messages.forEach(msg -> producer.client.getMemoryLimitController() .releaseMemory(msg.getUncompressedSize())); discard(new PulsarClientException.InvalidMessageException(
