This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 57b008a4114 Forget to update memory usage when invalid message (#16835)
57b008a4114 is described below
commit 57b008a411463bce4c26350177dae4346f7b84d2
Author: ZhangJian He <[email protected]>
AuthorDate: Fri Jul 29 10:43:50 2022 +0800
Forget to update memory usage when invalid message (#16835)
### Modifications
release memory usage when invalid message.
Only need to release memory usage here, no need to release semaphore. Both
add testcases.
coauthored by @pengxiangrui127.
### Verifying this change
- add unit tests for this change
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [x] `doc-not-needed`
bug fix, no need doc
---
.../client/impl/ProducerMemoryLimitTest.java | 27 ++++++++++++++++++++++
.../pulsar/client/impl/ProducerSemaphoreTest.java | 26 +++++++++++++++++++++
.../client/impl/BatchMessageContainerImpl.java | 2 ++
3 files changed, 55 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
index 264ec306413..0856dfc88b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
@@ -23,6 +23,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -47,6 +49,31 @@ public class ProducerMemoryLimitTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Test(timeOut = 10_000)
+ public void testProducerInvalidMessageMemoryRelease() throws Exception {
+ initClientWithMemoryLimit();
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic("testProducerMemoryLimit")
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(10240)
+ .enableBatching(true)
+ .create();
+ this.stopBroker();
+ try {
+ try (MockedStatic<ClientCnx> mockedStatic =
Mockito.mockStatic(ClientCnx.class)) {
+ mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8);
+ producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
+ }
+ throw new IllegalStateException("can not reach here");
+ } catch (PulsarClientException.InvalidMessageException ex) {
+ PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
+ final MemoryLimitController memoryLimitController =
clientImpl.getMemoryLimitController();
+ Assert.assertEquals(memoryLimitController.currentUsage(), 0);
+ }
+ }
+
@Test(timeOut = 10_000)
public void testProducerTimeoutMemoryRelease() throws Exception {
initClientWithMemoryLimit();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 78fc659a205..cc7b601e42a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -25,12 +25,15 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -52,6 +55,29 @@ public class ProducerSemaphoreTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Test(timeOut = 10_000)
+ public void testProducerSemaphoreInvalidMessage() throws Exception {
+ final int pendingQueueSize = 100;
+
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic("testProducerSemaphoreAcquire")
+ .maxPendingMessages(pendingQueueSize)
+ .enableBatching(false)
+ .create();
+
+ this.stopBroker();
+ try {
+ try (MockedStatic<ClientCnx> mockedStatic =
Mockito.mockStatic(ClientCnx.class)) {
+ mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
+
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+ }
+ throw new IllegalStateException("can not reach here");
+ } catch (PulsarClientException.InvalidMessageException ex) {
+
Assert.assertEquals(producer.getSemaphore().get().availablePermits(),
pendingQueueSize);
+ }
+ }
+
@Test(timeOut = 30000)
public void testProducerSemaphoreAcquireAndRelease() throws
PulsarClientException, ExecutionException, InterruptedException {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 02fb491d09d..0d107aa7ba9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -199,6 +199,8 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
public OpSendMsg createOpSendMsg() throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+ messages.forEach(msg -> producer.client.getMemoryLimitController()
+ .releaseMemory(msg.getUncompressedSize()));
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
return null;