This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 905f54a732bd2172e45b486a618aa96a3e58e4f6 Author: Rajan Dhabalia <[email protected]> AuthorDate: Tue Nov 9 23:16:16 2021 -0800 [pulsar-client] Fix pending queue-size stats for batch messages (#12704) (cherry picked from commit 9a3e7ecb326d045a10c3438f10bda63002d4603c) --- .../pulsar/client/api/SimpleProducerConsumerStatTest.java | 7 ++++++- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 10 +++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 480d835..8f93494 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.gson.Gson; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.admin.PulsarAdminException; import org.slf4j.Logger; @@ -46,6 +45,7 @@ import org.testng.annotations.Test; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; +import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; @@ -76,6 +76,11 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } }; } + @DataProvider(name = "batchingEnabled") + public Object[][] batchingEnabled() { + return new Object[][] { { true }, { false } }; + } + @Test(dataProvider = "batch_with_timeout") public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutSec) throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e28721c..cc978a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.Consumer; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; @@ -1946,7 +1947,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } public int getPendingQueueSize() { - return pendingMessages.size(); + if (!isBatchMessagingEnabled()) { + return pendingMessages.size(); + } + MutableInt size = new MutableInt(0); + pendingMessages.forEach(op -> { + size.add(Math.max(op.numMessagesInBatch, 1)); + }); + return size.getValue(); } @Override
