sijie closed pull request #1697: [tools] Add a perf tool for benchmarking 
bookkeeper
URL: https://github.com/apache/bookkeeper/pull/1697
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/bkperf b/bin/bkperf
new file mode 100755
index 0000000000..6ec2909989
--- /dev/null
+++ b/bin/bkperf
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# BookKeeper Perf Tool (experimental)
+
+BINDIR=`dirname "$0"`
+BK_HOME=`cd ${BINDIR}/..;pwd`
+
+source ${BK_HOME}/bin/common.sh
+source ${BK_HOME}/conf/bk_cli_env.sh
+
+CLI_MODULE_PATH=tools/perf
+CLI_MODULE_NAME="(org.apache.bookkeeper-)?bookkeeper-perf"
+CLI_MODULE_HOME=${BK_HOME}/${CLI_MODULE_PATH}
+
+# find the module jar
+CLI_JAR=$(find_module_jar ${CLI_MODULE_PATH} ${CLI_MODULE_NAME})
+
+# set up the classpath
+CLI_CLASSPATH=$(set_module_classpath ${CLI_MODULE_PATH})
+
+DEFAULT_CONF=${BK_HOME}/conf/bk_server.conf
+if [ -z "${CLI_CONF}" ]; then
+  CLI_CONF=${DEFAULT_CONF}
+fi
+
+DEFAULT_LOG_CONF=${BK_HOME}/conf/log4j.cli.properties
+if [ -z "${CLI_LOG_CONF}" ]; then
+  CLI_LOG_CONF=${DEFAULT_LOG_CONF}
+fi
+CLI_LOG_DIR=${CLI_LOG_DIR:-"$BK_HOME/logs"}
+CLI_LOG_FILE=${CLI_LOG_FILE:-"bkperf.log"}
+CLI_ROOT_LOGGER=${CLI_ROOT_LOGGER:-"INFO,ROLLINGFILE"}
+
+# Configure the classpath
+CLI_CLASSPATH="$CLI_JAR:$CLI_CLASSPATH:$CLI_EXTRA_CLASSPATH"
+CLI_CLASSPATH="`dirname $CLI_LOG_CONF`:$CLI_CLASSPATH"
+
+# Build the OPTs
+BOOKIE_OPTS=$(build_bookie_opts)
+GC_OPTS=$(build_cli_jvm_opts ${CLI_LOG_DIR} "bkperf-gc.log")
+NETTY_OPTS=$(build_netty_opts)
+LOGGING_OPTS=$(build_cli_logging_opts ${CLI_LOG_CONF} ${CLI_LOG_DIR} 
${CLI_LOG_FILE} ${CLI_ROOT_LOGGER})
+
+OPTS="${OPTS} -cp ${CLI_CLASSPATH} ${BOOKIE_OPTS} ${GC_OPTS} ${NETTY_OPTS} 
${LOGGING_OPTS} ${CLI_EXTRA_OPTS}"
+
+#Change to BK_HOME to support relative paths
+cd "$BK_HOME"
+exec ${JAVA} ${OPTS} org.apache.bookkeeper.tools.perf.BKPerf --conf 
${CLI_CONF} $@
diff --git a/conf/log4j.cli.properties b/conf/log4j.cli.properties
index 51c95f58c8..ceb77cc934 100644
--- a/conf/log4j.cli.properties
+++ b/conf/log4j.cli.properties
@@ -55,5 +55,6 @@ 
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{
 log4j.logger.verbose=INFO,VERBOSECONSOLE
 log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.bookkeeper=ERROR
+log4j.logger.org.apache.bookkeeper.tools=INFO
 log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
 log4j.logger.org.apache.bookkeeper.client.BookKeeperAdmin=INFO
diff --git a/pom.xml b/pom.xml
index e5a3ffde00..557025965f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,7 @@
     <guava.version>21.0</guava.version>
     <hadoop.version>2.7.3</hadoop.version>
     <hamcrest.version>1.3</hamcrest.version>
+    <hdrhistogram.version>2.1.10</hdrhistogram.version>
     <jackson.version>2.8.9</jackson.version>
     <jackson-mapper-asl.version>1.9.11</jackson-mapper-asl.version>
     <jcommander.version>1.48</jcommander.version>
@@ -543,6 +544,13 @@
         <version>${jcommander.version}</version>
       </dependency>
 
+      <!-- pref dependencies -->
+      <dependency>
+        <groupId>org.hdrhistogram</groupId>
+        <artifactId>HdrHistogram</artifactId>
+        <version>${hdrhistogram.version}</version>
+      </dependency>
+
       <!-- test dependencies -->
       <dependency>
         <groupId>junit</groupId>
diff --git a/tools/perf/README.md b/tools/perf/README.md
new file mode 100644
index 0000000000..945d80c81b
--- /dev/null
+++ b/tools/perf/README.md
@@ -0,0 +1,100 @@
+## BookKeeper Perf Tool
+
+### Dlog
+
+```shell
+$ bin/bkperf dlog
+Commands on evaluating performance of distributedlog library
+
+Usage:  bkperf dlog [command] [command options]
+
+Commands:
+
+    read        Read log records to distributedlog streams
+    write       Write log records to distributedlog streams
+
+    help        Display help information about it
+```
+
+#### Write records to logs
+
+```shell
+$ bin/bkperf dlog write -h
+Write log records to distributedlog streams
+
+Usage:  bkperf dlog write [flags]
+
+Flags:
+
+    -a, --ack-quorum-size
+        Ledger ack quorum size
+
+    -e, --ensemble-size
+        Ledger ensemble size
+
+    -ln, --log-name
+        Log name or log name pattern if more than 1 log is specified at
+        `--num-logs`
+
+    -b, --num-bytes
+        Number of bytes to write in total. If 0, it will keep writing
+
+    -l, --num-logs
+        Number of log streams
+
+    -n, --num-records
+        Number of records to write in total. If 0, it will keep writing
+
+    -r, --rate
+        Write rate bytes/s across log streams
+
+    -rs, --record-size
+        Log record size
+
+    --threads
+        Number of threads writing
+
+    -w, --write-quorum-size
+        Ledger write quorum size
+
+
+    -h, --help
+        Display help information
+```
+
+Example: write to log stream `test-log` at `100mb/second`, using 1-bookie 
ensemble.
+
+```shell
+$ bin/bkperf dlog write -w 1 -a 1 -e 1 -r 104857600 --log-name test-log
+```
+
+### Read records from logs
+
+```shell
+$ bin/bkperf dlog read -h
+Read log records from distributedlog streams
+
+Usage:  bkperf dlog read [flags]
+
+Flags:
+
+    -ln, --log-name
+        Log name or log name pattern if more than 1 log is specified at
+        `--num-logs`
+
+    -l, --num-logs
+        Number of log streams
+
+    --threads
+        Number of threads reading
+
+
+    -h, --help
+        Display help information
+```
+
+Example: read from log stream `test-log-000000`.
+
+```shell
+$ bin/bkperf dlog read --log-name test-log-000000
+```
diff --git a/tools/perf/pom.xml b/tools/perf/pom.xml
new file mode 100644
index 0000000000..d3a617f60b
--- /dev/null
+++ b/tools/perf/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"; 
xmlns="http://maven.apache.org/POM/4.0.0";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper</groupId>
+    <artifactId>bookkeeper-tools-parent</artifactId>
+    <version>4.9.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>bookkeeper-perf</artifactId>
+  <name>Apache BookKeeper :: Tools :: Perf</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-tools-framework</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.hdrhistogram</groupId>
+      <artifactId>HdrHistogram</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
new file mode 100644
index 0000000000..23021ee930
--- /dev/null
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.Cli;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+
+/**
+ * <b>bkperf</b> evaluates the performance of <i>Apache BookKeeper</i> cluster.
+ */
+@Slf4j
+public class BKPerf {
+
+    public static final String NAME = "bkperf";
+
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) {
+        CliSpec.Builder<BKFlags> specBuilder = CliSpec.<BKFlags>newBuilder()
+            .withName(NAME)
+            .withUsage(NAME + " [flags] [command group] [commands]")
+            .withDescription(NAME + " evaluates the performance of Apache 
BookKeeper clusters")
+            .withFlags(new BKFlags())
+            .withConsole(System.out)
+            .addCommand(new DlogPerfCommandGroup());
+
+        CliSpec<BKFlags> spec = specBuilder.build();
+
+        int retCode = Cli.runCli(spec, args);
+        Runtime.getRuntime().exit(retCode);
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
new file mode 100644
index 0000000000..75adc4b292
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/DlogPerfCommandGroup.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.ReadCommand;
+import org.apache.bookkeeper.tools.perf.dlog.WriteCommand;
+
+/**
+ * Commands that evaluate performance of distributedlog library.
+ */
+public class DlogPerfCommandGroup extends CliCommandGroup<BKFlags> implements 
PerfCommandGroup<BKFlags> {
+
+    private static final String NAME = "dlog";
+    private static final String DESC = "Commands on evaluating performance of 
distributedlog library";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent(BKPerf.NAME)
+        .addCommand(new WriteCommand())
+        .addCommand(new ReadCommand())
+        .build();
+
+    public DlogPerfCommandGroup() {
+        super(spec);
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java
new file mode 100644
index 0000000000..c64d77f2d8
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/PerfCommandGroup.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CommandGroup;
+
+/**
+ * A command group that group commands together for performance evaluations.
+ */
+public interface PerfCommandGroup<GlobalFlagsT extends CliFlags>
+    extends CommandGroup<GlobalFlagsT> {
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
new file mode 100644
index 0000000000..7497d7df3b
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfReader.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * A perf writer to evaluate write performance.
+ */
+@Slf4j
+public class PerfReader implements Runnable {
+
+    /**
+     * Flags for the write command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-ln", "--log-name"
+            },
+            description = "Log name or log name pattern if more than 1 log is 
specified at `--num-logs`")
+        public String logName = "test-log-%06d";
+
+        @Parameter(
+            names = {
+                "-l", "--num-logs"
+            },
+            description = "Number of log streams")
+        public int numLogs = 1;
+
+        @Parameter(
+            names = {
+                "-t", "--threads"
+            },
+            description = "Number of threads reading")
+        public int numThreads = 1;
+
+        @Parameter(
+            names = {
+                "-mr", "--max-readahead-records"
+            },
+            description = "Max readhead records")
+        public int maxReadAheadRecords = 1000000;
+
+        @Parameter(
+            names = {
+                "-bs", "--readahead-batch-size"
+            },
+            description = "ReadAhead Batch Size, in entries"
+        )
+        public int readAheadBatchSize = 4;
+
+    }
+
+
+    // stats
+    private final LongAdder recordsRead = new LongAdder();
+    private final LongAdder bytesRead = new LongAdder();
+
+    private final ServiceURI serviceURI;
+    private final Flags flags;
+    private final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    PerfReader(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running dlog perf writer", e);
+        }
+    }
+
+    void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting dlog perf reader with config : {}", 
w.writeValueAsString(flags));
+
+        DistributedLogConfiguration conf = newDlogConf(flags);
+        try (Namespace namespace = NamespaceBuilder.newBuilder()
+             .conf(conf)
+             .uri(serviceURI.getUri())
+             .build()) {
+            execute(namespace);
+        }
+    }
+
+    void execute(Namespace namespace) throws Exception {
+        List<Pair<Integer, DistributedLogManager>> managers = new 
ArrayList<>(flags.numLogs);
+        for (int i = 0; i < flags.numLogs; i++) {
+            String logName = String.format(flags.logName, i);
+            managers.add(Pair.of(i, namespace.openLog(logName)));
+        }
+        log.info("Successfully open {} logs", managers.size());
+
+        // register shutdown hook to aggregate stats
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isDone.set(true);
+            printAggregatedStats(cumulativeRecorder);
+        }));
+
+        ExecutorService executor = 
Executors.newFixedThreadPool(flags.numThreads);
+        try {
+            for (int i = 0; i < flags.numThreads; i++) {
+                final int idx = i;
+                final List<DistributedLogManager> logsThisThread = managers
+                    .stream()
+                    .filter(pair -> pair.getLeft() % flags.numThreads == idx)
+                    .map(pair -> pair.getRight())
+                    .collect(Collectors.toList());
+                executor.submit(() -> {
+                    try {
+                        read(logsThisThread);
+                    } catch (Exception e) {
+                        log.error("Encountered error at writing records", e);
+                    }
+                });
+            }
+            log.info("Started {} write threads", flags.numThreads);
+            reportStats();
+        } finally {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+            managers.forEach(manager -> manager.getRight().asyncClose());
+        }
+    }
+
+    void read(List<DistributedLogManager> logs) throws Exception {
+        log.info("Read thread started with : logs = {}",
+            logs.stream().map(l -> 
l.getStreamName()).collect(Collectors.toList()));
+
+        List<LogReader> readers = logs.stream()
+            .map(manager -> {
+                try {
+                    return manager.openLogReader(DLSN.InitialDLSN);
+                } catch (IOException e) {
+                    log.error("Failed to open reader for log stream {}", 
manager.getStreamName(), e);
+                    throw new UncheckedIOException(e);
+                }
+            })
+            .collect(Collectors.toList());
+
+        final int numLogs = logs.size();
+        while (true) {
+            for (int i = 0; i < numLogs; i++) {
+                LogRecordWithDLSN record = readers.get(i).readNext(true);
+                if (null != record) {
+                    recordsRead.increment();
+                    bytesRead.add(record.getPayloadBuf().readableBytes());
+                }
+            }
+        }
+    }
+
+    void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = recordsRead.sumThenReset() / elapsed;
+            double throughput = bytesRead.sumThenReset() / elapsed / 1024 / 
1024;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info("Throughput read : {}  records/s --- {} MB/s --- Latency: 
mean:"
+                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: 
{} - 99.99pct: {} - Max: {}",
+                    throughputFormat.format(rate), 
throughputFormat.format(throughput),
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    private static DistributedLogConfiguration newDlogConf(Flags flags) {
+        return new DistributedLogConfiguration()
+            .setReadAheadBatchSize(flags.readAheadBatchSize)
+            .setReadAheadMaxRecords(flags.maxReadAheadRecords)
+            .setReadAheadWaitTime(200);
+    }
+
+
+    private static final DecimalFormat throughputFormat = new 
PaddingDecimalFormat("0.0", 8);
+    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 
7);
+
+    private static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} 
- 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean() / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.999) / 
1000.0),
+                dec.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
new file mode 100644
index 0000000000..e29dc6441b
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/PerfWriter.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.util.concurrent.RateLimiter;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.netty.buffer.Unpooled;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+
+/**
+ * A perf writer to evaluate write performance.
+ */
+@Slf4j
+public class PerfWriter implements Runnable {
+
+    /**
+     * Flags for the write command.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-r", "--rate"
+            },
+            description = "Write rate bytes/s across log streams")
+        public int writeRate = 0;
+
+        @Parameter(
+            names = {
+                "-rs", "--record-size"
+            },
+            description = "Log record size")
+        public int recordSize = 1024;
+
+        @Parameter(
+            names = {
+                "-ln", "--log-name"
+            },
+            description = "Log name or log name pattern if more than 1 log is 
specified at `--num-logs`")
+        public String logName = "test-log-%06d";
+
+        @Parameter(
+            names = {
+                "-l", "--num-logs"
+            },
+            description = "Number of log streams")
+        public int numLogs = 1;
+
+        @Parameter(
+            names = {
+                "-t", "--threads"
+            },
+            description = "Number of threads writing")
+        public int numThreads = 1;
+
+        @Parameter(
+            names = {
+                "-mob", "--max-outstanding-megabytes"
+            },
+            description = "Number of threads writing")
+        public long maxOutstandingMB = 200;
+
+        @Parameter(
+            names = {
+                "-n", "--num-records"
+            },
+            description = "Number of records to write in total. If 0, it will 
keep writing")
+        public long numRecords = 0;
+
+        @Parameter(
+            names = {
+                "-b", "--num-bytes"
+            },
+            description = "Number of bytes to write in total. If 0, it will 
keep writing")
+        public long numBytes = 0;
+
+        @Parameter(
+            names = {
+                "-e", "--ensemble-size"
+            },
+            description = "Ledger ensemble size")
+        public int ensembleSize = 1;
+
+        @Parameter(
+            names = {
+                "-w", "--write-quorum-size"
+            },
+            description = "Ledger write quorum size")
+        public int writeQuorumSize = 1;
+
+        @Parameter(
+            names = {
+                "-a", "--ack-quorum-size"
+            },
+            description = "Ledger ack quorum size")
+        public int ackQuorumSize = 1;
+
+    }
+
+
+    // stats
+    private final LongAdder recordsWritten = new LongAdder();
+    private final LongAdder bytesWritten = new LongAdder();
+
+    private final byte[] payload;
+    private final ServiceURI serviceURI;
+    private final Flags flags;
+    private final Recorder recorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final Recorder cumulativeRecorder = new Recorder(
+        TimeUnit.SECONDS.toMillis(120000), 5
+    );
+    private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+    PerfWriter(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+        this.payload = new byte[flags.recordSize];
+        ThreadLocalRandom.current().nextBytes(payload);
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running dlog perf writer", e);
+        }
+    }
+
+    void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting dlog perf writer with config : {}", 
w.writeValueAsString(flags));
+
+        DistributedLogConfiguration conf = newDlogConf(flags);
+        try (Namespace namespace = NamespaceBuilder.newBuilder()
+             .conf(conf)
+             .uri(serviceURI.getUri())
+             .build()) {
+            execute(namespace);
+        }
+    }
+
+    void execute(Namespace namespace) throws Exception {
+        List<Pair<Integer, DistributedLogManager>> managers = new 
ArrayList<>(flags.numLogs);
+        for (int i = 0; i < flags.numLogs; i++) {
+            String logName = String.format(flags.logName, i);
+            managers.add(Pair.of(i, namespace.openLog(logName)));
+        }
+        log.info("Successfully open {} logs", managers.size());
+
+        // register shutdown hook to aggregate stats
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            isDone.set(true);
+            printAggregatedStats(cumulativeRecorder);
+        }));
+
+        ExecutorService executor = 
Executors.newFixedThreadPool(flags.numThreads);
+        try {
+            for (int i = 0; i < flags.numThreads; i++) {
+                final int idx = i;
+                final List<DistributedLogManager> logsThisThread = managers
+                    .stream()
+                    .filter(pair -> pair.getLeft() % flags.numThreads == idx)
+                    .map(pair -> pair.getRight())
+                    .collect(Collectors.toList());
+                final long numRecordsForThisThread = flags.numRecords / 
flags.numThreads;
+                final long numBytesForThisThread = flags.numBytes / 
flags.numThreads;
+                final double writeRateForThisThread = flags.writeRate / 
(double) flags.numThreads;
+                final long maxOutstandingBytesForThisThread = 
flags.maxOutstandingMB * 1024 * 1024 / flags.numThreads;
+                executor.submit(() -> {
+                    try {
+                        write(
+                            logsThisThread,
+                            writeRateForThisThread,
+                            (int) maxOutstandingBytesForThisThread,
+                            numRecordsForThisThread,
+                            numBytesForThisThread);
+                    } catch (Exception e) {
+                        log.error("Encountered error at writing records", e);
+                    }
+                });
+            }
+            log.info("Started {} write threads", flags.numThreads);
+            reportStats();
+        } finally {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+            managers.forEach(manager -> manager.getRight().asyncClose());
+        }
+    }
+
+    void write(List<DistributedLogManager> logs,
+               double writeRate,
+               int maxOutstandingBytesForThisThread,
+               long numRecordsForThisThread,
+               long numBytesForThisThread) throws Exception {
+        log.info("Write thread started with : logs = {}, rate = {},"
+                + " num records = {}, num bytes = {}, max outstanding bytes = 
{}",
+            logs.stream().map(l -> 
l.getStreamName()).collect(Collectors.toList()),
+            writeRate,
+            numRecordsForThisThread,
+            numBytesForThisThread,
+            maxOutstandingBytesForThisThread);
+
+        List<CompletableFuture<AsyncLogWriter>> writerFutures = logs.stream()
+            .map(manager -> manager.openAsyncLogWriter())
+            .collect(Collectors.toList());
+        List<AsyncLogWriter> writers = 
result(FutureUtils.collect(writerFutures));
+
+        long txid = writers
+            .stream()
+            .mapToLong(writer -> writer.getLastTxId())
+            .max()
+            .orElse(0L);
+        txid = Math.max(0L, txid);
+
+        RateLimiter limiter;
+        if (writeRate > 0) {
+            limiter = RateLimiter.create(writeRate);
+        } else {
+            limiter = null;
+        }
+        final Semaphore semaphore;
+        if (maxOutstandingBytesForThisThread > 0) {
+            semaphore = new Semaphore(maxOutstandingBytesForThisThread);
+        } else {
+            semaphore = null;
+        }
+
+        // Acquire 1 second worth of records to have a slower ramp-up
+        if (limiter != null) {
+            limiter.acquire((int) writeRate);
+        }
+
+        long totalWritten = 0L;
+        long totalBytesWritten = 0L;
+        final int numLogs = logs.size();
+        while (true) {
+            for (int i = 0; i < numLogs; i++) {
+                if (numRecordsForThisThread > 0
+                    && totalWritten >= numRecordsForThisThread) {
+                    markPerfDone();
+                }
+                if (numBytesForThisThread > 0
+                    && totalBytesWritten >= numBytesForThisThread) {
+                    markPerfDone();
+                }
+                if (null != semaphore) {
+                    semaphore.acquire(payload.length);
+                }
+
+                totalWritten++;
+                totalBytesWritten += payload.length;
+                if (null != limiter) {
+                    limiter.acquire(payload.length);
+                }
+                final long sendTime = System.nanoTime();
+                writers.get(i).write(
+                    new LogRecord(++txid, Unpooled.wrappedBuffer(payload))
+                ).thenAccept(dlsn -> {
+                    if (null != semaphore) {
+                        semaphore.release(payload.length);
+                    }
+
+                    recordsWritten.increment();
+                    bytesWritten.add(payload.length);
+
+                    long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
+                        System.nanoTime() - sendTime
+                    );
+                    recorder.recordValue(latencyMicros);
+                    cumulativeRecorder.recordValue(latencyMicros);
+                }).exceptionally(cause -> {
+                    log.warn("Error at writing records", cause);
+                    System.exit(-1);
+                    return null;
+                });
+            }
+        }
+    }
+
+    @SuppressFBWarnings("DM_EXIT")
+    void markPerfDone() throws Exception {
+        log.info("------------------- DONE -----------------------");
+        printAggregatedStats(cumulativeRecorder);
+        isDone.set(true);
+        Thread.sleep(5000);
+        System.exit(0);
+    }
+
+    void reportStats() {
+        // Print report stats
+        long oldTime = System.nanoTime();
+
+        Histogram reportHistogram = null;
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                break;
+            }
+
+            if (isDone.get()) {
+                break;
+            }
+
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+
+            double rate = recordsWritten.sumThenReset() / elapsed;
+            double throughput = bytesWritten.sumThenReset() / elapsed / 1024 / 
1024;
+
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+
+            log.info(
+                    "Throughput written : {}  records/s --- {} MB/s --- 
Latency: mean:"
+                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: 
{} - 99.99pct: {} - Max: {}",
+                    throughputFormat.format(rate), 
throughputFormat.format(throughput),
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+
+            reportHistogram.reset();
+
+            oldTime = now;
+        }
+
+    }
+
+    private static DistributedLogConfiguration newDlogConf(Flags flags) {
+        return new DistributedLogConfiguration()
+            .setEnsembleSize(flags.ensembleSize)
+            .setWriteQuorumSize(flags.writeQuorumSize)
+            .setAckQuorumSize(flags.ackQuorumSize)
+            .setOutputBufferSize(512 * 1024)
+            .setPeriodicFlushFrequencyMilliSeconds(2)
+            .setWriteLockEnabled(false)
+            .setMaxLogSegmentBytes(512 * 1024 * 1024) // 512MB
+            .setExplicitTruncationByApplication(true);
+    }
+
+
+    private static final DecimalFormat throughputFormat = new 
PaddingDecimalFormat("0.0", 8);
+    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 
7);
+
+    private static void printAggregatedStats(Recorder recorder) {
+        Histogram reportHistogram = recorder.getIntervalHistogram();
+
+        log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} 
- 95pct: {} - 99pct: {}"
+                + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
+                dec.format(reportHistogram.getMean() / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                dec.format(reportHistogram.getValueAtPercentile(99.999) / 
1000.0),
+                dec.format(reportHistogram.getMaxValue() / 1000.0));
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
new file mode 100644
index 0000000000..001cacba3d
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/ReadCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.PerfReader.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to read log records to distributedlog streams.
+ */
+@Slf4j
+public class ReadCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "read";
+    private static final String DESC = "Read log records from distributedlog 
streams";
+
+    public ReadCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags, Flags cmdFlags) {
+
+
+        if (serviceURI == null) {
+            log.warn("No service uri is provided. Use default 
'distributedlog://localhost/distributedlog'.");
+            serviceURI = 
ServiceURI.create("distributedlog://localhost/distributedlog");
+        }
+
+        PerfReader reader = new PerfReader(serviceURI, cmdFlags);
+        reader.run();
+        return true;
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java
new file mode 100644
index 0000000000..aa7c92e46e
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/WriteCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.dlog;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.dlog.PerfWriter.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to write log records to distributedlog streams.
+ */
+@Slf4j
+public class WriteCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "write";
+    private static final String DESC = "Write log records to distributedlog 
streams";
+
+    public WriteCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags, Flags cmdFlags) {
+
+
+        if (serviceURI == null) {
+            log.warn("No service uri is provided. Use default 
'distributedlog://localhost/distributedlog'.");
+            serviceURI = 
ServiceURI.create("distributedlog://localhost/distributedlog");
+        }
+
+        PerfWriter writer = new PerfWriter(serviceURI, cmdFlags);
+        writer.run();
+        return true;
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java
new file mode 100644
index 0000000000..596d419d58
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/dlog/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Dlog related perf command.
+ */
+package org.apache.bookkeeper.tools.perf.dlog;
\ No newline at end of file
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java
new file mode 100644
index 0000000000..ca7aee64bc
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * BookKeeper Perf Tool.
+ */
+package org.apache.bookkeeper.tools.perf;
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java
new file mode 100644
index 0000000000..0bc92af86f
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/PaddingDecimalFormat.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.utils;
+
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+import java.text.FieldPosition;
+
+/**
+ * A decimal format that adds padding zeros.
+ */
+public class PaddingDecimalFormat extends DecimalFormat {
+    private int minimumLength;
+
+    /**
+     * Creates a PaddingDecimalFormat using the given pattern and minimum 
minimumLength and the symbols for the default
+     * locale.
+     */
+    public PaddingDecimalFormat(String pattern, int minLength) {
+        super(pattern);
+        minimumLength = minLength;
+    }
+
+    /**
+     * Creates a PaddingDecimalFormat using the given pattern, symbols and 
minimum minimumLength.
+     */
+    public PaddingDecimalFormat(String pattern, DecimalFormatSymbols symbols, 
int minLength) {
+        super(pattern, symbols);
+        minimumLength = minLength;
+    }
+
+    @Override
+    public StringBuffer format(double number, StringBuffer toAppendTo, 
FieldPosition pos) {
+        int initLength = toAppendTo.length();
+        super.format(number, toAppendTo, pos);
+        return pad(toAppendTo, initLength);
+    }
+
+    @Override
+    public StringBuffer format(long number, StringBuffer toAppendTo, 
FieldPosition pos) {
+        int initLength = toAppendTo.length();
+        super.format(number, toAppendTo, pos);
+        return pad(toAppendTo, initLength);
+    }
+
+    private StringBuffer pad(StringBuffer toAppendTo, int initLength) {
+        int numLength = toAppendTo.length() - initLength;
+        int padLength = minimumLength - numLength;
+        if (padLength > 0) {
+            StringBuffer pad = new StringBuffer(padLength);
+            for (int i = 0; i < padLength; i++) {
+                pad.append(' ');
+            }
+            toAppendTo.insert(initLength, pad);
+        }
+        return toAppendTo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof PaddingDecimalFormat)) {
+            return false;
+        }
+
+        PaddingDecimalFormat other = (PaddingDecimalFormat) obj;
+        return minimumLength == other.minimumLength && super.equals(obj);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + minimumLength;
+    }
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java
new file mode 100644
index 0000000000..76a7e427d9
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/utils/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utils used in the perf tool.
+ */
+package org.apache.bookkeeper.tools.perf.utils;
\ No newline at end of file
diff --git a/tools/pom.xml b/tools/pom.xml
index 988de817fb..37e63751ed 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -27,6 +27,7 @@
   <packaging>pom</packaging>
   <modules>
     <module>framework</module>
+    <module>perf</module>
     <module>all</module>
   </modules>
   <profiles>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to