This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 44a273d  Log producer batchSize and msgSize percentiles (#14229)
44a273d is described below

commit 44a273d5dbad913553ce9c9c02c2a072892b9564
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Tue Feb 15 22:30:46 2022 +0800

    Log producer batchSize and msgSize percentiles (#14229)
---
 .../client/impl/ProducerStatsRecorderImpl.java     | 33 ++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 989da06..29b8cf2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -53,10 +53,14 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
     private static final DecimalFormat DEC = new DecimalFormat("0.000");
     private static final DecimalFormat THROUGHPUT_FORMAT = new 
DecimalFormat("0.00");
     private final transient DoublesSketch ds;
+    private final transient DoublesSketch batchSizeDs;
+    private final transient DoublesSketch msgSizeDs;
 
     private volatile double sendMsgsRate;
     private volatile double sendBytesRate;
     private volatile double[] latencyPctValues = new 
double[PERCENTILES.length];
+    private volatile double[] batchSizePctValues = new 
double[PERCENTILES.length];
+    private volatile double[] msgSizePctValues = new 
double[PERCENTILES.length];
 
     private static final double[] PERCENTILES = { 0.5, 0.75, 0.95, 0.99, 
0.999, 1.0 };
 
@@ -70,6 +74,8 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
         ds = DoublesSketch.builder().build(256);
+        batchSizeDs = DoublesSketch.builder().build(256);
+        msgSizeDs = DoublesSketch.builder().build(256);
     }
 
     public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, 
ProducerConfigurationData conf,
@@ -86,6 +92,8 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
         ds = DoublesSketch.builder().build(256);
+        batchSizeDs = DoublesSketch.builder().build(256);
+        msgSizeDs = DoublesSketch.builder().build(256);
         init(conf);
     }
 
@@ -147,6 +155,16 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
             ds.reset();
         }
 
+        synchronized (batchSizeDs) {
+            batchSizePctValues = batchSizeDs.getQuantiles(PERCENTILES);
+            batchSizeDs.reset();
+        }
+
+        synchronized (msgSizeDs) {
+            msgSizePctValues = msgSizeDs.getQuantiles(PERCENTILES);
+            msgSizeDs.reset();
+        }
+
         sendMsgsRate = currentNumMsgsSent / elapsed;
         sendBytesRate = currentNumBytesSent / elapsed;
 
@@ -161,6 +179,9 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
 
             log.info("[{}] [{}] Pending messages: {} --- Publish throughput: 
{} msg/s --- {} Mbit/s --- "
                             + "Latency: med: {} ms - 95pct: {} ms - 99pct: {} 
ms - 99.9pct: {} ms - max: {} ms --- "
+                            + "BatchSize: med: {} - 95pct: {} - 99pct: {} - 
99.9pct: {} - max: {} --- "
+                            + "MsgSize: med: {} bytes - 95pct: {} bytes - 
99pct: {} bytes - 99.9pct: {} bytes "
+                            + "- max: {} bytes --- "
                             + "Ack received rate: {} ack/s --- Failed 
messages: {}", producer.getTopic(),
                     producer.getProducerName(), producer.getPendingQueueSize(),
                     THROUGHPUT_FORMAT.format(sendMsgsRate),
@@ -168,6 +189,12 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
                     DEC.format(latencyPctValues[0]), 
DEC.format(latencyPctValues[2]),
                     DEC.format(latencyPctValues[3]), 
DEC.format(latencyPctValues[4]),
                     DEC.format(latencyPctValues[5]),
+                    DEC.format(batchSizePctValues[0]), 
DEC.format(batchSizePctValues[2]),
+                    DEC.format(batchSizePctValues[3]), 
DEC.format(batchSizePctValues[4]),
+                    DEC.format(batchSizePctValues[5]),
+                    DEC.format(msgSizePctValues[0]), 
DEC.format(msgSizePctValues[2]),
+                    DEC.format(msgSizePctValues[3]), 
DEC.format(msgSizePctValues[4]),
+                    DEC.format(msgSizePctValues[5]),
                     THROUGHPUT_FORMAT.format(currentNumAcksReceived / 
elapsed), currentNumSendFailedMsgs);
         }
     }
@@ -176,6 +203,12 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
     public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         numMsgsSent.add(numMsgs);
         numBytesSent.add(totalMsgsSize);
+        synchronized (batchSizeDs) {
+            batchSizeDs.update(numMsgs);
+        }
+        synchronized (msgSizeDs) {
+            msgSizeDs.update(totalMsgsSize);
+        }
     }
 
     @Override

Reply via email to