This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 519dbe1eacd09ab98825920bbe5f7c6cf27e2c1a
Author: Ruguo Yu <[email protected]>
AuthorDate: Mon Sep 20 20:34:06 2021 +0800

    [testclient] Call printAggregatedStats method before client exit (#11985)
    
    
    (cherry picked from commit 3c770a18a807498634124161f95bc4f0888d5315)
---
 .../proxy/socket/client/PerformanceClient.java     | 32 ++++++++++++++++++++++
 .../pulsar/testclient/ManagedLedgerWriter.java     | 26 ++++++++++++++----
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
index 3902d5f..0e6f84b 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java
@@ -67,6 +67,8 @@ public class PerformanceClient {
     static AtomicInteger msgSent = new AtomicInteger(0);
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
+    private static final LongAdder totalMessagesSent = new LongAdder();
+    private static final LongAdder totalBytesSent = new LongAdder();
     private JCommander jc;
 
     @Parameters(commandDescription = "Test pulsar websocket producer performance.")
@@ -263,6 +265,8 @@ public class PerformanceClient {
                         
producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), 
sizeOfMessage);
                         messagesSent.increment();
                         bytesSent.add(sizeOfMessage);
+                        totalMessagesSent.increment();
+                        totalBytesSent.add(sizeOfMessage);
                     }
                 }
 
@@ -328,6 +332,11 @@ public class PerformanceClient {
         PerformanceClient test = new PerformanceClient();
         Arguments arguments = test.loadArguments(args);
         PerfClientUtils.printJVMInformation(log);
+        long start = System.nanoTime();
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
         test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize,
                 arguments.proxyURL, arguments.topics.get(0), arguments.authPluginClassName, arguments.authParams);
     }
@@ -350,8 +359,31 @@ public class PerformanceClient {
 
     }
 
+    private static void printAggregatedThroughput(long start) {
+        double elapsed = (System.nanoTime() - start) / 1e9;
+        double rate = totalMessagesSent.sum() / elapsed;
+        double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+        log.info(
+                "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+                totalMessagesSent,
+                totalFormat.format(rate),
+                totalFormat.format(throughput));
+    }
+
+    private static void printAggregatedStats() {
+        Histogram reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram();
+
+        log.info(
+                "Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean()), reportHistogram.getValueAtPercentile(50),
+                reportHistogram.getValueAtPercentile(95), reportHistogram.getValueAtPercentile(99),
+                reportHistogram.getValueAtPercentile(99.9), reportHistogram.getValueAtPercentile(99.99),
+                reportHistogram.getValueAtPercentile(99.999), reportHistogram.getMaxValue());
+    }
+
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceClient.class);
 
 }
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 8d4c8c2..244be81 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -72,6 +72,8 @@ public class ManagedLedgerWriter {
 
     private static final LongAdder messagesSent = new LongAdder();
     private static final LongAdder bytesSent = new LongAdder();
+    private static final LongAdder totalMessagesSent = new LongAdder();
+    private static final LongAdder totalBytesSent = new LongAdder();
 
     private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
     private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
@@ -211,11 +213,11 @@ public class ManagedLedgerWriter {
 
         log.info("Created {} managed ledgers", managedLedgers.size());
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                printAggregatedStats();
-            }
-        });
+        long start = System.nanoTime();
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            printAggregatedThroughput(start);
+            printAggregatedStats();
+        }));
 
         Collections.shuffle(managedLedgers);
         AtomicBoolean isDone = new AtomicBoolean();
@@ -245,6 +247,8 @@ public class ManagedLedgerWriter {
                             long sendTime = (Long) (ctx);
                             messagesSent.increment();
                             bytesSent.add(payloadData.length);
+                            totalMessagesSent.increment();
+                            totalBytesSent.add(payloadData.length);
 
                             long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
                             recorder.recordValue(latencyMicros);
@@ -376,6 +380,17 @@ public class ManagedLedgerWriter {
         return map;
     }
 
+    private static void printAggregatedThroughput(long start) {
+        double elapsed = (System.nanoTime() - start) / 1e9;
+        double rate = totalMessagesSent.sum() / elapsed;
+        double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8;
+        log.info(
+                "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s",
+                totalMessagesSent,
+                totalFormat.format(rate),
+                totalFormat.format(throughput));
+    }
+
     private static void printAggregatedStats() {
         Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
 
@@ -393,5 +408,6 @@ public class ManagedLedgerWriter {
 
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerWriter.class);
 }

Reply via email to