This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a3e7ec  [pulsar-client] Fix pending queue-size stats for batch 
messages (#12704)
9a3e7ec is described below

commit 9a3e7ecb326d045a10c3438f10bda63002d4603c
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Nov 9 23:16:16 2021 -0800

    [pulsar-client] Fix pending queue-size stats for batch messages (#12704)
---
 .../client/api/SimpleProducerConsumerStatTest.java       | 16 ++++++++++++----
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 10 +++++++++-
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 695b9b9..40d34a7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -33,9 +33,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.gson.Gson;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 
@@ -78,6 +79,11 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
         return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } };
     }
 
+    @DataProvider(name = "batchingEnabled")
+    public Object[][] batchingEnabled() {
+        return new Object[][] { { true }, { false } };
+    }
+
     @Test(dataProvider = "batch_with_timeout")
     public void testSyncProducerAndConsumer(int batchMessageDelayMs, int 
ackTimeoutSec) throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -427,14 +433,14 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test
-    public void testProducerPendingQueueSizeStats() throws Exception {
+    @Test(dataProvider =  "batchingEnabled")
+    public void testProducerPendingQueueSizeStats(boolean batchingEnabled) 
throws Exception {
         log.info("-- Starting {} test --", methodName);
         ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
                 .topic("persistent://my-property/tp1/my-ns/my-topic1");
 
         @Cleanup
-        Producer<byte[]> producer = 
producerBuilder.enableBatching(false).create();
+        Producer<byte[]> producer = 
producerBuilder.enableBatching(batchingEnabled).create();
 
         stopBroker();
 
@@ -443,6 +449,8 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
             String message = "my-message-" + i;
             producer.sendAsync(message.getBytes());
         }
+        Awaitility.await().timeout(2, TimeUnit.MINUTES)
+                .until(() -> producer.getStats().getPendingQueueSize() == 
numMessages);
         assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5e0579f..40c9b99 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
@@ -1924,7 +1925,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     }
 
     public int getPendingQueueSize() {
-        return pendingMessages.size();
+        if (!isBatchMessagingEnabled()) {
+            return pendingMessages.size();
+        }
+        MutableInt size = new MutableInt(0);
+        pendingMessages.forEach(op -> {
+            size.add(Math.max(op.numMessagesInBatch, 1));
+        });
+        return size.getValue();
     }
 
     @Override

Reply via email to