This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9a3e7ec [pulsar-client] Fix pending queue-size stats for batch messages (#12704)
9a3e7ec is described below
commit 9a3e7ecb326d045a10c3438f10bda63002d4603c
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Nov 9 23:16:16 2021 -0800
[pulsar-client] Fix pending queue-size stats for batch messages (#12704)
---
.../client/api/SimpleProducerConsumerStatTest.java | 16 ++++++++++++----
.../java/org/apache/pulsar/client/impl/ProducerImpl.java | 10 +++++++++-
2 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 695b9b9..40d34a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -33,9 +33,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.gson.Gson;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
@@ -78,6 +79,11 @@ public class SimpleProducerConsumerStatTest extends
ProducerConsumerBase {
return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } };
}
+ @DataProvider(name = "batchingEnabled")
+ public Object[][] batchingEnabled() {
+ return new Object[][] { { true }, { false } };
+ }
+
@Test(dataProvider = "batch_with_timeout")
public void testSyncProducerAndConsumer(int batchMessageDelayMs, int
ackTimeoutSec) throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -427,14 +433,14 @@ public class SimpleProducerConsumerStatTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
- @Test
- public void testProducerPendingQueueSizeStats() throws Exception {
+ @Test(dataProvider = "batchingEnabled")
+ public void testProducerPendingQueueSizeStats(boolean batchingEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
.topic("persistent://my-property/tp1/my-ns/my-topic1");
@Cleanup
- Producer<byte[]> producer = producerBuilder.enableBatching(false).create();
+ Producer<byte[]> producer = producerBuilder.enableBatching(batchingEnabled).create();
stopBroker();
@@ -443,6 +449,8 @@ public class SimpleProducerConsumerStatTest extends
ProducerConsumerBase {
String message = "my-message-" + i;
producer.sendAsync(message.getBytes());
}
+ Awaitility.await().timeout(2, TimeUnit.MINUTES)
+ .until(() -> producer.getStats().getPendingQueueSize() == numMessages);
assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5e0579f..40c9b99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
@@ -1924,7 +1925,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
public int getPendingQueueSize() {
- return pendingMessages.size();
+ if (!isBatchMessagingEnabled()) {
+ return pendingMessages.size();
+ }
+ MutableInt size = new MutableInt(0);
+ pendingMessages.forEach(op -> {
+ size.add(Math.max(op.numMessagesInBatch, 1));
+ });
+ return size.getValue();
}
@Override