This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 44a273d Log producer batchSize and msgSize percentiles (#14229)
44a273d is described below
commit 44a273d5dbad913553ce9c9c02c2a072892b9564
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Feb 15 22:30:46 2022 +0800
Log producer batchSize and msgSize percentiles (#14229)
---
.../client/impl/ProducerStatsRecorderImpl.java | 33 ++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 989da06..29b8cf2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -53,10 +53,14 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
private static final DecimalFormat DEC = new DecimalFormat("0.000");
private static final DecimalFormat THROUGHPUT_FORMAT = new
DecimalFormat("0.00");
private final transient DoublesSketch ds;
+ private final transient DoublesSketch batchSizeDs;
+ private final transient DoublesSketch msgSizeDs;
private volatile double sendMsgsRate;
private volatile double sendBytesRate;
private volatile double[] latencyPctValues = new
double[PERCENTILES.length];
+ private volatile double[] batchSizePctValues = new
double[PERCENTILES.length];
+ private volatile double[] msgSizePctValues = new
double[PERCENTILES.length];
private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99,
0.999, 1.0 };
@@ -70,6 +74,8 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
ds = DoublesSketch.builder().build(256);
+ batchSizeDs = DoublesSketch.builder().build(256);
+ msgSizeDs = DoublesSketch.builder().build(256);
}
public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient,
ProducerConfigurationData conf,
@@ -86,6 +92,8 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
totalSendFailed = new LongAdder();
totalAcksReceived = new LongAdder();
ds = DoublesSketch.builder().build(256);
+ batchSizeDs = DoublesSketch.builder().build(256);
+ msgSizeDs = DoublesSketch.builder().build(256);
init(conf);
}
@@ -147,6 +155,16 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
ds.reset();
}
+ synchronized (batchSizeDs) {
+ batchSizePctValues = batchSizeDs.getQuantiles(PERCENTILES);
+ batchSizeDs.reset();
+ }
+
+ synchronized (msgSizeDs) {
+ msgSizePctValues = msgSizeDs.getQuantiles(PERCENTILES);
+ msgSizeDs.reset();
+ }
+
sendMsgsRate = currentNumMsgsSent / elapsed;
sendBytesRate = currentNumBytesSent / elapsed;
@@ -161,6 +179,9 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
log.info("[{}] [{}] Pending messages: {} --- Publish throughput:
{} msg/s --- {} Mbit/s --- "
+ "Latency: med: {} ms - 95pct: {} ms - 99pct: {}
ms - 99.9pct: {} ms - max: {} ms --- "
+ + "BatchSize: med: {} - 95pct: {} - 99pct: {} -
99.9pct: {} - max: {} --- "
+ + "MsgSize: med: {} bytes - 95pct: {} bytes -
99pct: {} bytes - 99.9pct: {} bytes "
+ + "- max: {} bytes --- "
+ "Ack received rate: {} ack/s --- Failed
messages: {}", producer.getTopic(),
producer.getProducerName(), producer.getPendingQueueSize(),
THROUGHPUT_FORMAT.format(sendMsgsRate),
@@ -168,6 +189,12 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
DEC.format(latencyPctValues[0]),
DEC.format(latencyPctValues[2]),
DEC.format(latencyPctValues[3]),
DEC.format(latencyPctValues[4]),
DEC.format(latencyPctValues[5]),
+ DEC.format(batchSizePctValues[0]),
DEC.format(batchSizePctValues[2]),
+ DEC.format(batchSizePctValues[3]),
DEC.format(batchSizePctValues[4]),
+ DEC.format(batchSizePctValues[5]),
+ DEC.format(msgSizePctValues[0]),
DEC.format(msgSizePctValues[2]),
+ DEC.format(msgSizePctValues[3]),
DEC.format(msgSizePctValues[4]),
+ DEC.format(msgSizePctValues[5]),
THROUGHPUT_FORMAT.format(currentNumAcksReceived /
elapsed), currentNumSendFailedMsgs);
}
}
@@ -176,6 +203,12 @@ public class ProducerStatsRecorderImpl implements
ProducerStatsRecorder {
public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
numMsgsSent.add(numMsgs);
numBytesSent.add(totalMsgsSize);
+ synchronized (batchSizeDs) {
+ batchSizeDs.update(numMsgs);
+ }
+ synchronized (msgSizeDs) {
+ msgSizeDs.update(totalMsgsSize);
+ }
}
@Override