sijie closed pull request #1727: [table services][perf] Provides a simple perf 
tool for testing put and inc performance
URL: https://github.com/apache/bookkeeper/pull/1727
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index 4550df55aa..0a87add788 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -91,7 +91,7 @@ public DLNamespaceProviderService(ServerConfiguration 
bkServerConf,
         this.dlConf.setWriteLockEnabled(false);
         // setting the flush policy
         this.dlConf.setImmediateFlushEnabled(false);
-        this.dlConf.setOutputBufferSize(0);
+        this.dlConf.setOutputBufferSize(512 * 1024);
         this.dlConf.setPeriodicFlushFrequencyMilliSeconds(2); // flush every 1 
ms
         // explicit truncation is required
         this.dlConf.setExplicitTruncationByApplication(true);
diff --git 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
index 4e72e1f5cb..46ba7f1c13 100644
--- 
a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
+++ 
b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVStore.java
@@ -275,7 +275,7 @@ protected void openRocksdb(StateStoreSpec spec) throws 
StateStoreException {
         // initialize the write options
 
         writeOpts = new WriteOptions();
-        writeOpts.setDisableWAL(false); // disable wal, since the source of 
truth will be on distributedlog
+        writeOpts.setDisableWAL(true); // disable wal, since the source of 
truth will be on distributedlog
 
         // initialize the flush options
 
diff --git a/tools/perf/pom.xml b/tools/perf/pom.xml
index d3a617f60b..3142063a2b 100644
--- a/tools/perf/pom.xml
+++ b/tools/perf/pom.xml
@@ -34,6 +34,11 @@
       <artifactId>distributedlog-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.hdrhistogram</groupId>
       <artifactId>HdrHistogram</artifactId>
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
index 23021ee930..3aac74eef8 100644
--- a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
+++ b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/BKPerf.java
@@ -35,7 +35,8 @@ public static void main(String[] args) {
             .withDescription(NAME + " evaluates the performance of Apache 
BookKeeper clusters")
             .withFlags(new BKFlags())
             .withConsole(System.out)
-            .addCommand(new DlogPerfCommandGroup());
+            .addCommand(new DlogPerfCommandGroup())
+            .addCommand(new TablePerfCommandGroup());
 
         CliSpec<BKFlags> spec = specBuilder.build();
 
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/TablePerfCommandGroup.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/TablePerfCommandGroup.java
new file mode 100644
index 0000000000..5ab16b8af8
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/TablePerfCommandGroup.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf;
+
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliCommandGroup;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.table.BenchmarkCommand;
+
+/**
+ * Commands that evaluate performance of table service.
+ */
+public class TablePerfCommandGroup extends CliCommandGroup<BKFlags> implements 
PerfCommandGroup<BKFlags> {
+
+    private static final String NAME = "table";
+    private static final String DESC = "Commands on evaluating performance of 
table service";
+
+    private static final CliSpec<BKFlags> spec = CliSpec.<BKFlags>newBuilder()
+        .withName(NAME)
+        .withDescription(DESC)
+        .withParent(BKPerf.NAME)
+        .addCommand(new BenchmarkCommand())
+        .build();
+
+    public TablePerfCommandGroup() {
+        super(spec);
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/BenchmarkCommand.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/BenchmarkCommand.java
new file mode 100644
index 0000000000..19322f5c58
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/BenchmarkCommand.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.common.BKCommand;
+import org.apache.bookkeeper.tools.common.BKFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+import org.apache.commons.configuration.CompositeConfiguration;
+
+/**
+ * Command to benchmark table service.
+ */
+@Slf4j
+public class BenchmarkCommand extends BKCommand<Flags> {
+
+    private static final String NAME = "benchmark";
+    private static final String DESC = "benchmark table service";
+
+    public BenchmarkCommand() {
+        super(CliSpec.<Flags>newBuilder()
+            .withName(NAME)
+            .withDescription(DESC)
+            .withFlags(new Flags())
+            .build());
+    }
+
+    @Override
+    protected boolean apply(ServiceURI serviceURI,
+                            CompositeConfiguration conf,
+                            BKFlags globalFlags,
+                            Flags cmdFlags) {
+        if (serviceURI == null) {
+            log.warn("No service uri is provided. Use default 
'bk://localhost:4181'.");
+            serviceURI = ServiceURI.create("bk://localhost:4181");
+        }
+        PerfClient client = new PerfClient(serviceURI, cmdFlags);
+        client.run();
+        return true;
+    }
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/BenchmarkTask.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/BenchmarkTask.java
new file mode 100644
index 0000000000..ed2321abe1
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/BenchmarkTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import io.netty.buffer.ByteBuf;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+
+/**
+ * Abstract benchmark task.
+ */
+abstract class BenchmarkTask implements Callable<Void> {
+
+    protected final Table<ByteBuf, ByteBuf> table;
+    protected final int tid;
+    protected final Random random;
+    protected final long numRecords;
+    protected final long keyRange;
+    protected final Flags flags;
+    protected final KeyGenerator generator;
+
+    BenchmarkTask(Table<ByteBuf, ByteBuf> table,
+                  int tid,
+                  long randSeed,
+                  long numRecords,
+                  long keyRange,
+                  Flags flags,
+                  KeyGenerator generator) {
+        this.table = table;
+        this.tid = tid;
+        this.random = new Random(randSeed + tid * 1000);
+        this.numRecords = numRecords;
+        this.keyRange = keyRange;
+        this.flags = flags;
+        this.generator = generator;
+    }
+
+    @Override
+    public Void call() throws Exception {
+        runTask();
+        return null;
+    }
+
+    protected abstract void runTask() throws Exception;
+
+    protected void getFixedKey(ByteBuf key, long sn) {
+        generator.generateKeyFromLong(key, sn);
+    }
+
+    protected void getRandomKey(ByteBuf key, long range) {
+        generator.generateKeyFromLong(key, Math.abs(random.nextLong() % 
range));
+    }
+
+    protected abstract void reportStats(long oldTime);
+
+    protected abstract void printAggregatedStats();
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementRandomTask.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementRandomTask.java
new file mode 100644
index 0000000000..65ffb35bd6
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementRandomTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+
+/**
+ * Increment the amount of keys in sequence.
+ */
+class IncrementRandomTask extends IncrementTask {
+
+    IncrementRandomTask(Table<ByteBuf, ByteBuf> table,
+                        int tid,
+                        long randSeed,
+                        long numRecords,
+                        long keyRange,
+                        Flags flags,
+                        KeyGenerator generator,
+                        RateLimiter limiter,
+                        Semaphore semaphore) {
+        super(table, tid, randSeed, numRecords, keyRange, flags, generator, 
limiter, semaphore);
+    }
+
+
+    @Override
+    protected void getKey(ByteBuf key, long id, long range) {
+        getRandomKey(key, range);
+    }
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementSequentialTask.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementSequentialTask.java
new file mode 100644
index 0000000000..82610f170d
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementSequentialTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+
+/**
+ * Increment the amount of keys in sequence.
+ */
+class IncrementSequentialTask extends IncrementTask {
+
+    IncrementSequentialTask(Table<ByteBuf, ByteBuf> table,
+                            int tid,
+                            long randSeed,
+                            long numRecords,
+                            long keyRange,
+                            Flags flags,
+                            KeyGenerator generator,
+                            RateLimiter limiter,
+                            Semaphore semaphore) {
+        super(table, tid, randSeed, numRecords, keyRange, flags, generator, 
limiter, semaphore);
+    }
+
+
+    @Override
+    protected void getKey(ByteBuf key, long id, long range) {
+        getFixedKey(key, id);
+    }
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementTask.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementTask.java
new file mode 100644
index 0000000000..3f047d9c26
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/IncrementTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.OP;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.OpStats;
+
+/**
+ * Write task to inject key/value pairs to the table.
+ */
+@Slf4j
+abstract class IncrementTask extends BenchmarkTask {
+
+    protected final RateLimiter limiter;
+    protected final Semaphore semaphore;
+    protected final OpStats writeOpStats;
+
+    IncrementTask(Table<ByteBuf, ByteBuf> table,
+                  int tid,
+                  long randSeed,
+                  long numRecords,
+                  long keyRange,
+                  Flags flags,
+                  KeyGenerator generator,
+                  RateLimiter limiter,
+                  Semaphore semaphore) {
+        super(table, tid, randSeed, numRecords, keyRange, flags, generator);
+        this.limiter = limiter;
+        this.semaphore = semaphore;
+        this.writeOpStats = new OpStats(OP.INC.name());
+    }
+
+    @Override
+    protected void runTask() throws Exception {
+        for (long i = 0L; i < numRecords; ++i) {
+            if (null != semaphore) {
+                semaphore.acquire();
+            }
+            if (null != limiter) {
+                limiter.acquire();
+            }
+            incKey(i);
+        }
+    }
+
+    protected abstract void getKey(ByteBuf key, long id, long range);
+
+    void incKey(long i) {
+        ByteBuf keyBuf = 
PooledByteBufAllocator.DEFAULT.heapBuffer(flags.keySize);
+        getKey(keyBuf, i, keyRange);
+        keyBuf.writerIndex(keyBuf.readerIndex() + keyBuf.writableBytes());
+
+        final long startTime = System.nanoTime();
+        table.increment(keyBuf, 100)
+            .whenComplete((result, cause) -> {
+                if (null != semaphore) {
+                    semaphore.release();
+                }
+                if (null != cause) {
+                    log.error("Error at increment key/amount", cause);
+                } else {
+                    long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
+                        System.nanoTime() - startTime
+                    );
+                    writeOpStats.recordOp(latencyMicros);
+                }
+                keyBuf.release();
+            });
+    }
+
+    @Override
+    protected void reportStats(long oldTime) {
+        writeOpStats.reportStats(oldTime);
+    }
+
+    @Override
+    protected void printAggregatedStats() {
+        writeOpStats.printAggregatedStats();
+    }
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/KeyGenerator.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/KeyGenerator.java
new file mode 100644
index 0000000000..458aa1c1d7
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/KeyGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Util class used for perf benchmarks.
+ */
+final class KeyGenerator {
+
+    private final long numKeys;
+    private final long keysPerPrefix;
+    private final int prefixSize;
+
+    KeyGenerator(long numKeys,
+                 long keysPerPrefix,
+                 int prefixSize) {
+        this.numKeys = numKeys;
+        this.keysPerPrefix = keysPerPrefix;
+        this.prefixSize = prefixSize;
+    }
+
+    public void generateKeyFromLong(ByteBuf slice, long n) {
+        int startPos = 0;
+        if (keysPerPrefix > 0) {
+            long numPrefix = (numKeys + keysPerPrefix - 1) / keysPerPrefix;
+            long prefix = n % numPrefix;
+            int bytesToFill = Math.min(prefixSize, 8);
+            for (int i = 0; i < bytesToFill; i++) {
+                slice.setByte(i,  (byte) (prefix % 256));
+                prefix /= 256;
+            }
+            for (int i = 8; i < bytesToFill; ++i) {
+                slice.setByte(i, '0');
+            }
+            startPos = bytesToFill;
+        }
+        for (int i = slice.writableBytes() - 1; i >= startPos; --i) {
+            slice.setByte(i, (byte) ('0' + (n % 10)));
+            n /= 10;
+        }
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/PerfClient.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/PerfClient.java
new file mode 100644
index 0000000000..4b3442700c
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/PerfClient.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+
+import com.beust.jcommander.Parameter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.Recorder;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.net.ServiceURI;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
+
+/**
+ * Perf client to evaluate the performance of table service.
+ */
+@Slf4j
+public class PerfClient implements Runnable {
+
+    enum OP {
+        PUT,
+        GET,
+        INC,
+        DEL
+    }
+
+    /**
+     * Flags for the perf client.
+     */
+    public static class Flags extends CliFlags {
+
+        @Parameter(
+            names = {
+                "-r", "--rate"
+            },
+            description = "Request rate - requests/second")
+        public int rate = 100000;
+
+        @Parameter(
+            names = {
+                "-mor", "--max-outstanding-requests"
+            },
+            description = "Max outstanding request")
+        public int maxOutstandingRequests = 10000;
+
+        @Parameter(
+            names = {
+                "-ks", "--key-size"
+            },
+            description = "Key size")
+        public int keySize = 16;
+
+        @Parameter(
+            names = {
+                "-vs", "--value-size"
+            },
+            description = "Value size")
+        public int valueSize = 100;
+
+        @Parameter(
+            names = {
+                "-t", "--table-name"
+            },
+            description = "Table name")
+        public String tableName = "test-table";
+
+        @Parameter(
+            names = {
+                "-nk", "--num-keys"
+            },
+            description = "Number of the keys to test")
+        public int numKeys = 1000000;
+
+        @Parameter(
+            names = {
+                "-kpp", "--keys-per-prefix"
+            },
+            description = "control average number of keys generated per 
prefix,"
+                + " 0 means no special handling of the prefix, i.e. use the"
+                + " prefix comes with the generated random number"
+        )
+        public int keysPerPrefix = 0;
+
+        @Parameter(
+            names = {
+                "-ps", "--prefix-size"
+            },
+            description = "Prefix size"
+        )
+        public int prefixSize = 0;
+
+        @Parameter(
+            names = {
+                "-no", "--num-ops"
+            },
+            description = "Number of client operations to test")
+        public int numOps = 0;
+
+        @Parameter(
+            names = {
+                "-ns", "--namespace"
+            },
+            description = "Namespace of the tables to benchmark")
+        public String namespace = "benchmark";
+
+        @Parameter(
+            names = {
+                "-b", "--benchmarks"
+            },
+            description = "List of benchamrks to run")
+        public List<String> benchmarks;
+
+    }
+
+    static class OpStats {
+
+        private final String name;
+        private final LongAdder ops = new LongAdder();
+        private final Recorder recorder = new Recorder(
+            TimeUnit.SECONDS.toMillis(120000), 5
+        );
+        private final Recorder cumulativeRecorder = new Recorder(
+            TimeUnit.SECONDS.toMillis(120000), 5
+        );
+        private Histogram reportHistogram;
+
+        OpStats(String name) {
+            this.name = name;
+        }
+
+        void recordOp(long latencyMicros) {
+            ops.increment();
+            recorder.recordValue(latencyMicros);
+            cumulativeRecorder.recordValue(latencyMicros);
+        }
+
+        void reportStats(long oldTime) {
+            long now = System.nanoTime();
+            double elapsed = (now - oldTime) / 1e9;
+            double rate = ops.sumThenReset() / elapsed;
+            reportHistogram = recorder.getIntervalHistogram(reportHistogram);
+            log.info(
+                "[{}] Throughput: {}  ops/s --- Latency: mean:"
+                        + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: 
{} - 99.99pct: {} - Max: {}",
+                    name,
+                    throughputFormat.format(rate),
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+            reportHistogram.reset();
+        }
+
+        void printAggregatedStats() {
+            Histogram reportHistogram = 
cumulativeRecorder.getIntervalHistogram();
+            log.info("[{}] latency stats --- Latency: mean: {} ms - med: {} - 
95pct: {} - 99pct: {}"
+                    + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: 
{}",
+                    name,
+                    dec.format(reportHistogram.getMean() / 1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(50) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(95) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.9) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.99) / 
1000.0),
+                    dec.format(reportHistogram.getValueAtPercentile(99.999) / 
1000.0),
+                    dec.format(reportHistogram.getMaxValue() / 1000.0));
+        }
+    }
+
+    private final ServiceURI serviceURI;
+    private final Flags flags;
+
+    PerfClient(ServiceURI serviceURI, Flags flags) {
+        this.serviceURI = serviceURI;
+        this.flags = flags;
+    }
+
+    @Override
+    public void run() {
+        try {
+            execute();
+        } catch (Exception e) {
+            log.error("Encountered exception at running table perf client", e);
+        }
+    }
+
+    void execute() throws Exception {
+        ObjectMapper m = new ObjectMapper();
+        ObjectWriter w = m.writerWithDefaultPrettyPrinter();
+        log.info("Starting table perf client with config : {}", 
w.writeValueAsString(flags));
+
+        runBenchmarkTasks();
+    }
+
+    private void runBenchmarkTasks() throws Exception {
+        StorageClientSettings settings = StorageClientSettings.newBuilder()
+            .serviceUri(serviceURI.getUri().toString())
+            .build();
+        try (StorageClient client = StorageClientBuilder.newBuilder()
+             .withSettings(settings)
+             .withNamespace(flags.namespace)
+             .build()) {
+            try (Table<ByteBuf, ByteBuf> table = 
result(client.openTable(flags.tableName))) {
+
+                long randSeed = System.currentTimeMillis();
+                KeyGenerator generator = new KeyGenerator(flags.numKeys, 
flags.keysPerPrefix, flags.prefixSize);
+                RateLimiter limiter;
+                if (flags.rate <= 0) {
+                    limiter = null;
+                } else {
+                    limiter = RateLimiter.create(flags.rate);
+                }
+
+                for (String benchmark : flags.benchmarks) {
+                    List<BenchmarkTask> tasks = new ArrayList<>();
+                    int currentTaskId = 0;
+                    Semaphore semaphore;
+                    if (flags.maxOutstandingRequests <= 0) {
+                        semaphore = null;
+                    } else {
+                        semaphore = new 
Semaphore(flags.maxOutstandingRequests);
+                    }
+
+                    switch (benchmark) {
+                        case "fillseq":
+                            tasks.add(new WriteSequentialTask(
+                                table,
+                                currentTaskId++,
+                                randSeed,
+                                Math.max(flags.numOps, flags.numKeys),
+                                flags.numKeys,
+                                flags,
+                                generator,
+                                limiter,
+                                semaphore
+                            ));
+                            break;
+                        case "fillrandom":
+                            tasks.add(new WriteRandomTask(
+                                table,
+                                currentTaskId++,
+                                randSeed,
+                                Math.max(flags.numOps, flags.numKeys),
+                                flags.numKeys,
+                                flags,
+                                generator,
+                                limiter,
+                                semaphore
+                            ));
+                            break;
+                        case "incseq":
+                            tasks.add(new IncrementSequentialTask(
+                                table,
+                                currentTaskId++,
+                                randSeed,
+                                Math.max(flags.numOps, flags.numKeys),
+                                flags.numKeys,
+                                flags,
+                                generator,
+                                limiter,
+                                semaphore
+                            ));
+                            break;
+                        case "incrandom":
+                            tasks.add(new IncrementRandomTask(
+                                table,
+                                currentTaskId++,
+                                randSeed,
+                                Math.max(flags.numOps, flags.numKeys),
+                                flags.numKeys,
+                                flags,
+                                generator,
+                                limiter,
+                                semaphore
+                            ));
+                            break;
+                        default:
+                            System.err.println("Unknown benchmark: " + 
benchmark);
+                            break;
+                    }
+
+                    if (tasks.isEmpty()) {
+                        continue;
+                    }
+
+                    final CountDownLatch latch = new 
CountDownLatch(tasks.size());
+                    @Cleanup("shutdown")
+                    ExecutorService executor = Executors.newCachedThreadPool();
+                    for (BenchmarkTask task : tasks) {
+                        executor.submit(() -> {
+                            try {
+                                task.runTask();
+                            } catch (Exception e) {
+                                log.error("Encountered issue at running 
benchmark task {}",
+                                    task.tid, e);
+                            } finally {
+                                latch.countDown();
+                            }
+
+                        });
+                    }
+
+                    @Cleanup("shutdown")
+                    ExecutorService statsExecutor = 
Executors.newSingleThreadExecutor();
+                    statsExecutor.submit(() -> reportStats(tasks));
+
+                    latch.await();
+
+                    log.info("------------------- DONE 
-----------------------");
+                    tasks.forEach(task -> task.printAggregatedStats());
+                }
+            }
+        }
+    }
+
+    private void reportStats(List<BenchmarkTask> tasks) {
+        long oldTime = System.nanoTime();
+
+        while (true) {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException ie) {
+                break;
+            }
+
+            final long startTime = oldTime;
+            tasks.forEach(task -> task.reportStats(startTime));
+            oldTime = System.nanoTime();
+        }
+    }
+
+    private static final DecimalFormat throughputFormat = new 
PaddingDecimalFormat("0.0", 8);
+    private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 
7);
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteRandomTask.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteRandomTask.java
new file mode 100644
index 0000000000..ee9d4cc343
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteRandomTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+
+/**
+ * Write key/values in sequence.
+ */
+class WriteRandomTask extends WriteTask {
+
+    WriteRandomTask(Table<ByteBuf, ByteBuf> table,
+                    int tid,
+                    long randSeed,
+                    long numRecords,
+                    long keyRange,
+                    Flags flags,
+                    KeyGenerator generator,
+                    RateLimiter limiter,
+                    Semaphore semaphore) {
+        super(table, tid, randSeed, numRecords, keyRange, flags, generator, 
limiter, semaphore);
+    }
+
+    @Override
+    protected void getKey(ByteBuf key, long id, long range) {
+        getRandomKey(key, range);
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteSequentialTask.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteSequentialTask.java
new file mode 100644
index 0000000000..963cfa74cd
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteSequentialTask.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+
+/**
+ * Write key/values in sequence.
+ */
+class WriteSequentialTask extends WriteTask {
+
+    WriteSequentialTask(Table<ByteBuf, ByteBuf> table,
+                        int tid,
+                        long randSeed,
+                        long numRecords,
+                        long keyRange,
+                        Flags flags,
+                        KeyGenerator generator,
+                        RateLimiter limiter,
+                        Semaphore semaphore) {
+        super(table, tid, randSeed, numRecords, keyRange, flags, generator, 
limiter, semaphore);
+    }
+
+    @Override
+    protected void getKey(ByteBuf key, long id, long range) {
+        getFixedKey(key, id);
+    }
+
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteTask.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteTask.java
new file mode 100644
index 0000000000..a74c67abc1
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/WriteTask.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.tools.perf.table;
+
+import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.Flags;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.OP;
+import org.apache.bookkeeper.tools.perf.table.PerfClient.OpStats;
+
+/**
+ * Write task to inject key/value pairs to the table.
+ */
+@Slf4j
+abstract class WriteTask extends BenchmarkTask {
+
+    protected final RateLimiter limiter;
+    protected final Semaphore semaphore;
+    protected final byte[] valueBytes;
+    protected final OpStats writeOpStats;
+
+    WriteTask(Table<ByteBuf, ByteBuf> table,
+              int tid,
+              long randSeed,
+              long numRecords,
+              long keyRange,
+              Flags flags,
+              KeyGenerator generator,
+              RateLimiter limiter,
+              Semaphore semaphore) {
+        super(table, tid, randSeed, numRecords, keyRange, flags, generator);
+        this.limiter = limiter;
+        this.semaphore = semaphore;
+        this.valueBytes = new byte[flags.valueSize];
+        ThreadLocalRandom.current().nextBytes(valueBytes);
+        this.writeOpStats = new OpStats(OP.PUT.name());
+    }
+
+    @Override
+    protected void runTask() throws Exception {
+        for (long i = 0L; i < numRecords; ++i) {
+            if (null != semaphore) {
+                semaphore.acquire();
+            }
+            if (null != limiter) {
+                limiter.acquire();
+            }
+            writeKey(i, valueBytes);
+        }
+    }
+
+    protected abstract void getKey(ByteBuf key, long id, long range);
+
+    void writeKey(long i, byte[] valueBytes) {
+        final ByteBuf keyBuf = 
PooledByteBufAllocator.DEFAULT.heapBuffer(flags.keySize);
+        getKey(keyBuf, i, keyRange);
+        keyBuf.writerIndex(keyBuf.readerIndex() + keyBuf.writableBytes());
+        final ByteBuf valBuf = Unpooled.wrappedBuffer(valueBytes);
+
+        final long startTime = System.nanoTime();
+        table.put(keyBuf, valBuf)
+            .whenComplete((result, cause) -> {
+                if (null != semaphore) {
+                    semaphore.release();
+                }
+                if (null != cause) {
+                    log.error("Error at put key/value", cause);
+                } else {
+                    long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
+                        System.nanoTime() - startTime
+                    );
+                    writeOpStats.recordOp(latencyMicros);
+                }
+                keyBuf.release();
+                valBuf.release();
+            });
+    }
+
+    @Override
+    protected void reportStats(long oldTime) {
+        writeOpStats.reportStats(oldTime);
+    }
+
+    @Override
+    protected void printAggregatedStats() {
+        writeOpStats.printAggregatedStats();
+    }
+}
diff --git 
a/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/package-info.java
 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/package-info.java
new file mode 100644
index 0000000000..b36e39b343
--- /dev/null
+++ 
b/tools/perf/src/main/java/org/apache/bookkeeper/tools/perf/table/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Table related perf command.
+ */
+package org.apache.bookkeeper.tools.perf.table;
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to