http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
new file mode 100644
index 0000000..23a70a9
--- /dev/null
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -0,0 +1,2627 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static 
org.codehaus.jackson.map.SerializationConfig.Feature.SORT_PROPERTIES_ALPHABETICALLY;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Constructor;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Consistency;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RawAsyncTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterAllFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
+import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
+import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
+import org.apache.hadoop.hbase.trace.SpanReceiverHost;
+import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.htrace.impl.ProbabilitySampler;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects;
+import 
org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.UniformReservoir;
+
+/**
+ * Script used evaluating HBase performance and scalability.  Runs a HBase
+ * client that steps through one of a set of hardcoded tests or 'experiments'
+ * (e.g. a random reads test, a random writes test, etc.). Pass on the
+ * command-line which test to run and how many clients are participating in
+ * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
+ *
+ * <p>This class sets up and runs the evaluation programs described in
+ * Section 7, <i>Performance Evaluation</i>, of the <a
+ * href="http://labs.google.com/papers/bigtable.html";>Bigtable</a>
+ * paper, pages 8-10.
+ *
+ * <p>By default, runs as a mapreduce job where each mapper runs a single test
+ * client. Can also run as a non-mapreduce, multithreaded application by
+ * specifying {@code --nomapred}. Each client does about 1GB of data, unless
+ * specified otherwise.
+ */
[email protected](HBaseInterfaceAudience.TOOLS)
+public class PerformanceEvaluation extends Configured implements Tool {
+  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
+  static final String RANDOM_READ = "randomRead";
+  private static final Log LOG = 
LogFactory.getLog(PerformanceEvaluation.class.getName());
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  static {
+    MAPPER.configure(SORT_PROPERTIES_ALPHABETICALLY, true);
+  }
+
+  public static final String TABLE_NAME = "TestTable";
+  public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
+  public static final byte [] COLUMN_ZERO = Bytes.toBytes("" + 0);
+  public static final byte [] QUALIFIER_NAME = COLUMN_ZERO;
+  public static final int DEFAULT_VALUE_LENGTH = 1000;
+  public static final int ROW_LENGTH = 26;
+
+  private static final int ONE_GB = 1024 * 1024 * 1000;
+  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
+  // TODO : should we make this configurable
+  private static final int TAG_LENGTH = 256;
+  private static final DecimalFormat FMT = new DecimalFormat("0.##");
+  private static final MathContext CXT = MathContext.DECIMAL64;
+  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
+  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 
1024);
+  private static final TestOptions DEFAULT_OPTS = new TestOptions();
+
+  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
+  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
+
+  static {
+    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
+        "Run async random read test");
+    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
+        "Run async random write test");
+    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
+        "Run async sequential read test");
+    addCommandDescriptor(AsyncSequentialWriteTest.class, 
"asyncSequentialWrite",
+        "Run async sequential write test");
+    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
+        "Run async scan test (read every row)");
+    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
+      "Run random read test");
+    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
+      "Run random seek and scan 100 test");
+    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
+      "Run random seek scan with both start and stop row (max 10 rows)");
+    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
+      "Run random seek scan with both start and stop row (max 100 rows)");
+    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
+      "Run random seek scan with both start and stop row (max 1000 rows)");
+    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
+      "Run random seek scan with both start and stop row (max 10000 rows)");
+    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
+      "Run random write test");
+    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
+      "Run sequential read test");
+    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
+      "Run sequential write test");
+    addCommandDescriptor(ScanTest.class, "scan",
+      "Run scan test (read every row)");
+    addCommandDescriptor(FilteredScanTest.class, "filterScan",
+      "Run scan test using a filter to find a specific row based on it's value 
" +
+      "(make sure to use --rows=20)");
+    addCommandDescriptor(IncrementTest.class, "increment",
+      "Increment on each row; clients overlap on keyspace so some concurrent 
operations");
+    addCommandDescriptor(AppendTest.class, "append",
+      "Append on each row; clients overlap on keyspace so some concurrent 
operations");
+    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
+      "CheckAndMutate on each row; clients overlap on keyspace so some 
concurrent operations");
+    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
+      "CheckAndPut on each row; clients overlap on keyspace so some concurrent 
operations");
+    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
+      "CheckAndDelete on each row; clients overlap on keyspace so some 
concurrent operations");
+  }
+
+  /**
+   * Enum for map metrics.  Keep it out here rather than inside in the Map
+   * inner-class so we can find associated properties.
+   */
+  protected static enum Counter {
+    /** elapsed time */
+    ELAPSED_TIME,
+    /** number of rows */
+    ROWS
+  }
+
+  protected static class RunResult implements Comparable<RunResult> {
+    public RunResult(long duration, Histogram hist) {
+      this.duration = duration;
+      this.hist = hist;
+    }
+
+    public final long duration;
+    public final Histogram hist;
+
+    @Override
+    public String toString() {
+      return Long.toString(duration);
+    }
+
+    @Override public int compareTo(RunResult o) {
+      return Long.compare(this.duration, o.duration);
+    }
+  }
+
+  /**
+   * Constructor
+   * @param conf Configuration object
+   */
+  public PerformanceEvaluation(final Configuration conf) {
+    super(conf);
+  }
+
+  protected static void addCommandDescriptor(Class<? extends TestBase> 
cmdClass,
+      String name, String description) {
+    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, 
description);
+    COMMANDS.put(name, cmdDescriptor);
+  }
+
+  /**
+   * Implementations can have their status set.
+   */
+  interface Status {
+    /**
+     * Sets status
+     * @param msg status message
+     * @throws IOException
+     */
+    void setStatus(final String msg) throws IOException;
+  }
+
+  /**
+   * MapReduce job that runs a performance evaluation client in each map task.
+   */
+  public static class EvaluationMapTask
+      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
+
+    /** configuration parameter name that contains the command */
+    public final static String CMD_KEY = "EvaluationMapTask.command";
+    /** configuration parameter name that contains the PE impl */
+    public static final String PE_KEY = 
"EvaluationMapTask.performanceEvalImpl";
+
+    private Class<? extends Test> cmd;
+
+    @Override
+    protected void setup(Context context) throws IOException, 
InterruptedException {
+      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
+
+      // this is required so that extensions of PE are instantiated within the
+      // map reduce task...
+      Class<? extends PerformanceEvaluation> peClass =
+          forName(context.getConfiguration().get(PE_KEY), 
PerformanceEvaluation.class);
+      try {
+        
peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
+      } catch (Exception e) {
+        throw new IllegalStateException("Could not instantiate PE instance", 
e);
+      }
+    }
+
+    private <Type> Class<? extends Type> forName(String className, Class<Type> 
type) {
+      try {
+        return Class.forName(className).asSubclass(type);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException("Could not find class for name: " + 
className, e);
+      }
+    }
+
+    @Override
+    protected void map(LongWritable key, Text value, final Context context)
+           throws IOException, InterruptedException {
+
+      Status status = new Status() {
+        @Override
+        public void setStatus(String msg) {
+           context.setStatus(msg);
+        }
+      };
+
+      ObjectMapper mapper = new ObjectMapper();
+      TestOptions opts = mapper.readValue(value.toString(), TestOptions.class);
+      Configuration conf = 
HBaseConfiguration.create(context.getConfiguration());
+      final Connection con = ConnectionFactory.createConnection(conf);
+      AsyncConnection asyncCon = null;
+      try {
+        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+
+      // Evaluation task
+      RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, 
con, asyncCon, opts, status);
+      // Collect how much time the thing took. Report as map output and
+      // to the ELAPSED_TIME counter.
+      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
+      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
+      context.write(new LongWritable(opts.startRow), new 
LongWritable(result.duration));
+      context.progress();
+    }
+  }
+
+  /*
+   * If table does not already exist, create. Also create a table when
+   * {@code opts.presplitRegions} is specified or when the existing table's
+   * region replica count doesn't match {@code opts.replicas}.
+   */
+  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
+    TableName tableName = TableName.valueOf(opts.tableName);
+    boolean needsDelete = false, exists = admin.tableExists(tableName);
+    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
+      || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
+    if (!exists && isReadCmd) {
+      throw new IllegalStateException(
+        "Must specify an existing table for read commands. Run a write command 
first.");
+    }
+    HTableDescriptor desc =
+      exists ? admin.getTableDescriptor(TableName.valueOf(opts.tableName)) : 
null;
+    byte[][] splits = getSplits(opts);
+
+    // recreate the table when user has requested presplit or when existing
+    // {RegionSplitPolicy,replica count} does not match requested.
+    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
+      || (!isReadCmd && desc != null &&
+          !StringUtils.equals(desc.getRegionSplitPolicyClassName(), 
opts.splitPolicy))
+      || (!isReadCmd && desc != null && desc.getRegionReplication() != 
opts.replicas)) {
+      needsDelete = true;
+      // wait, why did it delete my table?!?
+      LOG.debug(MoreObjects.toStringHelper("needsDelete")
+        .add("needsDelete", needsDelete)
+        .add("isReadCmd", isReadCmd)
+        .add("exists", exists)
+        .add("desc", desc)
+        .add("presplit", opts.presplitRegions)
+        .add("splitPolicy", opts.splitPolicy)
+        .add("replicas", opts.replicas));
+    }
+
+    // remove an existing table
+    if (needsDelete) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+
+    // table creation is necessary
+    if (!exists || needsDelete) {
+      desc = getTableDescriptor(opts);
+      if (splits != null) {
+        if (LOG.isDebugEnabled()) {
+          for (int i = 0; i < splits.length; i++) {
+            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
+          }
+        }
+      }
+      admin.createTable(desc, splits);
+      LOG.info("Table " + desc + " created");
+    }
+    return admin.tableExists(tableName);
+  }
+
+  /**
+   * Create an HTableDescriptor from provided TestOptions.
+   */
+  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
+    HTableDescriptor desc = new 
HTableDescriptor(TableName.valueOf(opts.tableName));
+    HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
+    family.setDataBlockEncoding(opts.blockEncoding);
+    family.setCompressionType(opts.compression);
+    family.setBloomFilterType(opts.bloomType);
+    family.setBlocksize(opts.blockSize);
+    if (opts.inMemoryCF) {
+      family.setInMemory(true);
+    }
+    family.setInMemoryCompaction(opts.inMemoryCompaction);
+    desc.addFamily(family);
+    if (opts.replicas != DEFAULT_OPTS.replicas) {
+      desc.setRegionReplication(opts.replicas);
+    }
+    if (opts.splitPolicy != DEFAULT_OPTS.splitPolicy) {
+      desc.setRegionSplitPolicyClassName(opts.splitPolicy);
+    }
+    return desc;
+  }
+
+  /**
+   * generates splits based on total number of rows and specified split regions
+   */
+  protected static byte[][] getSplits(TestOptions opts) {
+    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
+      return null;
+
+    int numSplitPoints = opts.presplitRegions - 1;
+    byte[][] splits = new byte[numSplitPoints][];
+    int jump = opts.totalRows / opts.presplitRegions;
+    for (int i = 0; i < numSplitPoints; i++) {
+      int rowkey = jump * (1 + i);
+      splits[i] = format(rowkey);
+    }
+    return splits;
+  }
+
+  /*
+   * Run all clients in this vm each to its own thread.
+   */
+  static RunResult[] doLocalClients(final TestOptions opts, final 
Configuration conf)
+      throws IOException, InterruptedException, ExecutionException {
+    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
+    assert cmd != null;
+    @SuppressWarnings("unchecked")
+    Future<RunResult>[] threads = new Future[opts.numClientThreads];
+    RunResult[] results = new RunResult[opts.numClientThreads];
+    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
+      new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
+    final Connection con = ConnectionFactory.createConnection(conf);
+    final AsyncConnection asyncCon = 
ConnectionFactory.createAsyncConnection(conf).get();
+    for (int i = 0; i < threads.length; i++) {
+      final int index = i;
+      threads[i] = pool.submit(new Callable<RunResult>() {
+        @Override
+        public RunResult call() throws Exception {
+          TestOptions threadOpts = new TestOptions(opts);
+          if (threadOpts.startRow == 0) threadOpts.startRow = index * 
threadOpts.perClientRunRows;
+          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, 
new Status() {
+            @Override
+            public void setStatus(final String msg) throws IOException {
+              LOG.info(msg);
+            }
+          });
+          LOG.info("Finished " + Thread.currentThread().getName() + " in " + 
run.duration +
+            "ms over " + threadOpts.perClientRunRows + " rows");
+          return run;
+        }
+      });
+    }
+    pool.shutdown();
+
+    for (int i = 0; i < threads.length; i++) {
+      try {
+        results[i] = threads[i].get();
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      }
+    }
+    final String test = cmd.getSimpleName();
+    LOG.info("[" + test + "] Summary of timings (ms): "
+             + Arrays.toString(results));
+    Arrays.sort(results);
+    long total = 0;
+    for (RunResult result : results) {
+      total += result.duration;
+    }
+    LOG.info("[" + test + "]"
+      + "\tMin: " + results[0] + "ms"
+      + "\tMax: " + results[results.length - 1] + "ms"
+      + "\tAvg: " + (total / results.length) + "ms");
+
+    con.close();
+    asyncCon.close();
+
+    return results;
+  }
+
+  /*
+   * Run a mapreduce job.  Run as many maps as asked-for clients.
+   * Before we start up the job, write out an input file with instruction
+   * per client regards which row they are to start on.
+   * @param cmd Command to run.
+   * @throws IOException
+   */
+  static Job doMapReduce(TestOptions opts, final Configuration conf)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
+    assert cmd != null;
+    Path inputDir = writeInputFile(conf, opts);
+    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
+    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
+    Job job = Job.getInstance(conf);
+    job.setJarByClass(PerformanceEvaluation.class);
+    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
+
+    job.setInputFormatClass(NLineInputFormat.class);
+    NLineInputFormat.setInputPaths(job, inputDir);
+    // this is default, but be explicit about it just in case.
+    NLineInputFormat.setNumLinesPerSplit(job, 1);
+
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(LongWritable.class);
+
+    job.setMapperClass(EvaluationMapTask.class);
+    job.setReducerClass(LongSumReducer.class);
+
+    job.setNumReduceTasks(1);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), 
"outputs"));
+
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+      Histogram.class,     // yammer metrics
+      ObjectMapper.class); // jackson-mapper-asl
+
+    TableMapReduceUtil.initCredentials(job);
+
+    job.waitForCompletion(true);
+    return job;
+  }
+
+  /**
+   * Each client has one mapper to do the work,  and client do the resulting 
count in a map task.
+   */
+
+  static String JOB_INPUT_FILENAME = "input.txt";
+
+  /*
+   * Write input file of offsets-per-client for the mapreduce job.
+   * @param c Configuration
+   * @return Directory that contains file written whose name is 
JOB_INPUT_FILENAME
+   * @throws IOException
+   */
+  static Path writeInputFile(final Configuration c, final TestOptions opts) 
throws IOException {
+    return writeInputFile(c, opts, new Path("."));
+  }
+
+  static Path writeInputFile(final Configuration c, final TestOptions opts, 
final Path basedir)
+  throws IOException {
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
+    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), 
formatter.format(new Date()));
+    Path inputDir = new Path(jobdir, "inputs");
+
+    FileSystem fs = FileSystem.get(c);
+    fs.mkdirs(inputDir);
+
+    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
+    PrintStream out = new PrintStream(fs.create(inputFile));
+    // Make input random.
+    Map<Integer, String> m = new TreeMap<>();
+    Hash h = MurmurHash.getInstance();
+    int perClientRows = (opts.totalRows / opts.numClientThreads);
+    try {
+      for (int j = 0; j < opts.numClientThreads; j++) {
+        TestOptions next = new TestOptions(opts);
+        next.startRow = j * perClientRows;
+        next.perClientRunRows = perClientRows;
+        String s = MAPPER.writeValueAsString(next);
+        LOG.info("Client=" + j + ", input=" + s);
+        byte[] b = Bytes.toBytes(s);
+        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
+        m.put(hash, s);
+      }
+      for (Map.Entry<Integer, String> e: m.entrySet()) {
+        out.println(e.getValue());
+      }
+    } finally {
+      out.close();
+    }
+    return inputDir;
+  }
+
+  /**
+   * Describes a command.
+   */
+  static class CmdDescriptor {
+    private Class<? extends TestBase> cmdClass;
+    private String name;
+    private String description;
+
+    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String 
description) {
+      this.cmdClass = cmdClass;
+      this.name = name;
+      this.description = description;
+    }
+
+    public Class<? extends TestBase> getCmdClass() {
+      return cmdClass;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+  }
+
+  /**
+   * Wraps up options passed to {@link 
org.apache.hadoop.hbase.PerformanceEvaluation}.
+   * This makes tracking all these arguments a little easier.
+   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter 
(to make JSON
+   * serialization of this TestOptions class behave), and you need to add to 
the clone constructor
+   * below copying your new option from the 'that' to the 'this'.  Look for 
'clone' below.
+   */
+  static class TestOptions {
+    String cmdName = null;
+    boolean nomapred = false;
+    boolean filterAll = false;
+    int startRow = 0;
+    float size = 1.0f;
+    int perClientRunRows = DEFAULT_ROWS_PER_GB;
+    int numClientThreads = 1;
+    int totalRows = DEFAULT_ROWS_PER_GB;
+    int measureAfter = 0;
+    float sampleRate = 1.0f;
+    double traceRate = 0.0;
+    String tableName = TABLE_NAME;
+    boolean flushCommits = true;
+    boolean writeToWAL = true;
+    boolean autoFlush = false;
+    boolean oneCon = false;
+    boolean useTags = false;
+    int noOfTags = 1;
+    boolean reportLatency = false;
+    int multiGet = 0;
+    int randomSleep = 0;
+    boolean inMemoryCF = false;
+    int presplitRegions = 0;
+    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
+    String splitPolicy = null;
+    Compression.Algorithm compression = Compression.Algorithm.NONE;
+    BloomType bloomType = BloomType.ROW;
+    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
+    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
+    boolean valueRandom = false;
+    boolean valueZipf = false;
+    int valueSize = DEFAULT_VALUE_LENGTH;
+    int period = (this.perClientRunRows / 10) == 0? perClientRunRows: 
perClientRunRows / 10;
+    int cycles = 1;
+    int columns = 1;
+    int caching = 30;
+    boolean addColumns = true;
+    MemoryCompactionPolicy inMemoryCompaction =
+        MemoryCompactionPolicy.valueOf(
+            CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
+    boolean asyncPrefetch = false;
+    boolean cacheBlocks = true;
+    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
+
+    public TestOptions() {}
+
+    /**
+     * Clone constructor.
+     * @param that Object to copy from.
+     */
+    public TestOptions(TestOptions that) {
+      this.cmdName = that.cmdName;
+      this.cycles = that.cycles;
+      this.nomapred = that.nomapred;
+      this.startRow = that.startRow;
+      this.size = that.size;
+      this.perClientRunRows = that.perClientRunRows;
+      this.numClientThreads = that.numClientThreads;
+      this.totalRows = that.totalRows;
+      this.sampleRate = that.sampleRate;
+      this.traceRate = that.traceRate;
+      this.tableName = that.tableName;
+      this.flushCommits = that.flushCommits;
+      this.writeToWAL = that.writeToWAL;
+      this.autoFlush = that.autoFlush;
+      this.oneCon = that.oneCon;
+      this.useTags = that.useTags;
+      this.noOfTags = that.noOfTags;
+      this.reportLatency = that.reportLatency;
+      this.multiGet = that.multiGet;
+      this.inMemoryCF = that.inMemoryCF;
+      this.presplitRegions = that.presplitRegions;
+      this.replicas = that.replicas;
+      this.splitPolicy = that.splitPolicy;
+      this.compression = that.compression;
+      this.blockEncoding = that.blockEncoding;
+      this.filterAll = that.filterAll;
+      this.bloomType = that.bloomType;
+      this.blockSize = that.blockSize;
+      this.valueRandom = that.valueRandom;
+      this.valueZipf = that.valueZipf;
+      this.valueSize = that.valueSize;
+      this.period = that.period;
+      this.randomSleep = that.randomSleep;
+      this.measureAfter = that.measureAfter;
+      this.addColumns = that.addColumns;
+      this.columns = that.columns;
+      this.caching = that.caching;
+      this.inMemoryCompaction = that.inMemoryCompaction;
+      this.asyncPrefetch = that.asyncPrefetch;
+      this.cacheBlocks = that.cacheBlocks;
+      this.scanReadType = that.scanReadType;
+    }
+
+    public int getCaching() {
+      return this.caching;
+    }
+
+    public void setCaching(final int caching) {
+      this.caching = caching;
+    }
+
+    public int getColumns() {
+      return this.columns;
+    }
+
+    public void setColumns(final int columns) {
+      this.columns = columns;
+    }
+
+    public int getCycles() {
+      return this.cycles;
+    }
+
+    public void setCycles(final int cycles) {
+      this.cycles = cycles;
+    }
+
+    public boolean isValueZipf() {
+      return valueZipf;
+    }
+
+    public void setValueZipf(boolean valueZipf) {
+      this.valueZipf = valueZipf;
+    }
+
+    public String getCmdName() {
+      return cmdName;
+    }
+
+    public void setCmdName(String cmdName) {
+      this.cmdName = cmdName;
+    }
+
+    public int getRandomSleep() {
+      return randomSleep;
+    }
+
+    public void setRandomSleep(int randomSleep) {
+      this.randomSleep = randomSleep;
+    }
+
+    public int getReplicas() {
+      return replicas;
+    }
+
+    public void setReplicas(int replicas) {
+      this.replicas = replicas;
+    }
+
+    public String getSplitPolicy() {
+      return splitPolicy;
+    }
+
+    public void setSplitPolicy(String splitPolicy) {
+      this.splitPolicy = splitPolicy;
+    }
+
+    public void setNomapred(boolean nomapred) {
+      this.nomapred = nomapred;
+    }
+
+    public void setFilterAll(boolean filterAll) {
+      this.filterAll = filterAll;
+    }
+
+    public void setStartRow(int startRow) {
+      this.startRow = startRow;
+    }
+
+    public void setSize(float size) {
+      this.size = size;
+    }
+
+    public void setPerClientRunRows(int perClientRunRows) {
+      this.perClientRunRows = perClientRunRows;
+    }
+
+    public void setNumClientThreads(int numClientThreads) {
+      this.numClientThreads = numClientThreads;
+    }
+
+    public void setTotalRows(int totalRows) {
+      this.totalRows = totalRows;
+    }
+
+    public void setSampleRate(float sampleRate) {
+      this.sampleRate = sampleRate;
+    }
+
+    public void setTraceRate(double traceRate) {
+      this.traceRate = traceRate;
+    }
+
+    public void setTableName(String tableName) {
+      this.tableName = tableName;
+    }
+
+    public void setFlushCommits(boolean flushCommits) {
+      this.flushCommits = flushCommits;
+    }
+
+    public void setWriteToWAL(boolean writeToWAL) {
+      this.writeToWAL = writeToWAL;
+    }
+
+    public void setAutoFlush(boolean autoFlush) {
+      this.autoFlush = autoFlush;
+    }
+
+    public void setOneCon(boolean oneCon) {
+      this.oneCon = oneCon;
+    }
+
+    public void setUseTags(boolean useTags) {
+      this.useTags = useTags;
+    }
+
+    public void setNoOfTags(int noOfTags) {
+      this.noOfTags = noOfTags;
+    }
+
+    public void setReportLatency(boolean reportLatency) {
+      this.reportLatency = reportLatency;
+    }
+
+    public void setMultiGet(int multiGet) {
+      this.multiGet = multiGet;
+    }
+
+    public void setInMemoryCF(boolean inMemoryCF) {
+      this.inMemoryCF = inMemoryCF;
+    }
+
+    public void setPresplitRegions(int presplitRegions) {
+      this.presplitRegions = presplitRegions;
+    }
+
+    public void setCompression(Compression.Algorithm compression) {
+      this.compression = compression;
+    }
+
+    public void setBloomType(BloomType bloomType) {
+      this.bloomType = bloomType;
+    }
+
+    public void setBlockSize(int blockSize) {
+      this.blockSize = blockSize;
+    }
+
+    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
+      this.blockEncoding = blockEncoding;
+    }
+
+    public void setValueRandom(boolean valueRandom) {
+      this.valueRandom = valueRandom;
+    }
+
+    public void setValueSize(int valueSize) {
+      this.valueSize = valueSize;
+    }
+
+    public void setPeriod(int period) {
+      this.period = period;
+    }
+
+    public boolean isNomapred() {
+      return nomapred;
+    }
+
+    public boolean isFilterAll() {
+      return filterAll;
+    }
+
+    public int getStartRow() {
+      return startRow;
+    }
+
+    public float getSize() {
+      return size;
+    }
+
+    public int getPerClientRunRows() {
+      return perClientRunRows;
+    }
+
+    public int getNumClientThreads() {
+      return numClientThreads;
+    }
+
+    public int getTotalRows() {
+      return totalRows;
+    }
+
+    public float getSampleRate() {
+      return sampleRate;
+    }
+
+    public double getTraceRate() {
+      return traceRate;
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+    public boolean isFlushCommits() {
+      return flushCommits;
+    }
+
+    public boolean isWriteToWAL() {
+      return writeToWAL;
+    }
+
+    public boolean isAutoFlush() {
+      return autoFlush;
+    }
+
+    public boolean isUseTags() {
+      return useTags;
+    }
+
+    public int getNoOfTags() {
+      return noOfTags;
+    }
+
+    public boolean isReportLatency() {
+      return reportLatency;
+    }
+
+    public int getMultiGet() {
+      return multiGet;
+    }
+
+    public boolean isInMemoryCF() {
+      return inMemoryCF;
+    }
+
+    public int getPresplitRegions() {
+      return presplitRegions;
+    }
+
+    public Compression.Algorithm getCompression() {
+      return compression;
+    }
+
+    public DataBlockEncoding getBlockEncoding() {
+      return blockEncoding;
+    }
+
+    public boolean isValueRandom() {
+      return valueRandom;
+    }
+
+    public int getValueSize() {
+      return valueSize;
+    }
+
+    public int getPeriod() {
+      return period;
+    }
+
+    public BloomType getBloomType() {
+      return bloomType;
+    }
+
+    public int getBlockSize() {
+      return blockSize;
+    }
+
+    public boolean isOneCon() {
+      return oneCon;
+    }
+
+    public int getMeasureAfter() {
+      return measureAfter;
+    }
+
+    public void setMeasureAfter(int measureAfter) {
+      this.measureAfter = measureAfter;
+    }
+
+    public boolean getAddColumns() {
+      return addColumns;
+    }
+
+    public void setAddColumns(boolean addColumns) {
+      this.addColumns = addColumns;
+    }
+
+    public void setInMemoryCompaction(MemoryCompactionPolicy 
inMemoryCompaction) {
+      this.inMemoryCompaction = inMemoryCompaction;
+    }
+
+    public MemoryCompactionPolicy getInMemoryCompaction() {
+      return this.inMemoryCompaction;
+    }
+  }
+
+  /*
+   * A test.
+   * Subclass to particularize what happens per row.
+   */
+  static abstract class TestBase {
+    // Below is make it so when Tests are all running in the one
+    // jvm, that they each have a differently seeded Random.
+    private static final Random randomSeed = new 
Random(System.currentTimeMillis());
+
+    private static long nextRandomSeed() {
+      return randomSeed.nextLong();
+    }
+    private final int everyN;
+
+    protected final Random rand = new Random(nextRandomSeed());
+    protected final Configuration conf;
+    protected final TestOptions opts;
+
+    private final Status status;
+    private final Sampler<?> traceSampler;
+    private final SpanReceiverHost receiverHost;
+
+    private String testName;
+    private Histogram latencyHistogram;
+    private Histogram valueSizeHistogram;
+    private RandomDistribution.Zipf zipf;
+
+    /**
+     * Note that all subclasses of this class must provide a public constructor
+     * that has the exact same list of arguments.
+     */
+    TestBase(final Configuration conf, final TestOptions options, final Status 
status) {
+      this.conf = conf;
+      this.receiverHost = this.conf == null? null: 
SpanReceiverHost.getInstance(conf);
+      this.opts = options;
+      this.status = status;
+      this.testName = this.getClass().getSimpleName();
+      if (options.traceRate >= 1.0) {
+        this.traceSampler = Sampler.ALWAYS;
+      } else if (options.traceRate > 0.0) {
+        conf.setDouble("hbase.sampler.fraction", options.traceRate);
+        this.traceSampler = new ProbabilitySampler(new 
HBaseHTraceConfiguration(conf));
+      } else {
+        this.traceSampler = Sampler.NEVER;
+      }
+      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
+      if (options.isValueZipf()) {
+        this.zipf = new RandomDistribution.Zipf(this.rand, 1, 
options.getValueSize(), 1.2);
+      }
+      LOG.info("Sampling 1 every " + everyN + " out of " + 
opts.perClientRunRows + " total rows.");
+    }
+
+    int getValueLength(final Random r) {
+      if (this.opts.isValueRandom()) return Math.abs(r.nextInt() % 
opts.valueSize);
+      else if (this.opts.isValueZipf()) return Math.abs(this.zipf.nextInt());
+      else return opts.valueSize;
+    }
+
+    void updateValueSize(final Result [] rs) throws IOException {
+      if (rs == null || !isRandomValueSize()) return;
+      for (Result r: rs) updateValueSize(r);
+    }
+
+    void updateValueSize(final Result r) throws IOException {
+      if (r == null || !isRandomValueSize()) return;
+      int size = 0;
+      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
+        size += scanner.current().getValueLength();
+      }
+      updateValueSize(size);
+    }
+
+    void updateValueSize(final int valueSize) {
+      if (!isRandomValueSize()) return;
+      this.valueSizeHistogram.update(valueSize);
+    }
+
+    String generateStatus(final int sr, final int i, final int lr) {
+      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
+        (!isRandomValueSize()? "": ", value size " + 
getShortValueSizeReport());
+    }
+
+    boolean isRandomValueSize() {
+      return opts.valueRandom;
+    }
+
+    protected int getReportingPeriod() {
+      return opts.period;
+    }
+
+    /**
+     * Populated by testTakedown. Only implemented by RandomReadTest at the 
moment.
+     */
+    public Histogram getLatencyHistogram() {
+      return latencyHistogram;
+    }
+
+    void testSetup() throws IOException {
+      createConnection();
+      onStartup();
+      latencyHistogram = YammerHistogramUtils.newHistogram(new 
UniformReservoir(1024 * 500));
+      valueSizeHistogram = YammerHistogramUtils.newHistogram(new 
UniformReservoir(1024 * 500));
+    }
+
+    abstract void createConnection() throws IOException;
+
+    abstract void onStartup() throws IOException;
+
+    void testTakedown() throws IOException {
+      onTakedown();
+      // Print all stats for this thread continuously.
+      // Synchronize on Test.class so different threads don't intermingle the
+      // output. We can't use 'this' here because each thread has its own 
instance of Test class.
+      synchronized (Test.class) {
+        status.setStatus("Test : " + testName + ", Thread : " + 
Thread.currentThread().getName());
+        status.setStatus("Latency (us) : " + 
YammerHistogramUtils.getHistogramReport(
+            latencyHistogram));
+        status.setStatus("Num measures (latency) : " + 
latencyHistogram.getCount());
+        
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
+        status.setStatus("ValueSize (bytes) : "
+            + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
+        status.setStatus("Num measures (ValueSize): " + 
valueSizeHistogram.getCount());
+        
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
+      }
+      closeConnection();
+      receiverHost.closeReceivers();
+    }
+
+    abstract void onTakedown() throws IOException;
+
+    abstract void closeConnection() throws IOException;
+
+    /*
+     * Run test
+     * @return Elapsed time.
+     * @throws IOException
+     */
+    long test() throws IOException, InterruptedException {
+      testSetup();
+      LOG.info("Timed test starting in thread " + 
Thread.currentThread().getName());
+      final long startTime = System.nanoTime();
+      try {
+        testTimed();
+      } finally {
+        testTakedown();
+      }
+      return (System.nanoTime() - startTime) / 1000000;
+    }
+
+    int getStartRow() {
+      return opts.startRow;
+    }
+
+    int getLastRow() {
+      return getStartRow() + opts.perClientRunRows;
+    }
+
+    /**
+     * Provides an extension point for tests that don't want a per row 
invocation.
+     */
+    void testTimed() throws IOException, InterruptedException {
+      int startRow = getStartRow();
+      int lastRow = getLastRow();
+      // Report on completion of 1/10th of total.
+      for (int ii = 0; ii < opts.cycles; ii++) {
+        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
+        for (int i = startRow; i < lastRow; i++) {
+          if (i % everyN != 0) continue;
+          long startTime = System.nanoTime();
+          TraceScope scope = Trace.startSpan("test row", traceSampler);
+          try {
+            testRow(i);
+          } finally {
+            scope.close();
+          }
+          if ( (i - startRow) > opts.measureAfter) {
+            // If multiget is enabled, say set to 10, testRow() returns 
immediately first 9 times
+            // and sends the actual get request in the 10th iteration. We 
should only set latency
+            // when actual request is sent because otherwise it turns out to 
be 0.
+            if (opts.multiGet == 0 || (i - startRow + 1) % opts.multiGet == 0) 
{
+              latencyHistogram.update((System.nanoTime() - startTime) / 1000);
+            }
+            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
+              status.setStatus(generateStatus(startRow, i, lastRow));
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * @return Subset of the histograms' calculation.
+     */
+    public String getShortLatencyReport() {
+      return 
YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
+    }
+
+    /**
+     * @return Subset of the histograms' calculation.
+     */
+    public String getShortValueSizeReport() {
+      return 
YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
+    }
+
+    /*
+    * Test for individual row.
+    * @param i Row index.
+    */
+    abstract void testRow(final int i) throws IOException, 
InterruptedException;
+  }
+
+  static abstract class Test extends TestBase {
+    protected Connection connection;
+
+    Test(final Connection con, final TestOptions options, final Status status) 
{
+      super(con == null ? HBaseConfiguration.create() : 
con.getConfiguration(), options, status);
+      this.connection = con;
+    }
+
+    @Override
+    void createConnection() throws IOException {
+      if (!opts.isOneCon()) {
+        this.connection = ConnectionFactory.createConnection(conf);
+      }
+    }
+
+    @Override
+    void closeConnection() throws IOException {
+      if (!opts.isOneCon()) {
+        this.connection.close();
+      }
+    }
+  }
+
+  static abstract class AsyncTest extends TestBase {
+    protected AsyncConnection connection;
+
+    AsyncTest(final AsyncConnection con, final TestOptions options, final 
Status status) {
+      super(con == null ? HBaseConfiguration.create() : 
con.getConfiguration(), options, status);
+      this.connection = con;
+    }
+
+    @Override
+    void createConnection() {
+      if (!opts.isOneCon()) {
+        try {
+          this.connection = 
ConnectionFactory.createAsyncConnection(conf).get();
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.error("Failed to create async connection", e);
+        }
+      }
+    }
+
+    @Override
+    void closeConnection() throws IOException {
+      if (!opts.isOneCon()) {
+        this.connection.close();
+      }
+    }
+  }
+
+  static abstract class TableTest extends Test {
+    protected Table table;
+
+    TableTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void onStartup() throws IOException {
+      this.table = connection.getTable(TableName.valueOf(opts.tableName));
+    }
+
+    @Override
+    void onTakedown() throws IOException {
+      table.close();
+    }
+  }
+
+  static abstract class AsyncTableTest extends AsyncTest {
+    protected RawAsyncTable table;
+
+    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void onStartup() throws IOException {
+      this.table = connection.getRawTable(TableName.valueOf(opts.tableName));
+    }
+
+    @Override
+    void onTakedown() throws IOException {
+    }
+  }
+
+  static class AsyncRandomReadTest extends AsyncTableTest {
+    private final Consistency consistency;
+    private ArrayList<Get> gets;
+    private Random rd = new Random();
+
+    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : 
Consistency.TIMELINE;
+      if (opts.multiGet > 0) {
+        LOG.info("MultiGet enabled. Sending GETs in batches of " + 
opts.multiGet + ".");
+        this.gets = new ArrayList<>(opts.multiGet);
+      }
+    }
+
+    @Override
+    void testRow(final int i) throws IOException, InterruptedException {
+      if (opts.randomSleep > 0) {
+        Thread.sleep(rd.nextInt(opts.randomSleep));
+      }
+      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
+      if (opts.addColumns) {
+        get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        get.addFamily(FAMILY_NAME);
+      }
+      if (opts.filterAll) {
+        get.setFilter(new FilterAllFilter());
+      }
+      get.setConsistency(consistency);
+      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
+      try {
+        if (opts.multiGet > 0) {
+          this.gets.add(get);
+          if (this.gets.size() == opts.multiGet) {
+            Result[] rs =
+                this.table.get(this.gets).stream().map(f -> 
propagate(f::get)).toArray(Result[]::new);
+            updateValueSize(rs);
+            this.gets.clear();
+          }
+        } else {
+          updateValueSize(this.table.get(get).get());
+        }
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    public static RuntimeException runtime(Throwable e) {
+      if (e instanceof RuntimeException) {
+        return (RuntimeException) e;
+      }
+      return new RuntimeException(e);
+    }
+
+    public static <V> V propagate(Callable<V> callable) {
+      try {
+        return callable.call();
+      } catch (Exception e) {
+        throw runtime(e);
+      }
+    }
+
+    @Override
+    protected int getReportingPeriod() {
+      int period = opts.perClientRunRows / 10;
+      return period == 0 ? opts.perClientRunRows : period;
+    }
+
+    @Override
+    protected void testTakedown() throws IOException {
+      if (this.gets != null && this.gets.size() > 0) {
+        this.table.get(gets);
+        this.gets.clear();
+      }
+      super.testTakedown();
+    }
+  }
+
+  static class AsyncRandomWriteTest extends AsyncTableTest {
+    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException, InterruptedException {
+      byte[] row = getRandomRow(this.rand, opts.totalRows);
+      Put put = new Put(row);
+      for (int column = 0; column < opts.columns; column++) {
+        byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + 
column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new ArrayBackedTag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv =
+              new KeyValue(row, FAMILY_NAME, qualifier, 
HConstants.LATEST_TIMESTAMP, value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.addColumn(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
+        }
+      }
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : 
Durability.SKIP_WAL);
+      try {
+        table.put(put).get();
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  static class AsyncScanTest extends AsyncTableTest {
+    private ResultScanner testScanner;
+    private AsyncTable asyncTable;
+
+    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void onStartup() throws IOException {
+      this.asyncTable =
+          connection.getTable(TableName.valueOf(opts.tableName),
+            
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
+    }
+
+    @Override
+    void testTakedown() throws IOException {
+      if (this.testScanner != null) {
+        this.testScanner.close();
+      }
+      super.testTakedown();
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      if (this.testScanner == null) {
+        Scan scan =
+            new 
Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
+                
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
+                .setReadType(opts.scanReadType);
+        if (opts.addColumns) {
+          scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+        } else {
+          scan.addFamily(FAMILY_NAME);
+        }
+        if (opts.filterAll) {
+          scan.setFilter(new FilterAllFilter());
+        }
+        this.testScanner = asyncTable.getScanner(scan);
+      }
+      Result r = testScanner.next();
+      updateValueSize(r);
+    }
+  }
+
+  static class AsyncSequentialReadTest extends AsyncTableTest {
+    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException, InterruptedException {
+      Get get = new Get(format(i));
+      if (opts.addColumns) {
+        get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      }
+      if (opts.filterAll) {
+        get.setFilter(new FilterAllFilter());
+      }
+      try {
+        updateValueSize(table.get(get).get());
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  static class AsyncSequentialWriteTest extends AsyncTableTest {
+    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException, InterruptedException {
+      byte[] row = format(i);
+      Put put = new Put(row);
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + 
column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new ArrayBackedTag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, 
HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.addColumn(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
+        }
+      }
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : 
Durability.SKIP_WAL);
+      try {
+        table.put(put).get();
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  static abstract class BufferedMutatorTest extends Test {
+    protected BufferedMutator mutator;
+    protected Table table;
+
+    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void onStartup() throws IOException {
+      this.mutator = 
connection.getBufferedMutator(TableName.valueOf(opts.tableName));
+      this.table = connection.getTable(TableName.valueOf(opts.tableName));
+    }
+
+    @Override
+    void onTakedown() throws IOException {
+      mutator.close();
+      table.close();
+    }
+  }
+
+  static class RandomSeekScanTest extends TableTest {
+    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, 
opts.totalRows))
+          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
+          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType);
+      FilterList list = new FilterList();
+      if (opts.addColumns) {
+        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
+      }
+      if (opts.filterAll) {
+        list.addFilter(new FilterAllFilter());
+      }
+      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
+      scan.setFilter(list);
+      ResultScanner s = this.table.getScanner(scan);
+      for (Result rr; (rr = s.next()) != null;) {
+        updateValueSize(rr);
+      }
+      s.close();
+    }
+
+    @Override
+    protected int getReportingPeriod() {
+      int period = opts.perClientRunRows / 100;
+      return period == 0 ? opts.perClientRunRows : period;
+    }
+
+  }
+
+  static abstract class RandomScanWithRangeTest extends TableTest {
+    RandomScanWithRangeTest(Connection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
+      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
+          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
+          
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
+          .setReadType(opts.scanReadType);
+      if (opts.filterAll) {
+        scan.setFilter(new FilterAllFilter());
+      }
+      if (opts.addColumns) {
+        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
+      }
+      Result r = null;
+      int count = 0;
+      ResultScanner s = this.table.getScanner(scan);
+      for (; (r = s.next()) != null;) {
+        updateValueSize(r);
+        count++;
+      }
+      if (i % 100 == 0) {
+        LOG.info(String.format("Scan for key range %s - %s returned %s rows",
+            Bytes.toString(startAndStopRow.getFirst()),
+            Bytes.toString(startAndStopRow.getSecond()), count));
+      }
+
+      s.close();
+    }
+
+    protected abstract Pair<byte[],byte[]> getStartAndStopRow();
+
+    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
+      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
+      int stop = start + maxRange;
+      return new Pair<>(format(start), format(stop));
+    }
+
+    @Override
+    protected int getReportingPeriod() {
+      int period = opts.perClientRunRows / 100;
+      return period == 0? opts.perClientRunRows: period;
+    }
+  }
+
+  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
+    RandomScanWithRange10Test(Connection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(10);
+    }
+  }
+
+  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
+    RandomScanWithRange100Test(Connection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(100);
+    }
+  }
+
+  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
+    RandomScanWithRange1000Test(Connection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(1000);
+    }
+  }
+
+  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
+    RandomScanWithRange10000Test(Connection con, TestOptions options, Status 
status) {
+      super(con, options, status);
+    }
+
+    @Override
+    protected Pair<byte[], byte[]> getStartAndStopRow() {
+      return generateStartAndStopRows(10000);
+    }
+  }
+
+  static class RandomReadTest extends TableTest {
+    private final Consistency consistency;
+    private ArrayList<Get> gets;
+    private Random rd = new Random();
+
+    RandomReadTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : 
Consistency.TIMELINE;
+      if (opts.multiGet > 0) {
+        LOG.info("MultiGet enabled. Sending GETs in batches of " + 
opts.multiGet + ".");
+        this.gets = new ArrayList<>(opts.multiGet);
+      }
+    }
+
+    @Override
+    void testRow(final int i) throws IOException, InterruptedException {
+      if (opts.randomSleep > 0) {
+        Thread.sleep(rd.nextInt(opts.randomSleep));
+      }
+      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
+      if (opts.addColumns) {
+        get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        get.addFamily(FAMILY_NAME);
+      }
+      if (opts.filterAll) {
+        get.setFilter(new FilterAllFilter());
+      }
+      get.setConsistency(consistency);
+      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
+      if (opts.multiGet > 0) {
+        this.gets.add(get);
+        if (this.gets.size() == opts.multiGet) {
+          Result [] rs = this.table.get(this.gets);
+          updateValueSize(rs);
+          this.gets.clear();
+        }
+      } else {
+        updateValueSize(this.table.get(get));
+      }
+    }
+
+    @Override
+    protected int getReportingPeriod() {
+      int period = opts.perClientRunRows / 10;
+      return period == 0 ? opts.perClientRunRows : period;
+    }
+
+    @Override
+    protected void testTakedown() throws IOException {
+      if (this.gets != null && this.gets.size() > 0) {
+        this.table.get(gets);
+        this.gets.clear();
+      }
+      super.testTakedown();
+    }
+  }
+
+  static class RandomWriteTest extends BufferedMutatorTest {
+    RandomWriteTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte[] row = getRandomRow(this.rand, opts.totalRows);
+      Put put = new Put(row);
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + 
column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new ArrayBackedTag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, 
HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.addColumn(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
+        }
+      }
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : 
Durability.SKIP_WAL);
+      if (opts.autoFlush) {
+        table.put(put);
+      } else {
+        mutator.mutate(put);
+      }
+    }
+  }
+
+  static class ScanTest extends TableTest {
+    private ResultScanner testScanner;
+
+    ScanTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testTakedown() throws IOException {
+      if (this.testScanner != null) {
+        this.testScanner.close();
+      }
+      super.testTakedown();
+    }
+
+
+    @Override
+    void testRow(final int i) throws IOException {
+      if (this.testScanner == null) {
+        Scan scan = new 
Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
+            
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
+            .setReadType(opts.scanReadType);
+        if (opts.addColumns) {
+          scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+        } else {
+          scan.addFamily(FAMILY_NAME);
+        }
+        if (opts.filterAll) {
+          scan.setFilter(new FilterAllFilter());
+        }
+        this.testScanner = table.getScanner(scan);
+      }
+      Result r = testScanner.next();
+      updateValueSize(r);
+    }
+  }
+
+  /**
+   * Base class for operations that are CAS-like; that read a value and then 
set it based off what
+   * they read. In this category is increment, append, checkAndPut, etc.
+   *
+   * <p>These operations also want some concurrency going on. Usually when 
these tests run, they
+   * operate in their own part of the key range. In CASTest, we will have them 
all overlap on the
+   * same key space. We do this with our getStartRow and getLastRow overrides.
+   */
+  static abstract class CASTableTest extends TableTest {
+    private final byte [] qualifier;
+    CASTableTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
+    }
+
+    byte [] getQualifier() {
+      return this.qualifier;
+    }
+
+    @Override
+    int getStartRow() {
+      return 0;
+    }
+
+    @Override
+    int getLastRow() {
+      return opts.perClientRunRows;
+    }
+  }
+
+  static class IncrementTest extends CASTableTest {
+    IncrementTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Increment increment = new Increment(format(i));
+      increment.addColumn(FAMILY_NAME, getQualifier(), 1l);
+      updateValueSize(this.table.increment(increment));
+    }
+  }
+
+  static class AppendTest extends CASTableTest {
+    AppendTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      Append append = new Append(bytes);
+      append.addColumn(FAMILY_NAME, getQualifier(), bytes);
+      updateValueSize(this.table.append(append));
+    }
+  }
+
+  static class CheckAndMutateTest extends CASTableTest {
+    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.addColumn(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      RowMutations mutations = new RowMutations(bytes);
+      mutations.add(put);
+      this.table.checkAndMutate(bytes, FAMILY_NAME, getQualifier(), 
CompareOp.EQUAL, bytes,
+          mutations);
+    }
+  }
+
+  static class CheckAndPutTest extends CASTableTest {
+    CheckAndPutTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.addColumn(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      this.table.checkAndPut(bytes, FAMILY_NAME, getQualifier(), 
CompareOp.EQUAL, bytes, put);
+    }
+  }
+
+  static class CheckAndDeleteTest extends CASTableTest {
+    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte [] bytes = format(i);
+      // Put a known value so when we go to check it, it is there.
+      Put put = new Put(bytes);
+      put.addColumn(FAMILY_NAME, getQualifier(), bytes);
+      this.table.put(put);
+      Delete delete = new Delete(put.getRow());
+      delete.addColumn(FAMILY_NAME, getQualifier());
+      this.table.checkAndDelete(bytes, FAMILY_NAME, getQualifier(), 
CompareOp.EQUAL, bytes, delete);
+    }
+  }
+
+  static class SequentialReadTest extends TableTest {
+    SequentialReadTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      Get get = new Get(format(i));
+      if (opts.addColumns) {
+        get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      }
+      if (opts.filterAll) {
+        get.setFilter(new FilterAllFilter());
+      }
+      updateValueSize(table.get(get));
+    }
+  }
+
+  static class SequentialWriteTest extends BufferedMutatorTest {
+    SequentialWriteTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(final int i) throws IOException {
+      byte[] row = format(i);
+      Put put = new Put(row);
+      for (int column = 0; column < opts.columns; column++) {
+        byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + 
column);
+        byte[] value = generateData(this.rand, getValueLength(this.rand));
+        if (opts.useTags) {
+          byte[] tag = generateData(this.rand, TAG_LENGTH);
+          Tag[] tags = new Tag[opts.noOfTags];
+          for (int n = 0; n < opts.noOfTags; n++) {
+            Tag t = new ArrayBackedTag((byte) n, tag);
+            tags[n] = t;
+          }
+          KeyValue kv = new KeyValue(row, FAMILY_NAME, qualifier, 
HConstants.LATEST_TIMESTAMP,
+              value, tags);
+          put.add(kv);
+          updateValueSize(kv.getValueLength());
+        } else {
+          put.addColumn(FAMILY_NAME, qualifier, value);
+          updateValueSize(value.length);
+        }
+      }
+      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : 
Durability.SKIP_WAL);
+      if (opts.autoFlush) {
+        table.put(put);
+      } else {
+        mutator.mutate(put);
+      }
+    }
+  }
+
+  static class FilteredScanTest extends TableTest {
+    protected static final Log LOG = 
LogFactory.getLog(FilteredScanTest.class.getName());
+
+    FilteredScanTest(Connection con, TestOptions options, Status status) {
+      super(con, options, status);
+    }
+
+    @Override
+    void testRow(int i) throws IOException {
+      byte[] value = generateData(this.rand, getValueLength(this.rand));
+      Scan scan = constructScan(value);
+      ResultScanner scanner = null;
+      try {
+        scanner = this.table.getScanner(scan);
+        for (Result r = null; (r = scanner.next()) != null;) {
+          updateValueSize(r);
+        }
+      } finally {
+        if (scanner != null) scanner.close();
+      }
+    }
+
+    protected Scan constructScan(byte[] valuePrefix) throws IOException {
+      FilterList list = new FilterList();
+      Filter filter = new SingleColumnValueFilter(
+          FAMILY_NAME, COLUMN_ZERO, CompareFilter.CompareOp.EQUAL,
+          new BinaryComparator(valuePrefix)
+      );
+      list.addFilter(filter);
+      if(opts.filterAll) {
+        list.addFilter(new FilterAllFilter());
+      }
+      Scan scan = new 
Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
+          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType);
+      if (opts.addColumns) {
+        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
+      } else {
+        scan.addFamily(FAMILY_NAME);
+      }
+      scan.setFilter(list);
+      return scan;
+    }
+  }
+
+  /**
+   * Compute a throughput rate in MB/s.
+   * @param rows Number of records consumed.
+   * @param timeMs Time taken in milliseconds.
+   * @return String value with label, ie '123.76 MB/s'
+   */
+  private static String calculateMbps(int rows, long timeMs, final int 
valueSize, int columns) {
+    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
+      ((valueSize + FAMILY_NAME.length + COLUMN_ZERO.length) * columns));
+    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
+      .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
+      .divide(BYTES_PER_MB, CXT);
+    return FMT.format(mbps) + " MB/s";
+  }
+
+  /*
+   * Format passed integer.
+   * @param number
+   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of 
passed
+   * number (Does absolute in case number is negative).
+   */
+  public static byte [] format(final int number) {
+    byte [] b = new byte[ROW_LENGTH];
+    int d = Math.abs(number);
+    for (int i = b.length - 1; i >= 0; i--) {
+      b[i] = (byte)((d % 10) + '0');
+      d /= 10;
+    }
+    return b;
+  }
+
+  /*
+   * This method takes some time and is done inline uploading data.  For
+   * example, doing the mapfile test, generation of the key and value
+   * consumes about 30% of CPU time.
+   * @return Generated random value to insert into a table cell.
+   */
+  public static byte[] generateData(final Random r, int length) {
+    byte [] b = new byte [length];
+    int i;
+
+    for(i = 0; i < (length-8); i += 8) {
+      b[i] = (byte) (65 + r.nextInt(26));
+      b[i+1] = b[i];
+      b[i+2] = b[i];
+      b[i+3] = b[i];
+      b[i+4] = b[i];
+      b[i+5] = b[i];
+      b[i+6] = b[i];
+      b[i+7] = b[i];
+    }
+
+    byte a = (byte) (65 + r.nextInt(26));
+    for(; i < length; i++) {
+      b[i] = a;
+    }
+    return b;
+  }
+
+  static byte [] getRandomRow(final Random random, final int totalRows) {
+    return format(generateRandomRow(random, totalRows));
+  }
+
+  static int generateRandomRow(final Random random, final int totalRows) {
+    return random.nextInt(Integer.MAX_VALUE) % totalRows;
+  }
+
+  static RunResult runOneClient(final Class<? extends TestBase> cmd, 
Configuration conf,
+      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status 
status)
+      throws IOException, InterruptedException {
+    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
+        + opts.perClientRunRows + " rows");
+    long totalElapsedTime;
+
+    final TestBase t;
+    try {
+      if (AsyncTest.class.isAssignableFrom(cmd)) {
+        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
+        Constructor<? extends AsyncTest> constructor =
+            newCmd.getDeclaredConstructor(AsyncConnection.class, 
TestOptions.class, Status.class);
+        t = constructor.newInstance(asyncCon, opts, status);
+      } else {
+        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
+        Constructor<? extends Test> constructor =
+            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, 
Status.class);
+        t = constructor.newInstance(con, opts, status);
+      }
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Invalid command class: " + 
cmd.getName()
+          + ".  It does not provide a constructor as described by "
+          + "the javadoc comment.  Available constructors are: "
+          + Arrays.toString(cmd.getConstructors()));
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to construct command class", e);
+    }
+    totalElapsedTime = t.test();
+
+    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
+      "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " 
rows" +
+      " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), 
totalElapsedTime,
+          getAverageValueLength(opts), opts.columns) + ")");
+
+    return new RunResult(totalElapsedTime, t.getLatencyHistogram());
+  }
+
+  private static int getAverageValueLength(final TestOptions opts) {
+    return opts.valueRandom? opts.valueSize/2: opts.valueSize;
+  }
+
+  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) 
throws IOException,
+      InterruptedException, ClassNotFoundException, ExecutionException {
+    // Log the configuration we're going to run with. Uses JSON mapper because 
lazy. It'll do
+    // the TestOptions introspection for us and dump the output in a readable 
format.
+    LOG.info(cmd.getSimpleName() + " test run options=" + 
MAPPER.writeValueAsString(opts));
+    Admin admin = null;
+    Connection connection = null;
+    try {
+      connection = ConnectionFactory.createConnection(getConf());
+      admin = connection.getAdmin();
+      checkTable(admin, opts);
+    } finally {
+      if (admin != null) admin.close();
+      if (connection != null) connection.close();
+    }
+    if (opts.nomapred) {
+      doLocalClients(opts, getConf());
+    } else {
+      doMapReduce(opts, getConf());
+    }
+  }
+
+  protected void printUsage() {
+    printUsage(this.getClass().getName(), null);
+  }
+
+  protected static void printUsage(final String message) {
+    printUsage(PerformanceEvaluation.class.getName(), message);
+  }
+
+  protected static void printUsageAndExit(final String message, final int 
exitCode) {
+    printUsage(message);
+    System.exit(exitCode);
+  }
+
+  protected static void printUsage(final String className, final String 
message) {
+    if (message != null && message.length() > 0) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: java " + className + " \\");
+    System.err.println("  <OPTIONS> [-D<property=value>]* <command> 
<nclients>");
+    System.err.println();
+    System.err.println("General Options:");
+    System.err.println(" nomapred        Run multiple clients using threads " +
+      "(rather than use mapreduce)");
+    System.err.println(" oneCon          all the threads share the same 
connection. Default: False");
+    System.err.println(" sampleRate      Execute test on a sample of total " +
+      "rows. Only supported by randomRead. Default: 1.0");
+    System.err.println(" period          Report every 'period' rows: " +
+      "Default: opts.perClientRunRows / 10 = " + 
DEFAULT_OPTS.getPerClientRunRows()/10);
+    System.err.println(" cycles          How many times to cycle the test. 
Defaults: 1.");
+    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing 
every N rows. " +
+      "Default: 0");
+    System.err.println(" latency         Set to report operation latencies. 
Default: False");
+    System.err.println(" measureAfter    Start to measure the latency once 
'measureAfter'" +
+        " rows have been treated. Default: 0");
+    System.err.println(" valueSize       Pass value size to use: Default: "
+        + DEFAULT_OPTS.getValueSize());
+    System.err.println(" valueRandom     Set if we should vary value size 
between 0 and " +
+        "'valueSize'; set on read for stats on size: Default: Not set.");
+    System.err.println(" blockEncoding   Block encoding to use. Value should 
be one of "
+        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
+    System.err.println();
+    System.err.println("Table Creation / Write Tests:");
+    System.err.println(" table           Alternate table name. Default: 
'TestTable'");
+    System.err.println(" rows            Rows each client runs. Default: "
+        + DEFAULT_OPTS.getPerClientRunRows()
+        + ".  In case of randomReads and randomSeekScans this could"
+        + " be specified along with --size to specify the number of rows to be 
scanned within"
+        + " the total range specified by the size.");
+    System.err.println(
+      " size            Total size in GiB. Mutually exclusive with --rows for 
writes and scans"
+          + ". But for randomReads and randomSeekScans when you use size with 
--rows you could"
+          + " use size to specify the end range and --rows"
+          + " specifies the number of rows within that range. " + "Default: 
1.0.");
+    System.err.println(" compress        Compression type to use (GZ, LZO, 
...). Default: 'NONE'");
+    System.err.println(" flushCommits    Used to determine if the test should 
flush the table. " +
+      "Default: false");
+    System.err.println(" valueZipf       Set if we should vary value size 
between 0 and " +
+        "'valueSize' in zipf form: Default: Not set.");
+    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: 
True");
+    System.err.println(" autoFlush       Set autoFlush on htable. Default: 
False");
+    System.err.println(" presplit        Create presplit table. If a table 
with same name exists,"
+        + " it'll be deleted and recreated (instead of verifying count of its 
existing regions). "
+        + "Recommended for accurate perf analysis (see guide). Default: 
disabled");
+    System.err.println(" usetags         Writes tags along with KVs. Use with 
HFile V3. " +
+      "Default: false");
+    System.err.println(" numoftags       Specify the no of tags that would be 
needed. " +
+       "This works only if usetags is true. Default: " + 
DEFAULT_OPTS.noOfTags);
+    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy 
for the table.");
+    System.err.println(" columns         Columns to write per row. Default: 
1");
+    System.err.println();
+    System.err.println("Read Tests:");
+    System.err.println(" filterAll       Helps to filter out all the rows on 
the server side"
+        + " there by not returning any thing back to the client.  Helps to 
check the server side"
+        + " performance.  Uses FilterAllFilter internally. ");
+    System.err.println(" multiGet        Batch gets together into groups of N. 
Only supported " +
+      "by randomRead. Default: disabled");
+    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
+      "inmemory as far as possible. Not guaranteed that reads are always 
served " +
+      "from memory.  Default: false");
+    System.err.println(" bloomFilter     Bloom filter type, one of "
+        + Arrays.toString(BloomType.values()));
+    System.err.println(" blockSize       Blocksize to use when writing out 
hfiles. ");
+    System.err.println(" inmemoryCompaction  Makes the column family to do 
inmemory flushes/compactions. "
+        + "Uses the CompactingMemstore");
+    System.err.println(" addColumns      Adds columns to scans/gets 
explicitly. Default: true");
+    System.err.println(" replicas        Enable region replica testing. 
Defaults: 1.");
+    System.err.println(" randomSleep     Do a random sleep before each get 
between 0 and entered value. Defaults: 0");
+    System.err.println(" caching         Scan caching to use. Default: 30");
+    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
+    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. 
Default: true");
+    System.err.println(" scanReadType    Set the readType option for scan, 
stream/pread/default. Default: default");
+    System.err.println();
+    System.err.println(" Note: -D properties will be applied to the conf used. 
");
+    System.err.println("  For example: ");
+    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
+    System.err.println("   -Dmapreduce.task.timeout=60000");
+    System.err.println();
+    System.err.println("Command:");
+    for (CmdDescriptor command : COMMANDS.values()) {
+      System.err.println(String.format(" %-20s %s", command.getName(), 
command.getDescription()));
+    }
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" nclients        Integer. Required. Total number of 
clients "
+        + "(and HRegionServers) running. 1 <= value <= 500");
+    System.err.println("Examples:");
+    System.err.println(" To run a single client doing the default 1M 
sequentialWrites:");
+    System.err.println(" $ hbase " + className + " sequentialWrite 1");
+    System.err.println(" To run 10 clients doing increments over ten rows:");
+    System.err.println(" $ hbase " + className + " --rows=10 --nomapred 
increment 10");
+  }
+
+  /**
+   * Parse options passed in via an arguments array. Assumes that array has 
been split
+   * on white-space and placed into a {@code Queue}. Any unknown arguments 
will remain
+   * in the queue at the conclusion of this method call. It's up to the caller 
to deal
+   * with these unrecognized arguments.
+   */
+  static TestOptions parseOpts(Queue<String> args) {
+    TestOptions opts = new TestOptions();
+
+    String cmd = null;
+    while ((cmd = args.poll()) != null) {
+      if (cmd.equals("-h") || cmd.startsWith("--h")) {
+        // place item back onto queue so that caller knows parsing was 
incomplete
+        args.add(cmd);
+        break;
+      }
+
+      final String nmr = "--nomapred";
+      if (cmd.startsWith(nmr)) {
+        opts.nomapred = true;
+        continue;
+      }
+
+      final String rows = "--rows=";
+      if (cmd.startsWith(rows)) {
+        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
+        continue;
+      }
+
+      final String cycles = "--cycles=";
+      if (cmd.startsWith(cycles)) {
+        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
+        continue;
+      }
+
+      final String sampleRate = "--sampleRate=";
+      if (cmd.startsWith(sampleRate)) {
+        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
+        continue;
+      }
+
+      final String table = "--table=";
+      if (cmd.startsWith(table)) {
+        opts.tableName = cmd.substring(table.length());
+        continue;
+      }
+
+      final String startRow = "--startRow=";
+      if (cmd.startsWith(startRow)) {
+        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
+        continue;
+      }
+
+      final String compress = "--compress=";
+      if (cmd.startsWith(compress)) {
+        opts.compression = 
Compression.Algorithm.valueOf(cmd.substring(compress.length()));
+        continue;
+      }
+
+      final String traceRate = "--traceRate=";
+      if (cmd.startsWith(traceRate)) {
+        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
+        continue;
+      }
+
+      final String blockEncoding = "--blockEncoding=";
+      if (cmd.startsWith(blockEncoding)) {
+        opts.blockEncoding = 
DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
+        continue;
+      }
+
+      final String flushCommits = "--flushCommits=";
+      if (cmd.startsWith(flushCommits)) {
+        opts.flushCommits = 
Boolean.parseBoolean(cmd.substring(flushCommits.length()));
+        continue;
+      }
+
+      final String writeToWAL = "--writeToWAL=";
+      if (cmd.startsWith(writeToWAL)) {
+        opts.writeToWAL = 
Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
+        continue;
+      }
+
+      final String presplit = "--presplit=";
+      if (cmd.startsWith(presplit)) {
+        opts.presplitRegions = 
Integer.parseInt(cmd.substring(presplit.length()));
+        continue;
+      }
+
+      final String inMemory = "--inmemory=";
+      if (cmd.startsWith(inMemory)) {
+        opts.inMemoryCF = 
Boolean.parseBoolean(cmd.substring(inMemory.length()));
+        continue;
+      }
+
+      final String autoFlush = "--autoFlush=";
+      if (cmd.startsWith(autoFlush)) {
+        opts.autoFlush = 
Boolean.parseBoolean(cmd.substring(autoFlush.length()));
+        continue;
+      }
+
+      final String onceCon = "--oneCon=";
+      if (cmd.startsWith(onceCon)) {
+        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
+        continue;
+      }
+
+      final String latency = "--latency";
+      if (cmd.startsWith(latency)) {
+        opts.reportLatency = true;
+        continue;
+      }
+
+      final String multiGet = "--multiGet=";
+      if (cmd.startsWith(multiGet)) {
+        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
+        continue;
+      }
+
+      final String useTags = "--usetags=";
+      if (cmd.startsWith(useTags)) {
+        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
+        continue;
+      }
+
+      final String noOfTags = "--numoftags=";
+      if (cmd.startsWith(noOfTags)) {
+        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
+        continue;
+      }
+
+      final String replicas = "--replicas=";
+      if (cmd.startsWith(replicas)) {
+        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
+        continue;
+      }
+
+      final String filterOutAll = "--filterAll";
+      if (cmd.startsWith(filterOutAll)) {
+        opts.filterAll = true;
+        continue;
+      }
+
+      final String size = "--size=";
+      if (cmd.startsWith(size)) {
+        opts.size = Float.parseFloat(cmd.substring(size.length()));
+        if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 
1; i.e. 1GB");
+        continue;
+      }
+
+      final String splitPolicy = "--splitPolicy=";
+      if (cmd.startsWith(splitPolicy)) {
+        opts.splitPolicy = cmd.substring(splitPolicy.length());
+        continue;
+      }
+
+      final String randomSleep = "--randomSleep=";
+      if (cmd.startsWith(randomSleep)) {
+        opts.randomSleep = 
Integer.parseInt(cmd.substring(randomSleep.length()));
+        continue;
+      }
+
+      final String measureAfter = "--measureAfter=";
+      if (cmd.startsWith(measureAfter)) {
+        opts.measureAfter = 
Integer.parseInt(cmd.substring(measureAfter.length()));
+        continue;
+      }
+
+      final String bloomFilter = "--bloomFilter=";
+      if (cmd.startsWith(bloomFilter)) {
+        opts.bloomType = 
BloomType.valueOf(cmd.substring(bloomFilter.length()));
+        continue;
+      }
+
+      final String blockSize = "--blockSize=";
+      if(cmd.startsWith(blockSize) ) {
+        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
+      }
+
+      final String valueSize = "--valueSize=";
+      if (cmd.startsWith(valueSize)) {
+        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
+        continue;
+      }
+
+      final String valueRandom = "--valueRandom";
+      if (cmd.startsWith(valueRandom)) {
+        opts.valueRandom = true;
+        if (opts.valueZipf) {
+          throw new IllegalStateException("Either valueZipf or valueRandom but 
not both");
+        }
+        continue;
+      }
+
+      final String valueZipf = "--valueZipf";
+      if (cmd.startsWith(valueZipf)) {
+        opts.valueZipf = true;
+        if (opts.valueRandom) {
+          throw new IllegalStateException("Either valueZipf or valueRandom but 
not both");
+        }
+        continue;
+      }
+
+      final String period = "--period=";
+      if (cmd.startsWith(period)) {
+        opts.period = Integer.parseInt(cmd.substring(period.length()));
+        continue;
+      }
+
+      final String addColumns = "--addColumns=";
+      if (cmd.startsWith(addColumns)) {
+  

<TRUNCATED>

Reply via email to