This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e581d95  Added ManagedLedger perf tool (#1270)
e581d95 is described below

commit e581d95b5e260bef7c0bda697ded0c2eb5211a52
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Feb 23 09:35:06 2018 -0800

    Added ManagedLedger perf tool (#1270)
---
 bin/pulsar-perf                                    |  15 +-
 .../pulsar/testclient/ManagedLedgerWriter.java     | 332 +++++++++++++++++++++
 2 files changed, 341 insertions(+), 6 deletions(-)

diff --git a/bin/pulsar-perf b/bin/pulsar-perf
index b128922..287012b 100755
--- a/bin/pulsar-perf
+++ b/bin/pulsar-perf
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,10 +79,13 @@ where command is one of:
     produce                 Run a producer
     consume                 Run a consumer
     read                    Run a topic reader
+
+    websocket-producer      Run a websocket producer
+
+    managed-ledger          Write directly on managed-ledgers
     monitor-brokers         Continuously receive broker data and/or load 
reports
     simulation-client       Run a simulation server acting as a Pulsar client
     simulation-controller   Run a simulation controller to give commands to 
servers
-    websocket-producer      Run a websocket producer
 
     help                           This help message
 
@@ -156,8 +159,8 @@ elif [ "$COMMAND" == "simulation-controller" ]; then
     exec $JAVA $OPTS org.apache.pulsar.testclient.LoadSimulationController "$@"
 elif [ "$COMMAND" == "websocket-producer" ]; then
     exec $JAVA $OPTS org.apache.pulsar.proxy.socket.client.PerformanceClient 
"$@"
-elif [ "$COMMAND" == "help" ]; then
-    pulsar_help;
+elif [ "$COMMAND" == "managed-ledger" ]; then
+    exec $JAVA $OPTS org.apache.pulsar.testclient.ManagedLedgerWriter "$@"
 else
-    exec $JAVA $OPTS $COMMAND $PULSAR_PERFTEST_CONF "$@"
+    pulsar_help;
 fi
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
new file mode 100644
index 0000000..0c8ad56
--- /dev/null
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.util.concurrent.RateLimiter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+public class ManagedLedgerWriter {
+
+    private static final ExecutorService executor = Executors
+            .newCachedThreadPool(new 
DefaultThreadFactory("pulsar-perf-managed-ledger-exec"));
+
+    private static final LongAdder messagesSent = new LongAdder();
+    private static final LongAdder bytesSent = new LongAdder();
+
+    private static Recorder recorder = new 
Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
+    private static Recorder cumulativeRecorder = new 
Recorder(TimeUnit.SECONDS.toMillis(120000), 5);
+
+    static class Arguments {
+
+        @Parameter(names = { "-h", "--help" }, description = "Help message", 
help = true)
+        boolean help;
+
+        @Parameter(names = { "-r", "--rate" }, description = "Write rate msg/s 
across managed ledgers")
+        public int msgRate = 100;
+
+        @Parameter(names = { "-s", "--size" }, description = "Message size")
+        public int msgSize = 1024;
+
+        @Parameter(names = { "-t", "--num-topic" }, description = "Number of 
managed ledgers")
+        public int numManagedLedgers = 1;
+
+        @Parameter(names = { "--threads" }, description = "Number of threads 
writing")
+        public int numThreads = 1;
+
+        @Parameter(names = { "-zk", "--zookeeperServers" }, description = 
"ZooKeeper connection string")
+        public String zookeeperServers;
+
+        @Parameter(names = { "-o", "--max-outstanding" }, description = "Max 
number of outstanding requests")
+        public int maxOutstanding = 1000;
+
+        @Parameter(names = { "-c",
+                "--max-connections" }, description = "Max number of TCP 
connections to a single bookie")
+        public int maxConnections = 1;
+
+        @Parameter(names = { "-m",
+                "--num-messages" }, description = "Number of messages to 
publish in total. If 0, it will keep publishing")
+        public long numMessages = 0;
+
+        @Parameter(names = { "-e", "--ensemble-size" }, description = "Ledger 
ensemble size")
+        public int ensembleSize = 1;
+
+        @Parameter(names = { "-w", "--write-quorum" }, description = "Ledger 
write quorum")
+        public int writeQuorum = 1;
+
+        @Parameter(names = { "-a", "--ack-quorum" }, description = "Ledger ack 
quorum")
+        public int ackQuorum = 1;
+
+        @Parameter(names = { "-time",
+                "--test-duration" }, description = "Test duration in secs. If 
0, it will keep publishing")
+        public long testTime = 0;
+
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        final Arguments arguments = new Arguments();
+        JCommander jc = new JCommander(arguments);
+        jc.setProgramName("pulsar-perf-producer");
+
+        try {
+            jc.parse(args);
+        } catch (ParameterException e) {
+            System.out.println(e.getMessage());
+            jc.usage();
+            System.exit(-1);
+        }
+
+        if (arguments.help) {
+            jc.usage();
+            System.exit(-1);
+        }
+
+        arguments.testTime = TimeUnit.SECONDS.toMillis(arguments.testTime);
+
+        // Dump config variables
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting Pulsar managed-ledger perf writer with config: {}", 
w.writeValueAsString(arguments));
+
+        byte[] payloadData = new byte[arguments.msgSize];
+        ByteBuf payloadBuffer = Unpooled.directBuffer(arguments.msgSize);
+        payloadBuffer.writerIndex(arguments.msgSize);
+
+        // Now processing command line arguments
+        String managedLedgerPrefix = "test-" + 
DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 5);
+
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setUseV2WireProtocol(true);
+        
bkConf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
+        bkConf.setAddEntryTimeout(30);
+        bkConf.setReadEntryTimeout(30);
+        bkConf.setThrottleValue(0);
+        bkConf.setNumChannelsPerBookie(arguments.maxConnections);
+        bkConf.setZkServers(arguments.zookeeperServers);
+
+        ManagedLedgerFactoryConfig mlFactoryConf = new 
ManagedLedgerFactoryConfig();
+        mlFactoryConf.setMaxCacheSize(0);
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkConf, 
mlFactoryConf);
+
+        ManagedLedgerConfig mlConf = new ManagedLedgerConfig();
+        mlConf.setEnsembleSize(arguments.ensembleSize);
+        mlConf.setWriteQuorumSize(arguments.writeQuorum);
+        mlConf.setAckQuorumSize(arguments.ackQuorum);
+        mlConf.setMinimumRolloverTime(10, TimeUnit.MINUTES);
+        mlConf.setMetadataEnsembleSize(arguments.ensembleSize);
+        mlConf.setMetadataWriteQuorumSize(arguments.writeQuorum);
+        mlConf.setMetadataAckQuorumSize(arguments.ackQuorum);
+        mlConf.setDigestType(DigestType.CRC32);
+        mlConf.setMaxSizePerLedgerMb(2048);
+
+        List<CompletableFuture<ManagedLedger>> futures = new ArrayList<>();
+
+        for (int i = 0; i < arguments.numManagedLedgers; i++) {
+            String name = String.format("%s-%03d", managedLedgerPrefix, i);
+            CompletableFuture<ManagedLedger> future = new 
CompletableFuture<>();
+            futures.add(future);
+            factory.asyncOpen(name, mlConf, new OpenLedgerCallback() {
+
+                @Override
+                public void openLedgerComplete(ManagedLedger ledger, Object 
ctx) {
+                    future.complete(ledger);
+                }
+
+                @Override
+                public void openLedgerFailed(ManagedLedgerException exception, 
Object ctx) {
+                    future.completeExceptionally(exception);
+                }
+            }, null);
+        }
+
+        List<ManagedLedger> managedLedgers = 
futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
+
+        log.info("Created {} managed ledgers", managedLedgers.size());
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                printAggregatedStats();
+            }
+        });
+
+        Collections.shuffle(managedLedgers);
+        AtomicBoolean isDone = new AtomicBoolean();
+
+        RateLimiter rateLimiter = RateLimiter.create(arguments.msgRate);
+
+        executor.submit(() -> {
+            try {
+
+                long startTime = System.currentTimeMillis();
+
+                // Send messages on all topics/producers
+                long totalSent = 0;
+                while (true) {
+                    for (int i = 0; i < arguments.numManagedLedgers; i++) {
+                        if (arguments.testTime > 0) {
+                            if (System.currentTimeMillis() - startTime > 
arguments.testTime) {
+                                log.info("------------------- DONE 
-----------------------");
+                                printAggregatedStats();
+                                isDone.set(true);
+                                Thread.sleep(5000);
+                                System.exit(0);
+                            }
+                        }
+
+                        if (arguments.numMessages > 0) {
+                            if (totalSent++ >= arguments.numMessages) {
+                                log.info("------------------- DONE 
-----------------------");
+                                printAggregatedStats();
+                                isDone.set(true);
+                                Thread.sleep(5000);
+                                System.exit(0);
+                            }
+                        }
+                        rateLimiter.acquire();
+
+                        final long sendTime = System.nanoTime();
+
+                        managedLedgers.get(i).asyncAddEntry(payloadBuffer, new 
AddEntryCallback() {
+                            @Override
+                            public void addComplete(Position position, Object 
ctx) {
+                                messagesSent.increment();
+                                bytesSent.add(payloadData.length);
+
+                                long latencyMicros = 
NANOSECONDS.toMicros(System.nanoTime() - sendTime);
+                                recorder.recordValue(latencyMicros);
+                                cumulativeRecorder.recordValue(latencyMicros);
+                            }
+
+                            @Override
+                            public void addFailed(ManagedLedgerException 
exception, Object ctx) {
+                                log.warn("Write error on message", exception);
+                                System.exit(-1);
+                            }
+                        }, null);
+                    }
+                }
+            } catch (Throwable t) {
+                log.error("Got error", t);
+            }
+        });
+
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = messagesSent.sumThenReset() / elapsed;
+            double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 
1024 * 8;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info(
+                    "Throughput produced: {}  msg/s --- {} Mbit/s --- Latency: 
mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 
Max: {}",
+                    throughputFormat.format(rate), 
throughputFormat.format(throughput),
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+        factory.shutdown();
+    }
+
+    private static void printAggregatedStats() {
+        Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();
+
+        log.info(
+                "Aggregated latency stats --- Latency: mean: {} ms - med: {} - 
95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean() / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.999) / 
1000.0),
+                dec.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+    static final DecimalFormat throughputFormat = new 
PaddingDecimalFormat("0.0", 8);
+    static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerWriter.class);
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to