This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c73a29d13750fdb71526f8cfb6a63804d9359e37 Author: ken <[email protected]> AuthorDate: Fri Nov 22 09:51:02 2024 +0800 [fix][client] fix incomingMessageSize and client memory usage is negative (#23624) Co-authored-by: fanjianye <[email protected]> (cherry picked from commit 708c5cc0c5f86d6c6bbdb438067122074f4de994) --- .../client/api/SimpleProducerConsumerTest.java | 56 +++++++++++++++++++ .../impl/AutoScaledReceiverQueueSizeTest.java | 62 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerBase.java | 5 ++ .../apache/pulsar/client/impl/ConsumerImpl.java | 2 + 4 files changed, 125 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 78d28e4b228..9e35b4f262e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4252,6 +4252,62 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { }); } + @Test(timeOut = 100000) + public void testNegativeIncomingMessageSize() throws Exception { + final String topicName = "persistent://my-property/my-ns/testIncomingMessageSize-" + + UUID.randomUUID().toString(); + final String subName = "my-sub"; + + admin.topics().createPartitionedTopic(topicName, 3); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .create(); + + final int messages = 1000; + List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync()); + } + FutureUtil.waitForAll(messageIds).get(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should greater that 0, current size is {}", size); + Assert.assertTrue(size > 0); + }); + + + for (int i = 0; i < messages; i++) { + consumer.receive(); + } + + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + }); + + + MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; + List<ConsumerImpl<byte[]>> list = multiTopicsConsumer.getConsumers(); + for (ConsumerImpl<byte[]> subConsumer : list) { + long size = subConsumer.getIncomingMessageSize(); + log.info("Check the sub consumer incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + } + } @Data @EqualsAndHashCode diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java index 858e43e8465..5359158bf72 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java @@ -20,14 +20,22 @@ package org.apache.pulsar.client.impl; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.BatchReceivePolicy; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -257,4 +265,58 @@ public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() == currentSize * 2); log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize()); } + + @Test + public void testNegativeClientMemory() throws Exception { + final String topicName = "persistent://public/default/testMemory-" + + UUID.randomUUID().toString(); + final String subName = "my-sub"; + + admin.topics().createPartitionedTopic(topicName, 3); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .create(); + + final int messages = 1000; + List<CompletableFuture<MessageId>> messageIds = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + messageIds.add(producer.newMessage().key(i + "").value(("Message-" + i).getBytes()).sendAsync()); + } + FutureUtil.waitForAll(messageIds).get(); + + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .autoScaledReceiverQueueSizeEnabled(true) + .subscribe(); + + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should greater that 0, current size is {}", size); + Assert.assertTrue(size > 0); + }); + + + for (int i = 0; i < messages; i++) { + consumer.receive(); + } + + Awaitility.await().untilAsserted(() -> { + long size = ((ConsumerBase<byte[]>) consumer).getIncomingMessageSize(); + log.info("Check the incoming message size should be 0, current size is {}", size); + Assert.assertEquals(size, 0); + }); + + + MemoryLimitController controller = ((PulsarClientImpl)pulsarClient).getMemoryLimitController(); + Assert.assertEquals(controller.currentUsage(), 0); + Assert.assertEquals(controller.currentUsagePercent(), 0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 0fc906b6e7a..8c10577bc86 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -1232,6 +1232,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size())); } + protected void increaseIncomingMessageSize(final Message<?> message) { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size()); + getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(message.size())); + } + public long getIncomingMessageSize() { return INCOMING_MESSAGES_SIZE_UPDATER.get(this); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 004adab56f5..ffdf4cfdc8b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1668,6 +1668,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return; } + // increase incomingMessageSize here because the size would be decreased in messageProcessed() next step + increaseIncomingMessageSize(message); // increase permits for available message-queue messageProcessed(message); // call interceptor and complete received callback
