This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 519dbe1eacd09ab98825920bbe5f7c6cf27e2c1a Author: Ruguo Yu <[email protected]> AuthorDate: Mon Sep 20 20:34:06 2021 +0800 [testclient] Call printAggregatedStats method before client exit (#11985) (cherry picked from commit 3c770a18a807498634124161f95bc4f0888d5315) --- .../proxy/socket/client/PerformanceClient.java | 32 ++++++++++++++++++++++ .../pulsar/testclient/ManagedLedgerWriter.java | 26 ++++++++++++++---- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java index 3902d5f..0e6f84b 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java @@ -67,6 +67,8 @@ public class PerformanceClient { static AtomicInteger msgSent = new AtomicInteger(0); private static final LongAdder messagesSent = new LongAdder(); private static final LongAdder bytesSent = new LongAdder(); + private static final LongAdder totalMessagesSent = new LongAdder(); + private static final LongAdder totalBytesSent = new LongAdder(); private JCommander jc; @Parameters(commandDescription = "Test pulsar websocket producer performance.") @@ -263,6 +265,8 @@ public class PerformanceClient { producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), sizeOfMessage); messagesSent.increment(); bytesSent.add(sizeOfMessage); + totalMessagesSent.increment(); + totalBytesSent.add(sizeOfMessage); } } @@ -328,6 +332,11 @@ public class PerformanceClient { PerformanceClient test = new PerformanceClient(); Arguments arguments = test.loadArguments(args); PerfClientUtils.printJVMInformation(log); + long start = System.nanoTime(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + printAggregatedThroughput(start); + printAggregatedStats(); + })); test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize, arguments.proxyURL, arguments.topics.get(0), arguments.authPluginClassName, arguments.authParams); } @@ -350,8 +359,31 @@ public class PerformanceClient { } + private static void printAggregatedThroughput(long start) { + double elapsed = (System.nanoTime() - start) / 1e9; + double rate = totalMessagesSent.sum() / elapsed; + double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8; + log.info( + "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s", + totalMessagesSent, + totalFormat.format(rate), + totalFormat.format(throughput)); + } + + private static void printAggregatedStats() { + Histogram reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram(); + + log.info( + "Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}", + dec.format(reportHistogram.getMean()), reportHistogram.getValueAtPercentile(50), + reportHistogram.getValueAtPercentile(95), reportHistogram.getValueAtPercentile(99), + reportHistogram.getValueAtPercentile(99.9), reportHistogram.getValueAtPercentile(99.99), + reportHistogram.getValueAtPercentile(99.999), reportHistogram.getMaxValue()); + } + static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8); static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7); + static final DecimalFormat totalFormat = new DecimalFormat("0.000"); private static final Logger log = LoggerFactory.getLogger(PerformanceClient.class); } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index 8d4c8c2..244be81 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -72,6 +72,8 @@ public class ManagedLedgerWriter { private static final LongAdder messagesSent = new LongAdder(); private static final LongAdder bytesSent = new LongAdder(); + private static final LongAdder totalMessagesSent = new LongAdder(); + private static final LongAdder totalBytesSent = new LongAdder(); private static Recorder recorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5); private static Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(120000), 5); @@ -211,11 +213,11 @@ public class ManagedLedgerWriter { log.info("Created {} managed ledgers", managedLedgers.size()); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { - printAggregatedStats(); - } - }); + long start = System.nanoTime(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + printAggregatedThroughput(start); + printAggregatedStats(); + })); Collections.shuffle(managedLedgers); AtomicBoolean isDone = new AtomicBoolean(); @@ -245,6 +247,8 @@ public class ManagedLedgerWriter { long sendTime = (Long) (ctx); messagesSent.increment(); bytesSent.add(payloadData.length); + totalMessagesSent.increment(); + totalBytesSent.add(payloadData.length); long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime); recorder.recordValue(latencyMicros); @@ -376,6 +380,17 @@ public class ManagedLedgerWriter { return map; } + private static void printAggregatedThroughput(long start) { + double elapsed = (System.nanoTime() - start) / 1e9; + double rate = totalMessagesSent.sum() / elapsed; + double throughput = totalBytesSent.sum() / elapsed / 1024 / 1024 * 8; + log.info( + "Aggregated throughput stats --- {} records sent --- {} msg/s --- {} Mbit/s", + totalMessagesSent, + totalFormat.format(rate), + totalFormat.format(throughput)); + } + private static void printAggregatedStats() { Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram(); @@ -393,5 +408,6 @@ public class ManagedLedgerWriter { static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8); static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7); + static final DecimalFormat totalFormat = new DecimalFormat("0.000"); private static final Logger log = LoggerFactory.getLogger(ManagedLedgerWriter.class); }
