This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 708c5cc0c5f [fix][client] fix incomingMessageSize and client memory usage is negative (#23624)
708c5cc0c5f is described below

commit 708c5cc0c5f86d6c6bbdb438067122074f4de994
Author: ken <[email protected]>
AuthorDate: Fri Nov 22 09:51:02 2024 +0800

    [fix][client] fix incomingMessageSize and client memory usage is negative (#23624)
    
    Co-authored-by: fanjianye <[email protected]>
---
 .../client/api/SimpleProducerConsumerTest.java     | 56 +++++++++++++++++++
 .../impl/AutoScaledReceiverQueueSizeTest.java      | 62 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  5 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 +
 4 files changed, 125 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 78d28e4b228..9e35b4f262e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4252,6 +4252,62 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         });
     }
 
+    @Test(timeOut = 100000)
+    public void testNegativeIncomingMessageSize() throws Exception {
+        final String topicName = 
"persistent://my-property/my-ns/testIncomingMessageSize-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        admin.topics().createPartitionedTopic(topicName, 3);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        final int messages = 1000;
+        List<CompletableFuture<MessageId>> messageIds = new 
ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().key(i + "").value(("Message-" 
+ i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+
+        Awaitility.await().untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) 
consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should greater that 0, 
current size is {}", size);
+            Assert.assertTrue(size > 0);
+        });
+
+
+        for (int i = 0; i < messages; i++) {
+            consumer.receive();
+        }
+
+
+        Awaitility.await().untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) 
consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should be 0, current 
size is {}", size);
+            Assert.assertEquals(size, 0);
+        });
+
+
+        MultiTopicsConsumerImpl multiTopicsConsumer = 
(MultiTopicsConsumerImpl) consumer;
+        List<ConsumerImpl<byte[]>> list = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl<byte[]> subConsumer : list) {
+            long size = subConsumer.getIncomingMessageSize();
+            log.info("Check the sub consumer incoming message size should be 
0, current size is {}", size);
+            Assert.assertEquals(size, 0);
+        }
+    }
 
     @Data
     @EqualsAndHashCode
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
index 858e43e8465..5359158bf72 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
@@ -20,14 +20,22 @@ package org.apache.pulsar.client.impl;
 
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -257,4 +265,58 @@ public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest
         Awaitility.await().until(() -> consumer.getCurrentReceiverQueueSize() 
== currentSize * 2);
         log.info("getCurrentReceiverQueueSize={}", 
consumer.getCurrentReceiverQueueSize());
     }
+
+    @Test
+    public void testNegativeClientMemory() throws Exception {
+        final String topicName = "persistent://public/default/testMemory-" +
+                UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        admin.topics().createPartitionedTopic(topicName, 3);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        final int messages = 1000;
+        List<CompletableFuture<MessageId>> messageIds = new 
ArrayList<>(messages);
+        for (int i = 0; i < messages; i++) {
+            messageIds.add(producer.newMessage().key(i + "").value(("Message-" 
+ i).getBytes()).sendAsync());
+        }
+        FutureUtil.waitForAll(messageIds).get();
+
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .autoScaledReceiverQueueSizeEnabled(true)
+                .subscribe();
+
+
+        Awaitility.await().untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) 
consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should greater that 0, 
current size is {}", size);
+            Assert.assertTrue(size > 0);
+        });
+
+
+        for (int i = 0; i < messages; i++) {
+            consumer.receive();
+        }
+
+        Awaitility.await().untilAsserted(() -> {
+            long size = ((ConsumerBase<byte[]>) 
consumer).getIncomingMessageSize();
+            log.info("Check the incoming message size should be 0, current 
size is {}", size);
+            Assert.assertEquals(size, 0);
+        });
+
+
+        MemoryLimitController controller = 
((PulsarClientImpl)pulsarClient).getMemoryLimitController();
+        Assert.assertEquals(controller.currentUsage(), 0);
+        Assert.assertEquals(controller.currentUsagePercent(), 0);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0fc906b6e7a..8c10577bc86 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -1232,6 +1232,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         getMemoryLimitController().ifPresent(limiter -> 
limiter.releaseMemory(message.size()));
     }
 
+    protected void increaseIncomingMessageSize(final Message<?> message) {
+        INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, message.size());
+        getMemoryLimitController().ifPresent(limiter -> 
limiter.forceReserveMemory(message.size()));
+    }
+
     public long getIncomingMessageSize() {
         return INCOMING_MESSAGES_SIZE_UPDATER.get(this);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 004adab56f5..ffdf4cfdc8b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1668,6 +1668,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return;
         }
 
+        // increase incomingMessageSize here because the size would be decreased in messageProcessed() next step
+        increaseIncomingMessageSize(message);
         // increase permits for available message-queue
         messageProcessed(message);
         // call interceptor and complete received callback

Reply via email to