This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3f8ba5e5c [improve][client] support aggregate metrics for partition topic stats (#18214)
5e3f8ba5e5c is described below
commit 5e3f8ba5e5c2b2375880b679abebb2b03e738cd8
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Oct 27 05:54:12 2022 -0700
[improve][client] support aggregate metrics for partition topic stats (#18214)
---
.../pulsar/client/impl/ProducerStatsRecorderImpl.java | 17 +++++++++++++++--
.../client/impl/ProducerStatsRecorderImplTest.java | 13 +++++++++++++
2 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 01ed84f5503..a7e541e5de1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -27,6 +27,7 @@ import io.netty.util.TimerTask;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -50,6 +51,8 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
private final LongAdder totalBytesSent;
private final LongAdder totalSendFailed;
private final LongAdder totalAcksReceived;
+ private final DoubleAdder sendMsgsRateAggregate;
+ private final DoubleAdder sendBytesRateAggregate;
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new
DecimalFormat("0.00");
private final transient DoublesSketch ds;
@@ -58,6 +61,7 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
private volatile double sendMsgsRate;
private volatile double sendBytesRate;
+ private int partitions = 0;
private volatile double[] latencyPctValues = new
double[PERCENTILES.length];
private volatile double[] batchSizePctValues = new
double[PERCENTILES.length];
private volatile double[] msgSizePctValues = new
double[PERCENTILES.length];
@@ -73,6 +77,8 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
+ sendMsgsRateAggregate = new DoubleAdder();
+ sendBytesRateAggregate = new DoubleAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
@@ -91,6 +97,8 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent = new LongAdder();
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
+ sendMsgsRateAggregate = new DoubleAdder();
+ sendBytesRateAggregate = new DoubleAdder();
ds = DoublesSketch.builder().build(256);
batchSizeDs = DoublesSketch.builder().build(256);
msgSizeDs = DoublesSketch.builder().build(256);
@@ -239,6 +247,7 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent.reset();
totalSendFailed.reset();
totalAcksReceived.reset();
+ partitions = 0;
}
void updateCumulativeStats(ProducerStats stats) {
@@ -253,6 +262,10 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalBytesSent.add(stats.getTotalBytesSent());
totalSendFailed.add(stats.getTotalSendFailed());
totalAcksReceived.add(stats.getTotalAcksReceived());
+ // update rates
+ sendMsgsRateAggregate.add(stats.getSendMsgsRate());
+ sendBytesRateAggregate.add(stats.getSendBytesRate());
+ partitions++;
}
@Override
@@ -293,12 +306,12 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
@Override
public double getSendMsgsRate() {
- return sendMsgsRate;
+ return partitions != 0 ? sendMsgsRateAggregate.doubleValue() /
partitions : sendMsgsRate;
}
@Override
public double getSendBytesRate() {
- return sendBytesRate;
+ return partitions != 0 ? sendBytesRateAggregate.doubleValue() /
partitions : sendBytesRate;
}
@Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
index daf4aa4473b..28f47105f86 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java
@@ -26,7 +26,9 @@ import org.testng.annotations.Test;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -74,4 +76,15 @@ public class ProducerStatsRecorderImplTest {
recorder.cancelStatsTimeout();
assertEquals(1000.0, recorder.getSendLatencyMillisMax(), 0.5);
}
+
+ @Test
+ public void testPartitionTopicAggegationStats() {
+ ProducerStatsRecorderImpl recorder1 = spy(new
ProducerStatsRecorderImpl());
+ ProducerStatsRecorderImpl recorder2 = new ProducerStatsRecorderImpl();
+ when(recorder1.getSendMsgsRate()).thenReturn(1000.0);
+ when(recorder1.getSendBytesRate()).thenReturn(1000.0);
+ recorder2.updateCumulativeStats(recorder1);
+ assertTrue(recorder2.getSendBytesRate() > 0);
+ assertTrue(recorder2.getSendMsgsRate() > 0);
+ }
}