Author: liyin Date: Wed Apr 2 20:48:55 2014 New Revision: 1584158 URL: http://svn.apache.org/r1584158 Log: [HBASE-10754] Improve the micro benchmarks to perform multigets instead of gets.
Author: manukranthk Summary: This diff improves the benchmarks by adding multigets (and batched random puts to load the data they read) to HBaseRPCBenchmarkTool. Test Plan: Tested on the dev server Reviewers: adela, gauravm, fan Reviewed By: fan CC: hbase-eng@, fan Differential Revision: https://phabricator.fb.com/D1215543 Task ID: 3904136 Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java - copied, changed from r1577643, hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java?rev=1584158&r1=1584157&r2=1584158&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClient.java Wed Apr 2 20:48:55 2014 @@ -35,11 +35,22 @@ public interface BenchmarkClient { public Put createPut(byte[] row, byte[] family, byte[] qual, byte[] value); + public Put createRandomPut(int rowLength, byte[] family, int qualLength, + int valueLength); + public Result executeGet(Get get); + public Result[] executeMultiGet(List<Get> gets); + + public void
executeMultiPut(List<Put> puts); + public void executePut(Put put); public List<Result> executeScan(Scan scan); public Scan createScan(byte[] row, byte[] family, int nbRows); + + public void printProfilingData(); + + public void setProfilingData(boolean flag); } Copied: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java (from r1577643, hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java) URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java?p2=hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java&p1=hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java&r1=1577643&r2=1584158&rev=1584158&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/BenchmarkClientImpl.java Wed Apr 2 20:48:55 2014 @@ -1,25 +1,8 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.hadoop.hbase.util.rpcbench; import java.io.IOException; import java.util.List; +import java.util.Random; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; @@ -29,18 +12,24 @@ import org.apache.hadoop.hbase.client.HT import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.ProfilingData; -/** - * This is a BenchmarkClient which performs RPC operations through HadoopRPC. - */ -public class HadoopRPCBenchmarkClient implements BenchmarkClient { - private static final Log LOG = LogFactory.getLog(ThriftBenchmarkClient.class); - private HTable htable = null; - HadoopRPCBenchmarkClient(HTable htable) { - this.htable = htable; - this.htable.setAutoFlush(true); - } +public abstract class BenchmarkClientImpl implements BenchmarkClient { + private static final Log LOG = LogFactory.getLog(BenchmarkClientImpl.class); + protected HTable htable = null; + @Override + public Put createRandomPut(int rowLength, byte[] family, int qualLength, + int valueLength) { + Random rand = new Random(); + byte[] row = new byte[rowLength]; + byte[] qual = new byte[qualLength]; + byte[] value = new byte[valueLength]; + rand.nextBytes(row); + rand.nextBytes(qual); + rand.nextBytes(value); + return this.createPut(row, family, qual, value); + } // Performing a get through thrift @Override public Result executeGet(Get get) { @@ -65,6 +54,7 @@ public class HadoopRPCBenchmarkClient im } } + @SuppressWarnings("deprecation") public Get createGet(byte[] row, byte[] family, byte[] qual) { Get g = new Get(row); g.addColumn(family, qual); @@ -87,4 +77,35 @@ public class HadoopRPCBenchmarkClient im throw new NotImplementedException(); } + @Override + public Result[] executeMultiGet(List<Get> gets) { + try { + return 
this.htable.batchGet(gets); + } catch (IOException e) { + LOG.debug("Unable to perform put"); + e.printStackTrace(); + } + return null; + } + + @Override + public void executeMultiPut(List<Put> puts) { + try { + this.htable.put(puts); + } catch (IOException e) { + LOG.debug("Unable to perform put"); + e.printStackTrace(); + } + } + + public void printProfilingData() { + if (htable.getProfiling()) { + ProfilingData data = htable.getProfilingData(); + LOG.debug(data.toPrettyString()); + } + } + + public void setProfilingData(boolean flag) { + this.htable.setProfiling(flag); + } } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java?rev=1584158&r1=1584157&r2=1584158&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java Wed Apr 2 20:48:55 2014 @@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.loadtest.ColumnFamilyProperties; import org.apache.hadoop.hbase.loadtest.HBaseUtils; @@ -31,7 +33,12 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Histogram; +import org.weakref.jmx.com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; 
+import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -39,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + /** * Tool that runs the benchmarks. This takes the name of a benchmark factory, * and performs a single put and does a lot of gets to retrieve that put. @@ -54,30 +62,32 @@ public class HBaseRPCBenchmarkTool exten private static final long DEFAULT_REPORT_INTERVAL_MS = 10; private static final int DEFAULT_NUM_OPS = 200; private static final int DEFAULT_NUM_THREADS = 10; - private static final String DEFAULT_ROW = "rowkey"; + private static final int DEFAULT_ROW_LENGTH = 20; private static final String DEFAULT_CF = "cf"; - private static final String DEFAULT_QUAL ="q"; - private static final String DEFAULT_VALUE = "v"; + private static final int DEFAULT_QUAL_LENGTH = 10; + private static final int DEFAULT_VALUE_LENGTH = 100; private static final String DEFAULT_TABLENAME = "RPCBenchmarkingTable"; private static final int DEFAULT_ZK_PORT = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; - private static final boolean DEFAULT_DO_PUT = true; + private static final boolean DEFAULT_ASYNC_CALLS = false; /** * The following are the command line parameters which this tool takes. 
*/ - private static final String OPT_CF= "cf"; - private static final String OPT_QUAL = "q"; - private static final String OPT_ROW = "r"; + private static final String OPT_CF = "cf"; + private static final String OPT_QUAL_LENGTH = "q"; + private static final String OPT_ROW_LENGTH = "r"; private static final String OPT_TBL_NAME = "t"; - private static final String OPT_VALUE = "v"; + private static final String OPT_VALUE_LENGTH = "v"; private static final String OPT_CLASS = "c"; private static final String OPT_NUM_OPS = "ops"; private static final String OPT_NUM_THREADS = "threads"; private static final String OPT_REPORT_INTERVAL = "interval"; - private static final String OPT_NO_PUT = "no_put"; private static final String OPT_ZK_QUORUM = "zk"; private static final String OPT_ZK_PORT = "zkPort"; + private static final String OPT_GET_BATCH_SIZE = "gbatch"; + private static final String OPT_PUT_BATCH_SIZE = "pbatch"; + private static final String OPT_ASYNC_CALLS = "async"; /** * These are values that we get from the command line and @@ -91,17 +101,20 @@ public class HBaseRPCBenchmarkTool exten private byte[] tblName; private String zkQuorum; private int zkPort; - private byte[] row; + private int rowLength; private byte[] family; - private byte[] qual; - private byte[] value; + private int qualLength; + private int valueLength; private int numOps; private int numThreads; private long reportInterval; - private boolean doPut; private AtomicLong sumLatency = new AtomicLong(0); private AtomicLong totalOps = new AtomicLong(0); private long runtimeMs; + private int multigetBatch; + private int multiputBatch; + private boolean useAsync; + private HBaseRPCBenchmarkTool() { } @@ -111,20 +124,21 @@ public class HBaseRPCBenchmarkTool exten } private HBaseRPCBenchmarkTool(Class<? 
extends BenchmarkFactory> factoryCls, - byte[] tableName, Configuration conf, byte[] row, byte[] cf, byte[] qual, - byte[] value, int numOps, int numThreads, long reportIntervalMs, - boolean doPut) { + byte[] tableName, Configuration conf, int rowLength, byte[] cf, + int qualLength, int valueLength, int numOps, int numThreads, + long reportIntervalMs, int multiGetBatch, int multiputBatch) { this.factoryCls = factoryCls; this.conf = conf; this.tblName = tableName; - this.row = row; + this.rowLength = rowLength; this.family = cf; - this.qual = qual; - this.value = value; + this.qualLength = qualLength; + this.valueLength = valueLength; this.numOps = numOps; this.numThreads = numThreads; this.reportInterval = reportIntervalMs; - this.doPut = doPut; + this.multigetBatch = multiGetBatch; + this.multiputBatch = multiputBatch; } /** @@ -133,15 +147,16 @@ public class HBaseRPCBenchmarkTool exten public static class Builder { private final Class<? extends BenchmarkFactory> factoryCls; private byte[] tableName = Bytes.toBytes(DEFAULT_TABLENAME); - private byte[] row = Bytes.toBytes(DEFAULT_ROW); + private int rowLength = DEFAULT_ROW_LENGTH; private byte[] cf = Bytes.toBytes(DEFAULT_CF); - private byte[] qual = Bytes.toBytes(DEFAULT_QUAL); - private byte[] value = Bytes.toBytes(DEFAULT_VALUE); + private int qualLength = DEFAULT_QUAL_LENGTH; + private int valueLength = DEFAULT_VALUE_LENGTH; private int numOps = DEFAULT_NUM_OPS; private int numThreads = DEFAULT_NUM_THREADS; private long reportIntervalMs = DEFAULT_REPORT_INTERVAL_MS; - private boolean doPut = DEFAULT_DO_PUT; private Configuration conf; + private int multigetBatch = 1000; + private int multiputBatch = 1000; public Builder(Class<? 
extends BenchmarkFactory> factoryCls) { this.factoryCls = factoryCls; @@ -152,8 +167,8 @@ public class HBaseRPCBenchmarkTool exten return this; } - public Builder withRow(byte[] row) { - this.row = row; + public Builder withRowLength(int rowLength) { + this.rowLength = rowLength; return this; } @@ -162,13 +177,13 @@ public class HBaseRPCBenchmarkTool exten return this; } - public Builder withQualifier(byte[] qual) { - this.qual = qual; + public Builder withQualifierLength(int qualLength) { + this.qualLength = qualLength; return this; } - public Builder withValue(byte[] value) { - this.value = value; + public Builder withValue(int valueLength) { + this.valueLength = valueLength; return this; } @@ -182,20 +197,26 @@ public class HBaseRPCBenchmarkTool exten return this; } - public Builder withDoPut(boolean doPut) { - this.doPut = doPut; + public Builder withConf(Configuration conf) { + this.conf = conf; return this; } - public Builder withConf(Configuration conf) { - this.conf = conf; + public Builder withMultiGetBatch(int multiGetBatch) { + this.multigetBatch = multiGetBatch; + return this; + } + + public Builder withMultiPutBatch(int multiputBatch) { + this.multiputBatch = multiputBatch; return this; } public HBaseRPCBenchmarkTool create() { return new HBaseRPCBenchmarkTool(this.factoryCls, this.tableName, - this.conf, this.row, this.cf, this.qual, this.value, - this.numOps, this.numThreads, this.reportIntervalMs, this.doPut); + this.conf, this.rowLength, this.cf, this.qualLength, this.valueLength, + this.numOps, this.numThreads, this.reportIntervalMs, + this.multigetBatch, this.multiputBatch); } } @@ -207,16 +228,17 @@ public class HBaseRPCBenchmarkTool exten addOptWithArg(OPT_CLASS, "Benchmark factory class"); addOptWithArg(OPT_NUM_THREADS, "Number of threads"); addOptWithArg(OPT_NUM_OPS, "Number of operations to execute per thread"); + addOptWithArg(OPT_GET_BATCH_SIZE, "Number of gets in a single batch"); + addOptWithArg(OPT_PUT_BATCH_SIZE, "Number of puts in a 
single batch"); addOptWithArg(OPT_TBL_NAME, "Table name to use"); addOptWithArg(OPT_ZK_QUORUM, "Table name"); addOptWithArg(OPT_ZK_PORT, "Zookeeper Port"); addOptWithArg(OPT_REPORT_INTERVAL, "Reporting interval in milliseconds"); - addOptWithArg(OPT_ROW, "Row key"); - addOptWithArg(OPT_NO_PUT, - "DO NOT perform a single put (writing the value) before the benchmark"); + addOptWithArg(OPT_ROW_LENGTH, "Row key length"); addOptWithArg(OPT_CF, "Column family to use"); - addOptWithArg(OPT_QUAL, "Column qualifier to use"); - addOptWithArg(OPT_VALUE, "Value to use"); + addOptWithArg(OPT_QUAL_LENGTH, "Column qualifier length"); + addOptWithArg(OPT_VALUE_LENGTH, "Value length"); + addOptWithArg(OPT_ASYNC_CALLS, "Use async calls underneath the sync calls"); } /** @@ -244,7 +266,8 @@ public class HBaseRPCBenchmarkTool exten LOG.debug("Adding zookeeper quorum : " + zkQuorum); } reportInterval = parseLong(cmd.getOptionValue(OPT_REPORT_INTERVAL, - String.valueOf(DEFAULT_REPORT_INTERVAL_MS)), reportInterval, Long.MAX_VALUE); + String.valueOf(DEFAULT_REPORT_INTERVAL_MS)), + reportInterval, Long.MAX_VALUE); if (cmd.hasOption(OPT_TBL_NAME)) { tblName = Bytes.toBytes(cmd.getOptionValue(OPT_TBL_NAME)); } else { @@ -256,39 +279,60 @@ public class HBaseRPCBenchmarkTool exten HBaseUtils.createTableIfNotExists(conf, tblName, familyProperties, 1); } - row = Bytes.toBytes(cmd.getOptionValue(OPT_ROW, DEFAULT_ROW)); + rowLength = Integer.parseInt( + cmd.getOptionValue(OPT_ROW_LENGTH, String.valueOf(DEFAULT_ROW_LENGTH))); family = Bytes.toBytes(cmd.getOptionValue(OPT_CF, DEFAULT_CF)); - qual = Bytes.toBytes(cmd.getOptionValue(OPT_QUAL, DEFAULT_QUAL)); - value = Bytes.toBytes(cmd.getOptionValue(OPT_VALUE, DEFAULT_VALUE)); + qualLength = Integer.parseInt(cmd.getOptionValue(OPT_QUAL_LENGTH, + String.valueOf(DEFAULT_QUAL_LENGTH))); + valueLength = Integer.parseInt(cmd.getOptionValue(OPT_VALUE_LENGTH, + String.valueOf(DEFAULT_VALUE_LENGTH))); numOps = parseInt(cmd.getOptionValue(OPT_NUM_OPS, 
String.valueOf(DEFAULT_NUM_OPS)), 1, Integer.MAX_VALUE); numThreads = parseInt(cmd.getOptionValue(OPT_NUM_THREADS, String.valueOf(DEFAULT_NUM_THREADS)), 1, Integer.MAX_VALUE); - doPut = !cmd.hasOption(OPT_NO_PUT); + this.multigetBatch = parseInt(cmd.getOptionValue(OPT_GET_BATCH_SIZE, + String.valueOf(1000)), 1, Integer.MAX_VALUE); + this.multiputBatch = parseInt(cmd.getOptionValue(OPT_PUT_BATCH_SIZE, + String.valueOf(1000)), 1, Integer.MAX_VALUE); + this.useAsync = Boolean.parseBoolean(cmd.getOptionValue(OPT_ASYNC_CALLS)); + } + + public List<Get> performRandomMultiputs(BenchmarkClient benchmark, + int numMultiPutOps, int multiputBatchSize) throws IOException { + List<Get> ret = new ArrayList<Get>(); + for (int i = 0; i < numMultiPutOps; i++) { + List<Put> puts = new ArrayList<Put>(multiputBatchSize); + for (int j = 0; j < multiputBatchSize; j++) { + Put p = benchmark.createRandomPut(this.rowLength, this.family, + this.qualLength, this.valueLength); + puts.add(p); + ret.add(new Get(p.getRow())); + } + benchmark.executeMultiPut(puts); + } + return ret; } /** * Main function which does the benchmarks. * @throws IllegalAccessException * @throws InstantiationException + * @throws IOException */ @Override - protected void doWork() throws InterruptedException, InstantiationException, IllegalAccessException { + protected void doWork() throws InterruptedException, + InstantiationException, IllegalAccessException, IOException { // Initializing the required objects. BenchmarkFactory factory = (BenchmarkFactory) factoryCls.newInstance(); LOG.debug("Creating an instance of the factory class : " + factoryCls.getCanonicalName()); ExecutorService executor = Executors.newFixedThreadPool(numThreads); + Configuration conf = HBaseConfiguration.create(this.conf); + conf.setBoolean(HConstants.HTABLE_ASYNC_CALLS, this.useAsync); BenchmarkClient benchmark = factory.makeBenchmarkClient(tblName, conf); - // Performing a single put to the region server. 
- if (doPut) { - benchmark.executePut(benchmark.createPut(row, family, qual, value)); - Result r = benchmark.executeGet(benchmark.createGet(row, family, qual)); - if (Bytes.equals(r.getValue(family, qual), value)) { - } - doPut = false; - } + List<Get> gets = performRandomMultiputs(benchmark, this.numOps/10, + this.multiputBatch); runtimeMs = System.currentTimeMillis(); // Count down latches which let me synchronize all the benchmark workers to @@ -301,8 +345,9 @@ public class HBaseRPCBenchmarkTool exten // Spawning the worker threads here. for (int i = 0; i < numThreads; i++) { executor.submit(new WorkerThread(histogram, sumLatency, - totalOps, factory, tblName, conf, numOps, reportInterval, row, - family, qual, readySignal, startSignal, doneSignal, running)); + totalOps, factory, tblName, conf, numOps, reportInterval, rowLength, + family, qualLength, valueLength, gets, this.multigetBatch, + readySignal, startSignal, doneSignal, running)); } // Here we will wait for all the worker threads to kick off and then we let @@ -345,7 +390,6 @@ public class HBaseRPCBenchmarkTool exten * it starts running the benchmark. 
*/ class WorkerThread extends Thread { - private final Histogram histogram; private final AtomicLong totalLatency; private final AtomicLong totalOps; @@ -354,14 +398,21 @@ public class HBaseRPCBenchmarkTool exten private final byte[] tableName; private final Configuration conf; private final int numOps; - private final byte[] row; + @SuppressWarnings("unused") + private final int rowLength; + @SuppressWarnings("unused") private final byte[] family; - private final byte[] qual; + @SuppressWarnings("unused") + private final int qualLength; + @SuppressWarnings("unused") + private final int valueLength; + private final List<Get> gets; private final CountDownLatch readySignal; // To notify the controller that this thread has started private final CountDownLatch startSignal; // To notify this thread that it is free to start private final CountDownLatch doneSignal; // To notify the controller thread to shutdown the threads. private final AtomicBoolean running; // the state variable which tells whether the threads should be running. 
private boolean signalledDone = false; + private final int multiGetBatch; WorkerThread(Histogram histogram, AtomicLong totalLatency, @@ -371,9 +422,12 @@ public class HBaseRPCBenchmarkTool exten Configuration conf, int numOps, long reportIntervalMs, - byte[] row, + int rowLength, byte[] family, - byte[] qual, + int qualLength, + int valueLength, + List<Get> gets, + int multiGetBatch, CountDownLatch readySignal, CountDownLatch startSignal, CountDownLatch doneSignal, @@ -385,15 +439,32 @@ public class HBaseRPCBenchmarkTool exten this.totalLatency = totalLatency; this.totalOps = totalOps; this.numOps = numOps; - this.row = row; + this.rowLength = rowLength; this.family = family; - this.qual = qual; + this.qualLength = qualLength; + this.valueLength = valueLength; + this.gets = gets; + this.multiGetBatch = multiGetBatch; this.readySignal = readySignal; this.startSignal = startSignal; this.doneSignal = doneSignal; this.running = running; } + public Result[] executeMultiGet(int batch) { + List<Get> todogets = new ArrayList<Get>(); + Random rand = new Random(); + if (batch == 1) { + return new Result[] { + benchmark.executeGet(gets.get(rand.nextInt(gets.size()))) + }; + } + for (int i = 0; i < batch; i++) { + todogets.add(new Get(this.gets.get(rand.nextInt(this.gets.size())).getRow())); + } + return benchmark.executeMultiGet(todogets); + } + @Override public void run() { LOG.debug("Worker Thread. 
numOps:" + numOps); @@ -411,7 +482,15 @@ public class HBaseRPCBenchmarkTool exten for (int i = 0; ; ++i) { long opStartNs = System.nanoTime(); try { - benchmark.executeGet(benchmark.createGet(row, family, qual)); + if (i % (this.numOps / 10) == 0) { + setProfilingData(true); + } + Result[] ret = executeMultiGet(this.multiGetBatch); + Preconditions.checkArgument(ret.length == multigetBatch); + if (i % (this.numOps / 10) == 0) { + printProfilingData(); + setProfilingData(false); + } } catch (Exception e) { LOG.debug("Encountered exception while performing get"); e.printStackTrace(); @@ -419,7 +498,7 @@ public class HBaseRPCBenchmarkTool exten } long delta = System.nanoTime() - opStartNs; totalLatency.addAndGet(delta); - totalOps.incrementAndGet(); + totalOps.addAndGet(this.multiGetBatch); histogram.addValue(delta); if (i >= numOps) { if (!signalledDone) { @@ -431,8 +510,12 @@ public class HBaseRPCBenchmarkTool exten } StringBuilder sb = new StringBuilder(); sb.append("Printing statistics for " + factoryCls.getName()); + sb.append(". Total Ops : "); + sb.append(totalOps.get()); + sb.append(". Get Batch Size : "); + sb.append(this.multiGetBatch); sb.append(". Average latency : "); - sb.append(sumLatency.get()/totalOps.get()); + sb.append(totalLatency.get()/totalOps.get()); sb.append("ns. "); sb.append("p95 latency : "); sb.append(histogram.getPercentileEstimate(PercentileMetric.P95)); @@ -441,8 +524,17 @@ public class HBaseRPCBenchmarkTool exten sb.append(". 
Throughput : "); sb.append((totalOps.get() * 1000)/ (System.currentTimeMillis() - startTime)); + sb.append(" ops/s."); LOG.debug(sb); } + + private void printProfilingData() { + this.benchmark.printProfilingData(); + } + + private void setProfilingData(boolean flag) { + this.benchmark.setProfilingData(flag); + } } public long getTotalOps() { @@ -466,7 +558,12 @@ public class HBaseRPCBenchmarkTool exten } public static void main(String[] args) { - int ret = new HBaseRPCBenchmarkTool().doStaticMain(args); + HBaseRPCBenchmarkTool tool = new HBaseRPCBenchmarkTool(); + int ret = tool.doStaticMain(args); + System.out.println("Total throughput : " + tool.getThroughput()); + System.out.println("Avg Latency : " + tool.getAverageLatency()); + System.out.println("P99 latency : " + tool.getP99Latency()); + System.out.println("P95 latency : " + tool.getP95Latency()); System.exit(ret); } } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java?rev=1584158&r1=1584157&r2=1584158&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCProtocolComparison.java Wed Apr 2 20:48:55 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.util.rpcbench; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -57,25 +58,27 @@ public class HBaseRPCProtocolComparison private static final int DEFAULT_NUM_OPS = 10000; private static final int DEFAULT_NUM_ROUNDS = 100; private static final int DEFAULT_NUM_THREADS = 10; - private static final String DEFAULT_ROW = "rowkey"; + private static final int 
DEFAULT_ROW_LENGTH = 20; private static final String DEFAULT_CF = "cf"; - private static final String DEFAULT_QUAL ="q"; - private static final String DEFAULT_VALUE = "v"; + private static final int DEFAULT_QUAL_LENGTH =10; + private static final int DEFAULT_VALUE_LENGTH = 100; private static final String DEFAULT_TABLENAME = "RPCBenchmarkingTable"; private static final int DEFAULT_ZK_PORT = 2181; - private static final boolean DEFAULT_DO_PUT = true; + private static final int DEFAULT_GET_BATCH_SIZE = 100; + private static final int DEFAULT_PUT_BATCH_SIZE = 100; private static final String OPT_CF= "cf"; - private static final String OPT_QUAL = "q"; - private static final String OPT_ROW = "r"; + private static final String OPT_QUAL_LENGTH = "q"; + private static final String OPT_ROW_LENGTH = "r"; private static final String OPT_TBL_NAME = "t"; - private static final String OPT_VALUE_LENGTH = "vlen"; + private static final String OPT_VALUE_LENGTH = "v"; private static final String OPT_CLASSES = "c"; private static final String OPT_NUM_OPS = "ops"; + private static final String OPT_GET_BATCH_SIZE = "gbatch"; + private static final String OPT_PUT_BATCH_SIZE = "pbatch"; private static final String OPT_NUM_ROUNDS = "rounds"; private static final String OPT_NUM_THREADS = "threads"; private static final String OPT_REPORT_INTERVAL = "interval"; - private static final String OPT_NO_PUT = "no_put"; private static final String OPT_ZK_QUORUM = "zk"; private static final String OPT_ZK_PORT = "zkPort"; @@ -84,31 +87,31 @@ public class HBaseRPCProtocolComparison private byte[] tblName; private String zkQuorum; private int zkPort; - private byte[] row; + private int rowLength; private byte[] family; - private byte[] qual; - private byte[] value; + private int qualLength; private int valueLength; private int numOps; + private int multigetbatch; + private int multiputbatch; private int numRounds; private int numThreads; private long reportInterval; - private boolean doPut; @Override 
protected void addOptions() { addOptWithArg(OPT_CLASSES, "Benchmark factory classes"); addOptWithArg(OPT_NUM_THREADS, "Number of threads"); addOptWithArg(OPT_NUM_OPS, "Number of operations to execute per thread"); + addOptWithArg(OPT_GET_BATCH_SIZE, "multiget size to execute per thread"); + addOptWithArg(OPT_PUT_BATCH_SIZE, "multiput size to execute"); addOptWithArg(OPT_TBL_NAME, "Table name"); - addOptWithArg(OPT_ZK_QUORUM, "Table name"); + addOptWithArg(OPT_ZK_QUORUM, "Zookeeper Quorum"); addOptWithArg(OPT_ZK_PORT, "Zookeeper Port"); addOptWithArg(OPT_REPORT_INTERVAL, "Reporting interval in milliseconds"); - addOptWithArg(OPT_ROW, "Row key"); - addOptWithArg(OPT_NO_PUT, - "DO NOT perform a single put (writing the value) before the benchmark"); + addOptWithArg(OPT_ROW_LENGTH, "Row key length"); addOptWithArg(OPT_CF, "Column family to use"); - addOptWithArg(OPT_QUAL, "Column qualifier to use"); + addOptWithArg(OPT_QUAL_LENGTH, "Column qualifier length to use"); addOptWithArg(OPT_VALUE_LENGTH, "Value length to use"); addOptWithArg(OPT_NUM_ROUNDS, "Number of rounds to perform the tests"); } @@ -160,22 +163,23 @@ public class HBaseRPCProtocolComparison HBaseUtils.createTableIfNotExists(conf, tblName, familyProperties, 1); } - row = Bytes.toBytes(cmd.getOptionValue(OPT_ROW, DEFAULT_ROW)); + rowLength = Integer.parseInt(cmd.getOptionValue(OPT_ROW_LENGTH, + String.valueOf(DEFAULT_ROW_LENGTH))); family = Bytes.toBytes(cmd.getOptionValue(OPT_CF, DEFAULT_CF)); - qual = Bytes.toBytes(cmd.getOptionValue(OPT_QUAL, DEFAULT_QUAL)); - valueLength = parseInt(cmd.getOptionValue(OPT_VALUE_LENGTH, DEFAULT_VALUE), - 0, Integer.MAX_VALUE); - Random r = new Random(); - value = new byte[valueLength]; - r.nextBytes(value); + qualLength = Integer.parseInt(cmd.getOptionValue(OPT_QUAL_LENGTH, + String.valueOf(DEFAULT_QUAL_LENGTH))); + valueLength = parseInt(cmd.getOptionValue(OPT_VALUE_LENGTH, + String.valueOf(DEFAULT_VALUE_LENGTH)), 0, Integer.MAX_VALUE); numOps = 
parseInt(cmd.getOptionValue(OPT_NUM_OPS, String.valueOf(DEFAULT_NUM_OPS)), 1, Integer.MAX_VALUE); + multigetbatch = parseInt(cmd.getOptionValue(OPT_GET_BATCH_SIZE, + String.valueOf(DEFAULT_GET_BATCH_SIZE)), 1, Integer.MAX_VALUE); + multiputbatch = parseInt(cmd.getOptionValue(OPT_PUT_BATCH_SIZE, + String.valueOf(DEFAULT_PUT_BATCH_SIZE)), 1, Integer.MAX_VALUE); numRounds = parseInt(cmd.getOptionValue(OPT_NUM_ROUNDS, String.valueOf(DEFAULT_NUM_ROUNDS)), 1, Integer.MAX_VALUE); numThreads = parseInt(cmd.getOptionValue(OPT_NUM_THREADS, String.valueOf(DEFAULT_NUM_THREADS)), 1, Integer.MAX_VALUE); - doPut = DEFAULT_DO_PUT; - doPut = !cmd.hasOption(OPT_NO_PUT); } /** @@ -216,9 +220,11 @@ public class HBaseRPCProtocolComparison long startTime = System.currentTimeMillis(); HBaseRPCBenchmarkTool tool = new HBaseRPCBenchmarkTool .Builder(factoryCls).withColumnFamily(family).withNumOps(numOps) - .withRow(row).withNumThreads(numThreads).withConf(conf) - .withQualifier(qual).withTableName(tblName).withValue(value) - .withDoPut(doPut).create(); + .withRowLength(rowLength).withNumThreads(numThreads) + .withConf(conf).withQualifierLength(qualLength) + .withMultiPutBatch(multiputbatch) + .withMultiGetBatch(multigetbatch) + .withTableName(tblName).withValue(valueLength).create(); tool.doWork(); hist.addValue(tool.getP95Latency()); runTime.addAndGet(System.currentTimeMillis() - startTime); @@ -229,6 +235,8 @@ public class HBaseRPCProtocolComparison LOG.debug("Cannot run the tool for factory : " + factoryCls.getName()); e.printStackTrace(); + } catch (IOException e) { + LOG.error("Caught unknown IOException", e); } } }); Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java?rev=1584158&r1=1584157&r2=1584158&view=diff 
============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HadoopRPCBenchmarkClient.java Wed Apr 2 20:48:55 2014 @@ -18,73 +18,14 @@ package org.apache.hadoop.hbase.util.rpcbench; -import java.io.IOException; -import java.util.List; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; /** * This is a BenchmarkClient which performs RPC operations through HadoopRPC. */ -public class HadoopRPCBenchmarkClient implements BenchmarkClient { - private static final Log LOG = LogFactory.getLog(ThriftBenchmarkClient.class); - private HTable htable = null; +public class HadoopRPCBenchmarkClient extends BenchmarkClientImpl { HadoopRPCBenchmarkClient(HTable htable) { this.htable = htable; this.htable.setAutoFlush(true); } - - // Performing a get through thrift - @Override - public Result executeGet(Get get) { - Result r = null; - try { - r = this.htable.get(get); - } catch (IOException e) { - LOG.debug("Unable to perform get"); - e.printStackTrace(); - } - return r; - } - - // Performing a put through hadoop rpc. 
- @Override - public void executePut(Put put) { - try { - this.htable.put(put); - } catch (IOException e) { - LOG.debug("Unable to perform put"); - e.printStackTrace(); - } - } - - public Get createGet(byte[] row, byte[] family, byte[] qual) { - Get g = new Get(row); - g.addColumn(family, qual); - return g; - } - - public Put createPut(byte[] row, byte[] family, byte[] qual, byte[] value) { - Put p = new Put(row); - p.add(family, qual, value); - return p; - } - - @Override - public List<Result> executeScan(Scan scan) { - throw new NotImplementedException(); - } - - @Override - public Scan createScan(byte[] row, byte[] family, int nbRows) { - throw new NotImplementedException(); - } - } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java?rev=1584158&r1=1584157&r2=1584158&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/ThriftBenchmarkClient.java Wed Apr 2 20:48:55 2014 @@ -17,75 +17,16 @@ */ package org.apache.hadoop.hbase.util.rpcbench; -import java.io.IOException; -import java.util.List; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; /** * Implements the BenchmarkClient interface and provides functions to perform * gets and puts. 
* */ -public class ThriftBenchmarkClient implements BenchmarkClient { - private static final Log LOG = LogFactory.getLog(ThriftBenchmarkClient.class); - private HTable htable = null; - +public class ThriftBenchmarkClient extends BenchmarkClientImpl { ThriftBenchmarkClient(HTable htable) { this.htable = htable; this.htable.setAutoFlush(true); } - - @Override - public Result executeGet(Get get) { - Result r = null; - try { - r = this.htable.get(get); - } catch (IOException e) { - LOG.debug("Unable to perform get"); - e.printStackTrace(); - } - return r; - } - - @Override - public void executePut(Put put) { - try { - this.htable.put(put); - } catch (IOException e) { - LOG.debug("Unable to perform put"); - e.printStackTrace(); - } - } - - /** - * TODO: make use of qual or get rid of it - */ - public Get createGet(byte[] row, byte[] family, byte[] qual) { - return new Get.Builder(row).addFamily(family).create(); - } - - public Put createPut(byte[] row, byte[] family, byte[] qual, byte[] value) { - Put p = new Put(row); - p.add(family, qual, value); - return p; - } - - @Override - public List<Result> executeScan(Scan scan) { - throw new NotImplementedException(); - } - - @Override - public Scan createScan(byte[] row, byte[] family, int nbRows) { - throw new NotImplementedException(); - } - }
