Repository: hbase Updated Branches: refs/heads/branch-1.2 074fbcfc5 -> e2e55481f
HBASE-16101 Tool to microbenchmark procedure WAL performance. Change-Id: I8ec158319395d2ec8e36641a3beab2694f7b6aef Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2e55481 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2e55481 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2e55481 Branch: refs/heads/branch-1.2 Commit: e2e55481f64b519823644b86d575a1c01300bcf3 Parents: 074fbcf Author: Apekshit Sharma <[email protected]> Authored: Mon Aug 29 19:23:09 2016 -0700 Committer: Apekshit Sharma <[email protected]> Committed: Thu Sep 1 17:58:30 2016 -0700 ---------------------------------------------------------------------- hbase-assembly/pom.xml | 11 + .../hadoop/hbase/util/AbstractHBaseTool.java | 78 +++--- .../procedure2/ProcedureTestingUtility.java | 32 ++- ...ProcedureWALLoaderPerformanceEvaluation.java | 248 +++++++++++++++++ .../wal/ProcedureWALPerformanceEvaluation.java | 267 +++++++++++++++++++ 5 files changed, 597 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e2e55481/hbase-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml index 82a655d..2d0a5a5 100644 --- a/hbase-assembly/pom.xml +++ b/hbase-assembly/pom.xml @@ -161,6 +161,17 @@ <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> </dependency> + <!-- To dump tools in hbase-procedure into cached_classpath.txt.
--> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-procedure</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-procedure</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> http://git-wip-us.apache.org/repos/asf/hbase/blob/e2e55481/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java index a876aef..6e3dec6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AbstractHBaseTool.java @@ -17,13 +17,11 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; -import java.util.Set; -import java.util.TreeSet; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; @@ -40,12 +38,11 @@ import org.apache.hadoop.util.ToolRunner; */ @InterfaceAudience.Private public abstract class AbstractHBaseTool implements Tool { - protected static final int EXIT_SUCCESS = 0; protected static final int EXIT_FAILURE = 1; - private static final String SHORT_HELP_OPTION = "h"; - private static final String LONG_HELP_OPTION = "help"; + private static final Option HELP_OPTION = new Option("h", "help", false, + "Prints help for this tool."); private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class); @@ -53,8 +50,6 @@ public
abstract class AbstractHBaseTool implements Tool { protected Configuration conf = null; - private static final Set<String> requiredOptions = new TreeSet<String>(); - protected String[] cmdLineArgs = null; /** @@ -83,6 +78,7 @@ public abstract class AbstractHBaseTool implements Tool { @Override public final int run(String[] args) throws IOException { + cmdLineArgs = args; if (conf == null) { LOG.error("Tool configuration is not initialized"); throw new NullPointerException("conf"); @@ -90,24 +86,22 @@ public abstract class AbstractHBaseTool implements Tool { CommandLine cmd; try { + addOptions(); + if (isHelpCommand(args)) { + printUsage(); + return EXIT_SUCCESS; + } // parse the command line arguments - cmd = parseArgs(args); - cmdLineArgs = args; + cmd = new BasicParser().parse(options, args); } catch (ParseException e) { LOG.error("Error when parsing command-line arguments", e); printUsage(); return EXIT_FAILURE; } - if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION) || - !sanityCheckOptions(cmd)) { - printUsage(); - return EXIT_FAILURE; - } - processOptions(cmd); - int ret = EXIT_FAILURE; + int ret; try { ret = doWork(); } catch (Exception e) { @@ -117,22 +111,11 @@ public abstract class AbstractHBaseTool implements Tool { return ret; } - private boolean sanityCheckOptions(CommandLine cmd) { - boolean success = true; - for (String reqOpt : requiredOptions) { - if (!cmd.hasOption(reqOpt)) { - LOG.error("Required option -" + reqOpt + " is missing"); - success = false; - } - } - return success; - } - - protected CommandLine parseArgs(String[] args) throws ParseException { - options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage"); - addOptions(); - CommandLineParser parser = new BasicParser(); - return parser.parse(options, args); + private boolean isHelpCommand(String[] args) throws ParseException { + Options helpOption = new Options().addOption(HELP_OPTION); + // this parses the command line but doesn't throw an exception
on unknown options + CommandLine cl = new BasicParser().parse(helpOption, args, true); + return cl.getOptions().length != 0; } protected void printUsage() { @@ -146,14 +129,20 @@ public abstract class AbstractHBaseTool implements Tool { helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter); } + protected void addOption(Option option) { + options.addOption(option); + } + protected void addRequiredOptWithArg(String opt, String description) { - requiredOptions.add(opt); - addOptWithArg(opt, description); + Option option = new Option(opt, true, description); + option.setRequired(true); + options.addOption(option); } protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) { - requiredOptions.add(longOpt); - addOptWithArg(shortOpt, longOpt, description); + Option option = new Option(shortOpt, longOpt, true, description); + option.setRequired(true); + options.addOption(option); } protected void addOptNoArg(String opt, String description) { @@ -172,6 +161,21 @@ public abstract class AbstractHBaseTool implements Tool { options.addOption(shortOpt, longOpt, true, description); } + public int getOptionAsInt(CommandLine cmd, String opt, int defaultValue) { + if (cmd.hasOption(opt)) { + return Integer.parseInt(cmd.getOptionValue(opt)); + } else { + return defaultValue; + } + } + + public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) { + if (cmd.hasOption(opt)) { + return Double.parseDouble(cmd.getOptionValue(opt)); + } else { + return defaultValue; + } + } /** * Parse a number and enforce a range.
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/e2e55481/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index dc65e2d..ddddf41 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -212,6 +214,8 @@ public class ProcedureTestingUtility { } public static class TestProcedure extends Procedure<Void> { + private byte[] data = null; + public TestProcedure() {} public TestProcedure(long procId) { @@ -219,6 +223,11 @@ public class ProcedureTestingUtility { } public TestProcedure(long procId, long parentId) { + this(procId, parentId, null); + } + + public TestProcedure(long procId, long parentId, byte[] data) { + setData(data); setProcId(procId); if (parentId > 0) { setParentProcId(parentId); @@ -229,6 +238,14 @@ public class ProcedureTestingUtility { addStackIndex(index); } + public void setFinishedState() { + setState(ProcedureProtos.ProcedureState.FINISHED); + } + + public void setData(final byte[] data) { + this.data = data; + } + @Override protected Procedure[] execute(Void env) { return null; } @@ -239,10 +256,21 @@ public
class ProcedureTestingUtility { protected boolean abort(Void env) { return false; } @Override - protected void serializeStateData(final OutputStream stream) throws IOException { } + protected void serializeStateData(final OutputStream stream) throws IOException { + StreamUtils.writeRawVInt32(stream, data != null ? data.length : 0); + if (data != null) stream.write(data); + } @Override - protected void deserializeStateData(final InputStream stream) throws IOException { } + protected void deserializeStateData(final InputStream stream) throws IOException { + int len = StreamUtils.readRawVarint32(stream); + if (len > 0) { + data = new byte[len]; + new java.io.DataInputStream(stream).readFully(data); // read() may return a short count + } else { + data = null; + } + } } public static class LoadCounter implements ProcedureStore.ProcedureLoader { http://git-wip-us.apache.org/repos/asf/hbase/blob/e2e55481/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java new file mode 100644 index 0000000..347239d --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.ProcedureInfo; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.util.AbstractHBaseTool; + +import static java.lang.System.currentTimeMillis; + +public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool { + protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + // Command line options and defaults. + public static int DEFAULT_NUM_PROCS = 1000000; // 1M + public static Option NUM_PROCS_OPTION = new Option("procs", true, + "Total number of procedures.
Default: " + DEFAULT_NUM_PROCS); + public static int DEFAULT_NUM_WALS = 0; + public static Option NUM_WALS_OPTION = new Option("wals", true, + "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); + public static int DEFAULT_STATE_SIZE = 1024; // 1KB + public static Option STATE_SIZE_OPTION = new Option("size", true, + "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE + " bytes"); + public static int DEFAULT_UPDATES_PER_PROC = 5; + public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true, + "Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC); + public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50; + public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true, + "Fraction of procs for which to write delete state. Distribution of procs chosen for " + "delete is uniform across all procs.
Default: " + DEFAULT_DELETE_PROCS_FRACTION); + + public int numProcs; + public int updatesPerProc; + public double deleteProcsFraction; + public int numWals; + private WALProcedureStore store; + static byte[] serializedState; + + private class LoadCounter implements ProcedureStore.ProcedureLoader { + public LoadCounter() {} + + @Override + public void setMaxProcId(long maxProcId) { + } + + @Override + public void load(ProcedureIterator procIter) throws IOException { + while (procIter.hasNext()) { + if (procIter.isNextCompleted()) { + ProcedureInfo proc = procIter.nextAsProcedureInfo(); + } else { + Procedure proc = procIter.nextAsProcedure(); + } + } + } + + @Override + public void handleCorrupted(ProcedureIterator procIter) throws IOException { + while (procIter.hasNext()) { + Procedure proc = procIter.nextAsProcedure(); + } + } + } + + @Override + protected void addOptions() { + addOption(NUM_PROCS_OPTION); + addOption(UPDATES_PER_PROC_OPTION); + addOption(DELETE_PROCS_FRACTION_OPTION); + addOption(NUM_WALS_OPTION); + addOption(STATE_SIZE_OPTION); + } + + @Override + protected void processOptions(CommandLine cmd) { + numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); + numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); + int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); + serializedState = new byte[stateSize]; + updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(), + DEFAULT_UPDATES_PER_PROC); + deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(), + DEFAULT_DELETE_PROCS_FRACTION); + setupConf(); + } + + private void setupConf() { + if (numWals > 0) { + conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); + } + } + + public void setUpProcedureStore() throws IOException { + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = testDir.getFileSystem(conf); + Path logDir = new Path(testDir, "proc-logs"); +
System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n"); + fs.delete(logDir, true); + store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + store.start(1); + store.recoverLease(); + store.load(new LoadCounter()); + } + + /** + * @return a list of shuffled integers which represent state of proc id. First occurrence of a + * number denotes insert state, consecutive occurrences denote update states, and -ve value + * denotes delete state. + */ + private List<Integer> shuffleProcWriteSequence() { + Random rand = new Random(); + List<Integer> procStatesSequence = new ArrayList<>(); + Set<Integer> toBeDeletedProcs = new HashSet<>(); + // Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add + // extra entry which is marked -ve in the loop after shuffle. + for (int procId = 1; procId <= numProcs; ++procId) { + procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId)); + if (rand.nextFloat() < deleteProcsFraction) { + procStatesSequence.add(procId); + toBeDeletedProcs.add(procId); + } + } + Collections.shuffle(procStatesSequence); + // Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state. + for (int i = procStatesSequence.size() - 1; i >= 0; --i) { + int procId = procStatesSequence.get(i); + if (toBeDeletedProcs.contains(procId)) { + procStatesSequence.set(i, -1 * procId); + toBeDeletedProcs.remove(procId); + } + } + return procStatesSequence; + } + + private void writeWals() throws IOException { + List<Integer> procStates = shuffleProcWriteSequence(); + TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used. + // Divide as double: int division truncates first, making Math.ceil a no-op and yielding + // 0 (hence i % 0 below) whenever numWals exceeds procStates.size(). + int numProcsPerWal = numWals > 0 ?
(int)Math.ceil((double) procStates.size() / numWals) + : Integer.MAX_VALUE; + long startTime = currentTimeMillis(); + long lastTime = startTime; + for (int i = 0; i < procStates.size(); ++i) { + int procId = procStates.get(i); + if (procId < 0) { + store.delete(procs[-procId].getProcId()); + procs[-procId] = null; + } else if (procs[procId] == null) { + procs[procId] = new TestProcedure(procId, 0); + procs[procId].setData(serializedState); + store.insert(procs[procId], null); + } else { + store.update(procs[procId]); + } + if (i > 0 && i % numProcsPerWal == 0) { + long currentTime = currentTimeMillis(); + System.out.println("Forcing WAL roll. Time taken on last WAL: " + + (currentTime - lastTime) / 1000.0f + " sec"); + store.rollWriterForTesting(); + lastTime = currentTime; + } + } + long timeTaken = currentTimeMillis() - startTime; + System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : " + + StringUtils.humanTimeDiff(timeTaken) + "\n\n"); + } + + private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException { + System.out.println("Restarting procedure store to read back the WALs"); + store.stop(false); + store.start(1); + store.recoverLease(); + + long startTime = currentTimeMillis(); + store.load(loader); + long timeTaken = System.currentTimeMillis() - startTime; + System.out.println("******************************************"); + System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec"); + System.out.println("******************************************"); + } + + public void tearDownProcedureStore() { + if (store == null) { + return; // setUpProcedureStore() failed before the store was created. + } + store.stop(false); + try { + store.getFileSystem().delete(store.getLogDir(), true); + } catch (IOException e) { + System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " + + "disk space.
Location: " + store.getLogDir().toString()); + System.err.println(e.toString()); + } + } + + @Override + protected int doWork() { + try { + setUpProcedureStore(); + writeWals(); + storeRestart(new LoadCounter()); + return EXIT_SUCCESS; + } catch (IOException e) { + e.printStackTrace(); + return EXIT_FAILURE; + } finally { + tearDownProcedureStore(); + } + } + + public static void main(String[] args) throws IOException { + ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation(); + tool.setConf(UTIL.getConfiguration()); + System.exit(tool.run(args)); // propagate the tool's exit code to the shell + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/e2e55481/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java new file mode 100644 index 0000000..6f1332c --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.util.ByteSlot; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; + +import org.apache.hadoop.hbase.util.AbstractHBaseTool; + +public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { + protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); + + // Command line options and defaults. + public static int DEFAULT_NUM_THREADS = 20; + public static Option NUM_THREADS_OPTION = new Option("threads", true, + "Number of parallel threads which will write inserts/updates to WAL. Default: " + + DEFAULT_NUM_THREADS); + public static int DEFAULT_NUM_PROCS = 1000000; // 1M + public static Option NUM_PROCS_OPTION = new Option("procs", true, + "Total number of procedures. Each procedure writes one insert and one update.
Default: " + + DEFAULT_NUM_PROCS); + public static int DEFAULT_NUM_WALS = 0; + public static Option NUM_WALS_OPTION = new Option("wals", true, + "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + + " conf to roll the logs. Default: " + DEFAULT_NUM_WALS); + public static int DEFAULT_STATE_SIZE = 1024; // 1KB + public static Option STATE_SIZE_OPTION = new Option("size", true, + "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE + + " bytes"); + public static Option SYNC_OPTION = new Option("sync", true, + "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " + + "hsync, nosync. Default: hflush"); + public static String DEFAULT_SYNC_OPTION = "hflush"; + + public int numThreads; + public long numProcs; + public long numProcsPerWal = Long.MAX_VALUE; // never roll WAL based on this value. + public int numWals; + public String syncType; + public int stateSize; + static byte[] serializedState; + private WALProcedureStore store; + + /** Used by {@link Worker}. */ + private final AtomicLong procIds = new AtomicLong(0); + private final AtomicBoolean workersFailed = new AtomicBoolean(false); + // Timeout for worker threads. + private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds + + // Non-default configurations.
+ private void setupConf() { + conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType)); + if (numWals > 0) { + conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE); + // Round up so that no more than numWals logs are written, and keep the value >= 1 because + // Worker uses it as a modulus (procId % numProcsPerWal would otherwise divide by zero). + numProcsPerWal = Math.max(1, (numProcs + numWals - 1) / numWals); + } + } + + private void setupProcedureStore() throws IOException { + Path testDir = UTIL.getDataTestDir(); + FileSystem fs = testDir.getFileSystem(conf); + Path logDir = new Path(testDir, "proc-logs"); + System.out.println("Logs directory : " + logDir.toString()); + fs.delete(logDir, true); + if ("nosync".equals(syncType)) { + store = new NoSyncWalProcedureStore(conf, fs, logDir); + } else { + store = ProcedureTestingUtility.createWalStore(conf, fs, logDir); + } + store.start(numThreads); + store.recoverLease(); + store.load(new ProcedureTestingUtility.LoadCounter()); + System.out.println("Starting new log : " + + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); + } + + private void tearDownProcedureStore() { + if (store == null) { + return; // setupProcedureStore() failed before the store was created. + } + store.stop(false); + try { + store.getFileSystem().delete(store.getLogDir(), true); + } catch (IOException e) { + System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " + + "disk space. Location: " + store.getLogDir().toString()); + e.printStackTrace(); + } + } + + /** + * Processes and validates command line options.
+ */ + @Override + public void processOptions(CommandLine cmd) { + numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS); + numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS); + numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); + syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION); + // Validate explicitly: an assert is skipped unless the JVM runs with -ea, in which case any + // unrecognized value would be accepted silently. + if (!"hsync".equals(syncType) && !"hflush".equals(syncType) && !"nosync".equals(syncType)) { + throw new IllegalArgumentException( + "sync argument can only accept one of these three values: hsync, hflush, nosync"); + } + stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE); + serializedState = new byte[stateSize]; + setupConf(); + } + + @Override + public void addOptions() { + addOption(NUM_THREADS_OPTION); + addOption(NUM_PROCS_OPTION); + addOption(NUM_WALS_OPTION); + addOption(SYNC_OPTION); + addOption(STATE_SIZE_OPTION); + } + + @Override + public int doWork() { + ExecutorService executor = null; + try { + setupProcedureStore(); + executor = Executors.newFixedThreadPool(numThreads); + Future<?>[] futures = new Future<?>[numThreads]; + // Start worker threads.
+ long start = System.currentTimeMillis(); + for (int i = 0; i < numThreads; i++) { + futures[i] = executor.submit(this.new Worker(start)); + } + boolean failure = false; + try { + for (Future<?> future : futures) { + long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis(); + failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE)); + } + } catch (Exception e) { + // Ask the remaining workers to stop before the store is torn down in the finally block. + workersFailed.set(true); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + System.err.println("Exception in worker thread."); + e.printStackTrace(); + return EXIT_FAILURE; + } + if (failure) { + return EXIT_FAILURE; + } + long timeTaken = System.currentTimeMillis() - start; + System.out.println("******************************************"); + System.out.println("Num threads : " + numThreads); + System.out.println("Num procedures : " + numProcs); + System.out.println("Sync type : " + syncType); + System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec"); + System.out.println("******************************************"); + return EXIT_SUCCESS; + } catch (IOException e) { + e.printStackTrace(); + return EXIT_FAILURE; + } finally { + if (executor != null) { + // Every future has completed on the normal paths; after an exception this also + // interrupts workers that may still be running. + executor.shutdownNow(); + } + tearDownProcedureStore(); + } + } + + /////////////////////////////// + // HELPER CLASSES + /////////////////////////////// + + /** + * Callable to generate load for wal by inserting and updating procedures. + * If procedure store fails to roll log file (throws IOException), all threads quit, and at + * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}. + */ + class Worker implements Callable<Integer> { + final long start; + + public Worker(long start) { + this.start = start; + } + + // TODO: Can also collect #procs, time taken by each thread to measure fairness.
+ @Override + public Integer call() throws IOException { + while (true) { + if (workersFailed.get()) { + return EXIT_FAILURE; + } + long procId = procIds.getAndIncrement(); + if (procId >= numProcs) { + break; + } + if (procId != 0 && procId % 10000 == 0) { + long ms = System.currentTimeMillis() - start; + System.out.println("Wrote " + procId + " procedures in " + + StringUtils.humanTimeDiff(ms)); + } + try { + if (procId > 0 && procId % numProcsPerWal == 0) { + store.rollWriterForTesting(); + System.out.println("Starting new log : " + + store.getActiveLogs().get(store.getActiveLogs().size() - 1)); + } + } catch (IOException ioe) { + // Ask other threads to quit too. + workersFailed.set(true); + System.err.println("Exception when rolling log file. Current procId = " + procId); + ioe.printStackTrace(); + return EXIT_FAILURE; + } + ProcedureTestingUtility.TestProcedure proc = + new ProcedureTestingUtility.TestProcedure(procId); + proc.setData(serializedState); + store.insert(proc, null); + store.update(proc); + } + return EXIT_SUCCESS; + } + } + + public class NoSyncWalProcedureStore extends WALProcedureStore { + public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, + final Path logDir) { + super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { + @Override + public void recoverFileLease(FileSystem fs, Path path) throws IOException { + // no-op + } + }); + } + + @Override + protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) + throws IOException { + long totalSynced = 0; + for (int i = 0; i < count; ++i) { + totalSynced += slots[offset + i].size(); + } + return totalSynced; + } + } + + public static void main(String[] args) throws IOException { + ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation(); + tool.setConf(UTIL.getConfiguration()); + System.exit(tool.run(args)); // propagate the tool's exit code to the shell + } +} \ No newline at end of file
