This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 905f54a732bd2172e45b486a618aa96a3e58e4f6
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Nov 9 23:16:16 2021 -0800

    [pulsar-client] Fix pending queue-size stats for batch messages (#12704)
    
    (cherry picked from commit 9a3e7ecb326d045a10c3438f10bda63002d4603c)
---
 .../pulsar/client/api/SimpleProducerConsumerStatTest.java      |  7 ++++++-
 .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java  | 10 +++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 480d835..8f93494 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -33,7 +33,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.gson.Gson;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.slf4j.Logger;
@@ -46,6 +45,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 
@@ -76,6 +76,11 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
         return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } };
     }
 
+    @DataProvider(name = "batchingEnabled")
+    public Object[][] batchingEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
     @Test(dataProvider = "batch_with_timeout")
     public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutSec) throws Exception {
         log.info("-- Starting {} test --", methodName);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e28721c..cc978a1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
@@ -1946,7 +1947,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     public int getPendingQueueSize() {
-        return pendingMessages.size();
+        if (!isBatchMessagingEnabled()) {
+            return pendingMessages.size();
+        }
+        MutableInt size = new MutableInt(0);
+        pendingMessages.forEach(op -> {
+            size.add(Math.max(op.numMessagesInBatch, 1));
+        });
+        return size.getValue();
     }
 
     @Override

Reply via email to