Author: dhruba
Date: Mon Dec 10 14:43:44 2007
New Revision: 603084

URL: http://svn.apache.org/viewvc?rev=603084&view=rev
Log:
HADOOP-2000. Rewrite NNBench to measure namenode performance accurately.
It now uses the map-reduce framework for load generation.
(Mukund Madhugiri via dhruba)
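As context for the new command-line surface this patch introduces, here is a minimal sketch of a driver that exercises the four operations in the order the class javadoc requires (create_write first). It is hypothetical and not part of the patch: the flag names come from displayUsage() below, while the class name NNBenchDriver and all argument values are invented for illustration.

    // Hypothetical driver for the rewritten benchmark. Each call submits
    // one map-reduce job; the default barrier (launch time + 2 minutes)
    // applies to every run unless -startTime is passed explicitly.
    public class NNBenchDriver {
      public static void main(String[] args) throws Exception {
        String[] ops = {"create_write", "open_read", "rename", "delete"};
        for (String op : ops) {
          org.apache.hadoop.dfs.NNBench.main(new String[] {
              "-operation", op,
              "-maps", "12",                    // load-generating map tasks
              "-reduces", "1",                  // one reducer aggregates the counters
              "-bytesToWrite", "0",             // tiny files keep the load on the namenode
              "-numberOfFiles", "1000",         // files per map
              "-replicationFactorPerFile", "1",
              "-baseDir", "/benchmarks/NNBench"
          });
        }
      }
    }

Because each run sleeps at the barrier until its start time, back-to-back invocations like this pause between operations; that is the intended way to get all maps issuing namenode calls simultaneously.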
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=603084&r1=603083&r2=603084&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Dec 10 14:43:44 2007 @@ -109,6 +109,10 @@ HADOOP-1327. Include website documentation for streaming. (Rob Weltman via omalley) + HADOOP-2000. Rewrite NNBench to measure namenode performance accurately. + It now uses the map-reduce framework for load generation. + (Mukund Madhugiri via dhruba) + OPTIMIZATIONS HADOOP-1898. Release the lock protecting the last time of the last stack Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java?rev=603084&r1=603083&r2=603084&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBench.java Mon Dec 10 14:43:44 2007 @@ -20,322 +20,938 @@ import java.io.IOException; import java.util.Date; +import java.io.DataInputStream; +import java.io.FileOutputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.io.File; +import java.io.BufferedReader; +import java.util.StringTokenizer; +import java.net.InetAddress; +import java.text.SimpleDateFormat; +import java.util.Iterator; -import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.conf.Configured; + +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile; + +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.TaskTracker; /** * This program executes a specified operation that applies load to - * the NameNode. Possible operations include create/writing files, - * opening/reading files, renaming files, and deleting files. + * the NameNode. * * When run simultaneously on multiple nodes, this program functions * as a stress-test and benchmark for namenode, especially when * the number of bytes written to each file is small. + * + * Valid operations are: + * create_write + * open_read + * rename + * delete + * + * NOTE: The open_read, rename and delete operations assume that the files + * they operate on are already available. The create_write operation + * must be run before running the other operations. 
*/ + public class NNBench { + protected static final Log LOG = LogFactory.getLog( + "org.apache.hadoop.dfs.NNBench"); + + protected static String CONTROL_DIR_NAME = "control"; + protected static String OUTPUT_DIR_NAME = "output"; + protected static String DATA_DIR_NAME = "data"; + protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log"; + protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4"; + + public static String operation = "none"; + public static long numberOfMaps = 1l; // default is 1 + public static long numberOfReduces = 1l; // default is 1 + public static long startTime = + System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min + public static long blockSize = 1l; // default is 1 + public static int bytesToWrite = 0; // default is 0 + public static long bytesPerChecksum = 1l; // default is 1 + public static long numberOfFiles = 1l; // default is 1 + public static short replicationFactorPerFile = 1; // default is 1 + public static String baseDir = "/benchmarks/NNBench"; // default + public static boolean readFileAfterOpen = false; // default is to not read + + // Supported operations + private static final String OP_CREATE_WRITE = "create_write"; + private static final String OP_OPEN_READ = "open_read"; + private static final String OP_RENAME = "rename"; + private static final String OP_DELETE = "delete"; + + // To display in the format that matches the NN and DN log format + // Example: 2007-10-26 00:01:19,853 + static SimpleDateFormat sdf = + new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S"); + + private static Configuration config = new Configuration(); - private static final Log LOG = LogFactory.getLog( - "org.apache.hadoop.dfs.NNBench"); + /** + * Clean up the files before a test run + * + * @throws IOException on error + */ + private static void cleanupBeforeTestrun() throws IOException { + FileSystem tempFS = FileSystem.get(config); + + // Delete the data directory only if it is the create/write operation + if (operation.equals(OP_CREATE_WRITE)) { + LOG.info("Deleting data directory"); + tempFS.delete(new Path(baseDir, DATA_DIR_NAME)); + } + tempFS.delete(new Path(baseDir, CONTROL_DIR_NAME)); + tempFS.delete(new Path(baseDir, OUTPUT_DIR_NAME)); + } - // variable initialzed from command line arguments - private static long startTime = 0; - private static int numFiles = 0; - private static long bytesPerBlock = 1; - private static long blocksPerFile = 0; - private static long bytesPerFile = 1; - private static Path baseDir = null; - - // variables initialized in main() - private static FileSystem fileSys = null; - private static Path taskDir = null; - private static String uniqueId = null; - private static byte[] buffer; - private static long maxExceptionsPerFile = 200; - - /** - * Returns when the current number of seconds from the epoch equals - * the command line argument given by <code>-startTime</code>. - * This allows multiple instances of this program, running on clock - * synchronized nodes, to start at roughly the same time. - */ - static void barrier() { - long sleepTime; - while ((sleepTime = startTime - System.currentTimeMillis()) > 0) { + /** + * Create control files before a test run. 
+   * Number of files created is equal to the number of maps specified
+   *
+   * @throws IOException on error
+   */
+  private static void createControlFiles() throws IOException {
+    FileSystem tempFS = FileSystem.get(config);
+    LOG.info("Creating " + numberOfMaps + " control files");
+
+    for (int i = 0; i < numberOfMaps; i++) {
+      String strFileName = "NNBench_Controlfile_" + i;
+      Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
+          strFileName);
+
+      SequenceFile.Writer writer = null;
       try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException ex) {
+        writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
+            LongWritable.class, CompressionType.NONE);
+        writer.append(new Text(strFileName), new LongWritable(0l));
+      } catch(Exception e) {
+        throw new IOException(e.getLocalizedMessage());
+      } finally {
+        if (writer != null) {
+          writer.close();
+        }
+        writer = null;
       }
     }
   }
+  /**
+   * Display version
+   */
+  private static void displayVersion() {
+    System.out.println(NNBENCH_VERSION);
+  }
+
+  /**
+   * Display usage
+   */
+  private static void displayUsage() {
+    String usage =
+      "Usage: nnbench <options>\n" +
+      "Options:\n" +
+      "\t-operation <Available operations are " + OP_CREATE_WRITE + " " +
+      OP_OPEN_READ + " " + OP_RENAME + " " + OP_DELETE + ". " +
+      "This option is mandatory>\n" +
+      "\t * NOTE: The open_read, rename and delete operations assume " +
+      "that the files they operate on are already available. " +
+      "The create_write operation must be run before running the " +
+      "other operations.\n" +
+      "\t-maps <number of maps. default is 1. This is not mandatory>\n" +
+      "\t-reduces <number of reduces. default is 1. This is not mandatory>\n" +
+      "\t-startTime <time to start, given in seconds from the epoch. " +
+      "Make sure this is far enough into the future, so all maps " +
+      "(operations) will start at the same time>. " +
+      "default is launch time + 2 mins. This is not mandatory \n" +
+      "\t-blockSize <Block size in bytes. default is 1. " +
+      "This is not mandatory>\n" +
+      "\t-bytesToWrite <Bytes to write. default is 0. " +
+      "This is not mandatory>\n" +
+      "\t-bytesPerChecksum <Bytes per checksum for the files. default is 1. " +
+      "This is not mandatory>\n" +
+      "\t-numberOfFiles <number of files to create. default is 1. " +
+      "This is not mandatory>\n" +
+      "\t-replicationFactorPerFile <Replication factor for the files." +
+      " default is 1. This is not mandatory>\n" +
+      "\t-baseDir <base DFS path. default is /benchmarks/NNBench. " +
+      "This is not mandatory>\n" +
+      "\t-readFileAfterOpen <true or false. if true, it reads the file and " +
+      "reports the average time to read. This is valid with the open_read " +
+      "operation. default is false. This is not mandatory>\n" +
+      "\t-help: Display the help statement\n";
+
-  static private void handleException(String operation, Throwable e,
-      int singleFileExceptions) {
-    LOG.warn("Exception while " + operation + ": " +
-        StringUtils.stringifyException(e));
-    if (singleFileExceptions >= maxExceptionsPerFile) {
-      throw new RuntimeException(singleFileExceptions +
-          " exceptions for a single file exceeds threshold. Aborting");
+    System.out.println(usage);
+  }
+
+  /**
+   * check for arguments and fail if the values are not specified
+   */
+  public static void checkArgs(final int index, final int length) {
+    if (index == length) {
+      displayUsage();
+      System.exit(-1);
     }
   }
 
   /**
-   * Create and write to a given number of files. Repeat each remote
-   * operation until is suceeds (does not throw an exception).
-   *
-   * @return the number of exceptions caught
+   * Parse input arguments
+   *
+   * @param args Command line inputs
    */
-  static int createWrite() {
-    int totalExceptions = 0;
-    FSDataOutputStream out = null;
-    boolean success = false;
-    for (int index = 0; index < numFiles; index++) {
-      int singleFileExceptions = 0;
-      do { // create file until is succeeds or max exceptions reached
-        try {
-          out = fileSys.create(
-              new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock);
-          success = true;
-        } catch (IOException ioe) {
-          success=false;
-          totalExceptions++;
-          handleException("creating file #" + index, ioe, ++singleFileExceptions);
-        }
-      } while (!success);
-      long toBeWritten = bytesPerFile;
-      while (toBeWritten > 0) {
-        int nbytes = (int) Math.min(buffer.length, toBeWritten);
-        toBeWritten -= nbytes;
-        try { // only try once
-          out.write(buffer, 0, nbytes);
-        } catch (IOException ioe) {
-          totalExceptions++;
-          handleException("writing to file #" + index, ioe, ++singleFileExceptions);
-        }
+  public static void parseInputs(final String[] args) {
+    // If there are no command line arguments, exit
+    if (args.length == 0) {
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // Parse command line args
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-operation")) {
+        checkArgs(i + 1, args.length); // guard a missing value, as the other options do
+        operation = args[++i];
+      } else if (args[i].equals("-maps")) {
+        checkArgs(i + 1, args.length);
+        numberOfMaps = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-reduces")) {
+        checkArgs(i + 1, args.length);
+        numberOfReduces = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-startTime")) {
+        checkArgs(i + 1, args.length);
+        startTime = Long.parseLong(args[++i]) * 1000;
+      } else if (args[i].equals("-blockSize")) {
+        checkArgs(i + 1, args.length);
+        blockSize = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-bytesToWrite")) {
+        checkArgs(i + 1, args.length);
+        bytesToWrite = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-bytesPerChecksum")) {
+        checkArgs(i + 1, args.length);
+        bytesPerChecksum = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-numberOfFiles")) {
+        checkArgs(i + 1, args.length);
+        numberOfFiles = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-replicationFactorPerFile")) {
+        checkArgs(i + 1, args.length);
+        replicationFactorPerFile = Short.parseShort(args[++i]);
+      } else if (args[i].equals("-baseDir")) {
+        checkArgs(i + 1, args.length);
+        baseDir = args[++i];
+      } else if (args[i].equals("-readFileAfterOpen")) {
+        checkArgs(i + 1, args.length);
+        readFileAfterOpen = Boolean.parseBoolean(args[++i]);
+      } else if (args[i].equals("-help")) {
+        displayUsage();
+        System.exit(-1);
       }
-      do { // close file until is succeeds
-        try {
-          out.close();
-          success = true;
-        } catch (IOException ioe) {
-          success=false;
-          totalExceptions++;
-          handleException("closing file #" + index, ioe, ++singleFileExceptions);
-        }
-      } while (!success);
     }
-    return totalExceptions;
-  }
+
+    LOG.info("Test Inputs: ");
+    LOG.info(" Test Operation: " + operation);
+    LOG.info(" Start time: " + sdf.format(new Date(startTime)));
+    LOG.info(" Number of maps: " + numberOfMaps);
+    LOG.info(" Number of reduces: " + numberOfReduces);
+    LOG.info(" Block Size: " + blockSize);
+    LOG.info(" Bytes to write: " + bytesToWrite);
+    LOG.info(" Bytes per checksum: " + bytesPerChecksum);
+    LOG.info(" Number of files: " + numberOfFiles);
+    LOG.info(" Replication factor: " + replicationFactorPerFile);
+    LOG.info(" Base dir: " + baseDir);
+    LOG.info(" Read file after open: " + readFileAfterOpen);
+
+    // Set
user-defined parameters, so the map method can access the values + config.set("test.nnbench.operation", operation); + config.setLong("test.nnbench.maps", numberOfMaps); + config.setLong("test.nnbench.reduces", numberOfReduces); + config.setLong("test.nnbench.starttime", startTime); + config.setLong("test.nnbench.blocksize", blockSize); + config.setInt("test.nnbench.bytestowrite", bytesToWrite); + config.setLong("test.nnbench.bytesperchecksum", bytesPerChecksum); + config.setLong("test.nnbench.numberoffiles", numberOfFiles); + config.setInt("test.nnbench.replicationfactor", + (int) replicationFactorPerFile); + config.set("test.nnbench.basedir", baseDir); + config.setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen); + + config.set("test.nnbench.datadir.name", DATA_DIR_NAME); + config.set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME); + config.set("test.nnbench.controldir.name", CONTROL_DIR_NAME); + } + /** - * Open and read a given number of files. - * - * @return the number of exceptions caught + * Analyze the results + * + * @throws IOException on error */ - static int openRead() { - int totalExceptions = 0; - FSDataInputStream in = null; - for (int index = 0; index < numFiles; index++) { - int singleFileExceptions = 0; - try { - in = fileSys.open(new Path(taskDir, "" + index), 512); - long toBeRead = bytesPerFile; - while (toBeRead > 0) { - int nbytes = (int) Math.min(buffer.length, toBeRead); - toBeRead -= nbytes; - try { // only try once - in.read(buffer, 0, nbytes); - } catch (IOException ioe) { - totalExceptions++; - handleException("reading from file #" + index, ioe, ++singleFileExceptions); - } - } - in.close(); - } catch (IOException ioe) { - totalExceptions++; - handleException("opening file #" + index, ioe, ++singleFileExceptions); + private static void analyzeResults() throws IOException { + final FileSystem fs = FileSystem.get(config); + Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME), + "part-00000"); + + DataInputStream in; + in = new DataInputStream(fs.open(reduceFile)); + + BufferedReader lines; + lines = new BufferedReader(new InputStreamReader(in)); + + long totalTimeAL1 = 0l; + long totalTimeAL2 = 0l; + long totalTimeTPmS = 0l; + long lateMaps = 0l; + long numOfExceptions = 0l; + long successfulFileOps = 0l; + + long mapStartTimeTPmS = 0l; + long mapEndTimeTPmS = 0l; + + String resultTPSLine1 = null; + String resultTPSLine2 = null; + String resultALLine1 = null; + String resultALLine2 = null; + + String line; + while((line = lines.readLine()) != null) { + StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;"); + String attr = tokens.nextToken(); + if (attr.endsWith(":totalTimeAL1")) { + totalTimeAL1 = Long.parseLong(tokens.nextToken()); + } else if (attr.endsWith(":totalTimeAL2")) { + totalTimeAL2 = Long.parseLong(tokens.nextToken()); + } else if (attr.endsWith(":totalTimeTPmS")) { + totalTimeTPmS = Long.parseLong(tokens.nextToken()); + } else if (attr.endsWith(":latemaps")) { + lateMaps = Long.parseLong(tokens.nextToken()); + } else if (attr.endsWith(":numOfExceptions")) { + numOfExceptions = Long.parseLong(tokens.nextToken()); + } else if (attr.endsWith(":successfulFileOps")) { + successfulFileOps = Long.parseLong(tokens.nextToken()); + } else if (attr.endsWith(":mapStartTimeTPmS")) { + mapStartTimeTPmS = Long.parseLong(tokens.nextToken()); + } else if (attr.endsWith(":mapEndTimeTPmS")) { + mapEndTimeTPmS = Long.parseLong(tokens.nextToken()); } } - return totalExceptions; - } + // Average latency is the average time to perform 'n' 
number of + // operations, n being the number of files + double avgLatency1 = (double) totalTimeAL1 / (double) successfulFileOps; + double avgLatency2 = (double) totalTimeAL2 / (double) successfulFileOps; + + // The time it takes for the longest running map is measured. Using that, + // cluster transactions per second is calculated. It includes time to + // retry any of the failed operations + double longestMapTimeTPmS = (double) (mapEndTimeTPmS - mapStartTimeTPmS); + double totalTimeTPS = (longestMapTimeTPmS == 0) ? + (1000 * successfulFileOps) : + (double) (1000 * successfulFileOps) / (double) longestMapTimeTPmS; + + // The time it takes to perform 'n' operations is calculated (in ms), + // n being the number of files. Using that time, the average execution + // time is calculated. It includes time to retry any of the + // failed operations + double AverageExecutionTime = (totalTimeTPmS == 0) ? + (double) successfulFileOps : + (double) (totalTimeTPmS / successfulFileOps); + + if (operation.equals(OP_CREATE_WRITE)) { + // For create/write/close, it is treated as two transactions, + // since a file create from a client perspective involves create and close + resultTPSLine1 = " TPS: Create/Write/Close: " + + (int) (totalTimeTPS * 2); + resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " + + (double) AverageExecutionTime; + resultALLine1 = " Avg Lat (ms): Create/Write: " + avgLatency1; + resultALLine2 = " Avg Lat (ms): Close: " + avgLatency2; + } else if (operation.equals(OP_OPEN_READ)) { + resultTPSLine1 = " TPS: Open/Read: " + + (int) totalTimeTPS; + resultTPSLine2 = " Avg Exec time (ms): Open/Read: " + + (double) AverageExecutionTime; + resultALLine1 = " Avg Lat (ms): Open: " + avgLatency1; + if (readFileAfterOpen) { + resultALLine2 = " Avg Lat (ms): Read: " + avgLatency2; + } + } else if (operation.equals(OP_RENAME)) { + resultTPSLine1 = " TPS: Rename: " + + (int) totalTimeTPS; + resultTPSLine2 = " Avg Exec time (ms): Rename: " + + (double) AverageExecutionTime; + resultALLine1 = " Avg Lat (ms): Rename: " + avgLatency1; + } else if (operation.equals(OP_DELETE)) { + resultTPSLine1 = " TPS: Delete: " + + (int) totalTimeTPS; + resultTPSLine2 = " Avg Exec time (ms): Delete: " + + (double) AverageExecutionTime; + resultALLine1 = " Avg Lat (ms): Delete: " + avgLatency1; + } + + String resultLines[] = { + "-------------- NNBench -------------- : ", + " Version: " + NNBENCH_VERSION, + " Date & time: " + sdf.format(new Date( + System.currentTimeMillis())), + "", + " Test Operation: " + operation, + " Start time: " + + sdf.format(new Date(startTime)), + " Maps to run: " + numberOfMaps, + " Reduces to run: " + numberOfReduces, + " Block Size (bytes): " + blockSize, + " Bytes to write: " + bytesToWrite, + " Bytes per checksum: " + bytesPerChecksum, + " Number of files: " + numberOfFiles, + " Replication factor: " + replicationFactorPerFile, + " Successful file operations: " + successfulFileOps, + "", + " # maps that missed the barrier: " + lateMaps, + " # exceptions: " + numOfExceptions, + "", + resultTPSLine1, + resultTPSLine2, + resultALLine1, + resultALLine2, + "", + " RAW DATA: AL Total #1: " + totalTimeAL1, + " RAW DATA: AL Total #2: " + totalTimeAL2, + " RAW DATA: TPS Total (ms): " + totalTimeTPmS, + " RAW DATA: Longest Map Time (ms): " + longestMapTimeTPmS, + " RAW DATA: Late maps: " + lateMaps, + " RAW DATA: # of exceptions: " + numOfExceptions, + "" }; + + PrintStream res = new PrintStream(new FileOutputStream( + new File(DEFAULT_RES_FILE_NAME), true)); + + // Write to a file and 
also dump to log
+    for(int i = 0; i < resultLines.length; i++) {
+      if (resultLines[i] == null) {
+        // lines that do not apply to this operation (e.g. the read
+        // latency line when -readFileAfterOpen is false) stay null
+        continue;
+      }
+      LOG.info(resultLines[i]);
+      res.println(resultLines[i]);
+    }
+    res.close();
+  }
+
   /**
-   * Rename a given number of files. Repeat each remote
-   * operation until is suceeds (does not throw an exception).
-   *
-   * @return the number of exceptions caught
+   * Run the test
+   *
+   * @throws IOException on error
    */
-  static int rename() {
-    int totalExceptions = 0;
-    boolean success = false;
-    for (int index = 0; index < numFiles; index++) {
-      int singleFileExceptions = 0;
-      do { // rename file until is succeeds
-        try {
-          boolean result = fileSys.rename(
-              new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
-          success = true;
-        } catch (IOException ioe) {
-          success=false;
-          totalExceptions++;
-          handleException("creating file #" + index, ioe, ++singleFileExceptions);
-        }
-      } while (!success);
-    }
-    return totalExceptions;
-  }
+  public static void runTests() throws IOException {
+    config.setLong("io.bytes.per.checksum", bytesPerChecksum);
+
+    JobConf job = new JobConf(config, NNBench.class);
+
+    job.setJobName("NNBench-" + operation);
+    job.setInputPath(new Path(baseDir, CONTROL_DIR_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    // Explicitly set number of max map attempts to 1.
+    job.setMaxMapAttempts(1);
+    // Explicitly turn off speculative execution
+    job.setSpeculativeExecution(false);
+
+    job.setMapperClass(NNBenchMapper.class);
+    job.setReducerClass(NNBenchReducer.class);
+
+    job.setOutputPath(new Path(baseDir, OUTPUT_DIR_NAME));
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks((int) numberOfReduces);
+    JobClient.runJob(job);
+  }
+
+  /**
-   * Delete a given number of files. Repeat each remote
-   * operation until is suceeds (does not throw an exception).
-   *
-   * @return the number of exceptions caught
-   */
-  static int delete() {
-    int totalExceptions = 0;
-    boolean success = false;
-    for (int index = 0; index < numFiles; index++) {
-      int singleFileExceptions = 0;
-      do { // delete file until is succeeds
-        try {
-          boolean result = fileSys.delete(new Path(taskDir, "A" + index));
-          success = true;
-        } catch (IOException ioe) {
-          success=false;
-          totalExceptions++;
-          handleException("creating file #" + index, ioe, ++singleFileExceptions);
-        }
-      } while (!success);
+   * Validate the inputs
+   */
+  public static void validateInputs() {
+    // If it is not one of the four operations, then fail
+    if (!operation.equals(OP_CREATE_WRITE) &&
+        !operation.equals(OP_OPEN_READ) &&
+        !operation.equals(OP_RENAME) &&
+        !operation.equals(OP_DELETE)) {
+      System.err.println("Error: Unknown operation: " + operation);
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If number of maps is a negative number, then fail
+    // Hadoop allows the number of maps to be 0
+    if (numberOfMaps < 0) {
+      System.err.println("Error: Number of maps must not be negative");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If number of reduces is a negative number or 0, then fail
+    if (numberOfReduces <= 0) {
+      System.err.println("Error: Number of reduces must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If blocksize is a negative number or 0, then fail
+    if (blockSize <= 0) {
+      System.err.println("Error: Block size must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If bytes to write is a negative number, then fail
+    if (bytesToWrite < 0) {
+      System.err.println("Error: Bytes to write must not be negative");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If bytes per checksum is a negative number or 0, then fail
+    // (zero would also make the multiple-of check below divide by zero)
+    if (bytesPerChecksum <= 0) {
+      System.err.println("Error: Bytes per checksum must be a positive number");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If number of files is a negative number, then fail
+    if (numberOfFiles < 0) {
+      System.err.println("Error: Number of files must not be negative");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If replication factor is a negative number, then fail
+    if (replicationFactorPerFile < 0) {
+      System.err.println("Error: Replication factor must not be negative");
+      displayUsage();
+      System.exit(-1);
+    }
+
+    // If block size is not a multiple of bytesperchecksum, fail
+    if (blockSize % bytesPerChecksum != 0) {
+      System.err.println("Error: Block size in bytes must be a multiple of " +
+          "bytes per checksum");
+      displayUsage();
+      System.exit(-1);
+    }
   }
-  }
+
+  /**
+   * Main method for running the NNBench benchmarks
+   *
+   * @throws IOException indicates a problem with test startup
+   */
+  public static void main(String[] args) throws IOException {
+    // Display the application version string
+    displayVersion();
+
+    // Parse the inputs
+    parseInputs(args);
+
+    // Validate inputs
+    validateInputs();
+
+    // Clean up files before the test run
+    cleanupBeforeTestrun();
+
+    // Create control files before test run
+    createControlFiles();
+
+    // Run the tests as a map reduce job
+    runTests();
+    // Analyze results
+    analyzeResults();
+  }
+
+  /**
-   * This launches a given namenode operation (<code>-operation</code>),
-   * starting at a given time (<code>-startTime</code>). The files used
-   * by the openRead, rename, and delete operations are the same files
-   * created by the createWrite operation.
Typically, the program - * would be run four times, once for each operation in this order: - * createWrite, openRead, rename, delete. - * - * <pre> - * Usage: nnbench - * -operation <one of createWrite, openRead, rename, or delete> - * -baseDir <base output/input DFS path> - * -startTime <time to start, given in seconds from the epoch> - * -numFiles <number of files to create, read, rename, or delete> - * -blocksPerFile <number of blocks to create per file> - * [-bytesPerBlock <number of bytes to write to each block, default is 1>] - * [-bytesPerChecksum <value for io.bytes.per.checksum>] - * </pre> - * - * @throws IOException indicates a problem with test startup + * Mapper class */ - public static void main(String[] args) throws IOException { - String version = "NameNodeBenchmark.0.3"; - System.out.println(version); - int bytesPerChecksum = -1; + static class NNBenchMapper extends Configured + implements Mapper<Text, LongWritable, Text, Text> { + FileSystem filesystem = null; + private String hostName = null; + + long numberOfFiles = 1l; + long blkSize = 1l; + short replFactor = 1; + int bytesToWrite = 0; + String baseDir = null; + String dataDirName = null; + String op = null; + boolean readFile = false; + final int MAX_OPERATION_EXCEPTIONS = 1000; + + // Data to collect from the operation + int numOfExceptions = 0; + long startTimeAL = 0l; + long totalTimeAL1 = 0l; + long totalTimeAL2 = 0l; + long successfulFileOps = 0l; + + /** + * Constructor + */ + public NNBenchMapper() { + super(config); + + try { + filesystem = FileSystem.get(config); + } catch(Exception e) { + throw new RuntimeException("Cannot get file system.", e); + } + + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch(Exception e) { + throw new RuntimeException("Error getting hostname", e); + } + } - String usage = - "Usage: nnbench " + - " -operation <one of createWrite, openRead, rename, or delete> " + - " -baseDir <base output/input DFS path> " + - " -startTime <time to start, given in seconds from the epoch> " + - " -numFiles <number of files to create> " + - " -blocksPerFile <number of blocks to create per file> " + - " [-bytesPerBlock <number of bytes to write to each block, default is 1>] " + - " [-bytesPerChecksum <value for io.bytes.per.checksum>]" + - "Note: bytesPerBlock MUST be a multiple of bytesPerChecksum"; - - String operation = null; - for (int i = 0; i < args.length; i++) { // parse command line - if (args[i].equals("-baseDir")) { - baseDir = new Path(args[++i]); - } else if (args[i].equals("-numFiles")) { - numFiles = Integer.parseInt(args[++i]); - } else if (args[i].equals("-blocksPerFile")) { - blocksPerFile = Integer.parseInt(args[++i]); - } else if (args[i].equals("-bytesPerBlock")) { - bytesPerBlock = Long.parseLong(args[++i]); - } else if (args[i].equals("-bytesPerChecksum")) { - bytesPerChecksum = Integer.parseInt(args[++i]); - } else if (args[i].equals("-startTime")) { - startTime = Long.parseLong(args[++i]) * 1000; - } else if (args[i].equals("-operation")) { - operation = args[++i]; + /** + * Mapper base implementation + */ + public void configure(JobConf conf) { + setConf(conf); + } + + /** + * Mapper base implementation + */ + public void close() throws IOException { + } + + /** + * Returns when the current number of seconds from the epoch equals + * the command line argument given by <code>-startTime</code>. + * This allows multiple instances of this program, running on clock + * synchronized nodes, to start at roughly the same time. 
+ */ + private boolean barrier() { + Configuration conf = filesystem.getConf(); + long startTime = conf.getLong("test.nnbench.starttime", 0l); + long currentTime = System.currentTimeMillis(); + long sleepTime = startTime - currentTime; + boolean retVal = false; + + // If the sleep time is greater than 0, then sleep and return + if (sleepTime > 0) { + TaskTracker.LOG.info("Waiting in barrier for: " + sleepTime + " ms"); + + try { + Thread.sleep(sleepTime); + retVal = true; + } catch (Exception e) { + retVal = false; + } + } + + return retVal; + } + + /** + * Map method + */ + public void map(Text key, + LongWritable value, + OutputCollector<Text, Text> output, + Reporter reporter) throws IOException { + Configuration conf = filesystem.getConf(); + + numberOfFiles = conf.getLong("test.nnbench.numberoffiles", 1l); + blkSize = conf.getLong("test.nnbench.blocksize", 1l); + replFactor = (short) (conf.getInt("test.nnbench.replicationfactor", 1)); + bytesToWrite = conf.getInt("test.nnbench.bytestowrite", 0); + baseDir = conf.get("test.nnbench.basedir"); + dataDirName = conf.get("test.nnbench.datadir.name"); + op = conf.get("test.nnbench.operation"); + readFile = conf.getBoolean("test.nnbench.readFileAfterOpen", false); + + long totalTimeTPmS = 0l; + long startTimeTPmS = 0l; + long endTimeTPms = 0l; + + numOfExceptions = 0; + startTimeAL = 0l; + totalTimeAL1 = 0l; + totalTimeAL2 = 0l; + successfulFileOps = 0l; + + if (barrier()) { + if (op.equals(OP_CREATE_WRITE)) { + startTimeTPmS = System.currentTimeMillis(); + doCreateWriteOp("file_" + hostName + "_", output, reporter); + } else if (op.equals(OP_OPEN_READ)) { + startTimeTPmS = System.currentTimeMillis(); + doOpenReadOp("file_" + hostName + "_", output, reporter); + } else if (op.equals(OP_RENAME)) { + startTimeTPmS = System.currentTimeMillis(); + doRenameOp("file_" + hostName + "_", output, reporter); + } else if (op.equals(OP_DELETE)) { + startTimeTPmS = System.currentTimeMillis(); + doDeleteOp("file_" + hostName + "_", output, reporter); + } + + endTimeTPms = System.currentTimeMillis(); + totalTimeTPmS = endTimeTPms - startTimeTPmS; } else { - System.out.println(usage); - System.exit(-1); + output.collect(new Text("l:latemaps"), new Text("1")); } + + // collect after the map end time is measured + output.collect(new Text("l:totalTimeAL1"), + new Text(String.valueOf(totalTimeAL1))); + output.collect(new Text("l:totalTimeAL2"), + new Text(String.valueOf(totalTimeAL2))); + output.collect(new Text("l:numOfExceptions"), + new Text(String.valueOf(numOfExceptions))); + output.collect(new Text("l:successfulFileOps"), + new Text(String.valueOf(successfulFileOps))); + output.collect(new Text("l:totalTimeTPmS"), + new Text(String.valueOf(totalTimeTPmS))); + output.collect(new Text("min:mapStartTimeTPmS"), + new Text(String.valueOf(startTimeTPmS))); + output.collect(new Text("max:mapEndTimeTPmS"), + new Text(String.valueOf(endTimeTPms))); } - bytesPerFile = bytesPerBlock * blocksPerFile; - JobConf jobConf = new JobConf(new Configuration(), NNBench.class); + /** + * Create and Write operation. + */ + private void doCreateWriteOp(String name, + OutputCollector<Text, Text> output, + Reporter reporter) { + FSDataOutputStream out = null; + byte[] buffer = new byte[bytesToWrite]; + + for (long l = 0l; l < numberOfFiles; l++) { + Path filePath = new Path(new Path(baseDir, dataDirName), + name + "_" + l); + + boolean successfulOp = false; + while (! 
successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) { + try { + // Set up timer for measuring AL (transaction #1) + startTimeAL = System.currentTimeMillis(); + // Create the file + // Use a buffer size of 512 + out = filesystem.create(filePath, + true, + 512, + replFactor, + blkSize); + out.write(buffer); + totalTimeAL1 += (System.currentTimeMillis() - startTimeAL); + + // Close the file / file output stream + // Set up timers for measuring AL (transaction #2) + startTimeAL = System.currentTimeMillis(); + out.close(); + + totalTimeAL2 += (System.currentTimeMillis() - startTimeAL); + successfulOp = true; + successfulFileOps ++; + } catch (IOException e) { + TaskTracker.LOG.info("Exception recorded in op: " + + "Create/Write/Close"); + + numOfExceptions++; + } + } + } + } - if ( bytesPerChecksum < 0 ) { // if it is not set in cmdline - bytesPerChecksum = jobConf.getInt("io.bytes.per.checksum", 512); + /** + * Open operation + */ + private void doOpenReadOp(String name, + OutputCollector<Text, Text> output, + Reporter reporter) { + FSDataInputStream input = null; + byte[] buffer = new byte[bytesToWrite]; + + for (long l = 0l; l < numberOfFiles; l++) { + Path filePath = new Path(new Path(baseDir, dataDirName), + name + "_" + l); + + boolean successfulOp = false; + while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) { + try { + // Set up timer for measuring AL + startTimeAL = System.currentTimeMillis(); + input = filesystem.open(filePath); + totalTimeAL1 += (System.currentTimeMillis() - startTimeAL); + + // If the file needs to be read (specified at command line) + if (readFile) { + startTimeAL = System.currentTimeMillis(); + input.readFully(buffer); + + totalTimeAL2 += (System.currentTimeMillis() - startTimeAL); + } + input.close(); + successfulOp = true; + successfulFileOps ++; + } catch (IOException e) { + TaskTracker.LOG.info("Exception recorded in op: OpenRead " + e); + numOfExceptions++; + } + } + } } - jobConf.set("io.bytes.per.checksum", Integer.toString(bytesPerChecksum)); - System.out.println("Inputs: "); - System.out.println(" operation: " + operation); - System.out.println(" baseDir: " + baseDir); - System.out.println(" startTime: " + startTime); - System.out.println(" numFiles: " + numFiles); - System.out.println(" blocksPerFile: " + blocksPerFile); - System.out.println(" bytesPerBlock: " + bytesPerBlock); - System.out.println(" bytesPerChecksum: " + bytesPerChecksum); - - if (operation == null || // verify args - baseDir == null || - numFiles < 1 || - blocksPerFile < 1 || - bytesPerBlock < 0 || - bytesPerBlock % bytesPerChecksum != 0) - { - System.err.println(usage); - System.exit(-1); + /** + * Rename operation + */ + private void doRenameOp(String name, + OutputCollector<Text, Text> output, + Reporter reporter) { + for (long l = 0l; l < numberOfFiles; l++) { + Path filePath = new Path(new Path(baseDir, dataDirName), + name + "_" + l); + Path filePathR = new Path(new Path(baseDir, dataDirName), + name + "_r_" + l); + + boolean successfulOp = false; + while (! 
successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) { + try { + // Set up timer for measuring AL + startTimeAL = System.currentTimeMillis(); + filesystem.rename(filePath, filePathR); + totalTimeAL1 += (System.currentTimeMillis() - startTimeAL); + + successfulOp = true; + successfulFileOps ++; + } catch (IOException e) { + TaskTracker.LOG.info("Exception recorded in op: Rename"); + + numOfExceptions++; + } + } } + } - fileSys = FileSystem.get(jobConf); - uniqueId = java.net.InetAddress.getLocalHost().getHostName(); - taskDir = new Path(baseDir, uniqueId); - // initialize buffer used for writing/reading file - buffer = new byte[(int) Math.min(bytesPerFile, 32768L)]; - - Date execTime; - Date endTime; - long duration; - int exceptions = 0; - barrier(); // wait for coordinated start time - execTime = new Date(); - System.out.println("Job started: " + startTime); - if (operation.equals("createWrite")) { - if (!fileSys.mkdirs(taskDir)) { - throw new IOException("Mkdirs failed to create " + taskDir.toString()); - } - exceptions = createWrite(); - } else if (operation.equals("openRead")) { - exceptions = openRead(); - } else if (operation.equals("rename")) { - exceptions = rename(); - } else if (operation.equals("delete")) { - exceptions = delete(); - } else { - System.err.println(usage); - System.exit(-1); - } - endTime = new Date(); - System.out.println("Job ended: " + endTime); - duration = (endTime.getTime() - execTime.getTime()) /1000; - System.out.println("The " + operation + " job took " + duration + " seconds."); - System.out.println("The job recorded " + exceptions + " exceptions."); + /** + * Delete operation + */ + private void doDeleteOp(String name, + OutputCollector<Text, Text> output, + Reporter reporter) { + for (long l = 0l; l < numberOfFiles; l++) { + Path filePath = new Path(new Path(baseDir, dataDirName), + name + "_" + l); + + boolean successfulOp = false; + while (! 
successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
+          try {
+            // Set up timer for measuring AL
+            startTimeAL = System.currentTimeMillis();
+            filesystem.delete(filePath);
+            totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
+
+            successfulOp = true;
+            successfulFileOps ++;
+          } catch (IOException e) {
+            TaskTracker.LOG.info("Exception recorded in op: Delete");
+
+            numOfExceptions++;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reducer class
+   */
+  static class NNBenchReducer extends MapReduceBase
+      implements Reducer<Text, Text, Text, Text> {
+
+    protected String hostName;
+
+    public NNBenchReducer() {
+      TaskTracker.LOG.info("Starting NNBenchReducer !!!");
+      try {
+        hostName = java.net.InetAddress.getLocalHost().getHostName();
+      } catch(Exception e) {
+        hostName = "localhost";
+      }
+      TaskTracker.LOG.info("Starting NNBenchReducer on " + hostName);
+    }
+
+    /**
+     * Reduce method
+     */
+    public void reduce(Text key,
+                       Iterator<Text> values,
+                       OutputCollector<Text, Text> output,
+                       Reporter reporter
+                       ) throws IOException {
+      String field = key.toString();
+
+      reporter.setStatus("starting " + field + " ::host = " + hostName);
+
+      // sum long values
+      if (field.startsWith("l:")) {
+        long lSum = 0;
+        while (values.hasNext()) {
+          lSum += Long.parseLong(values.next().toString());
+        }
+        output.collect(key, new Text(String.valueOf(lSum)));
+      }
+
+      if (field.startsWith("min:")) {
+        long minVal = -1;
+        while (values.hasNext()) {
+          long value = Long.parseLong(values.next().toString());
+
+          if (minVal == -1) {
+            minVal = value;
+          } else {
+            // zero start times come from maps that missed the barrier;
+            // ignore them so they do not skew the minimum
+            if (value != 0 && value < minVal) {
+              minVal = value;
+            }
+          }
+        }
+        output.collect(key, new Text(String.valueOf(minVal)));
+      }
+
+      if (field.startsWith("max:")) {
+        long maxVal = -1;
+        while (values.hasNext()) {
+          long value = Long.parseLong(values.next().toString());
+
+          if (maxVal == -1) {
+            maxVal = value;
+          } else {
+            if (value > maxVal) {
+              maxVal = value;
+            }
+          }
+        }
+        output.collect(key, new Text(String.valueOf(maxVal)));
+      }
+
+      reporter.setStatus("finished " + field + " ::host = " + hostName);
+    }
+  }
 }
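For readers working back from the reported numbers, a small self-contained sketch of the arithmetic that analyzeResults() applies to the reducer output above. The formulas mirror the patch (TPS computed over the span from the earliest map start to the latest map end; average latency and average execution time over successful operations; create_write counted as two transactions); all input values here are invented for illustration.

    // Hypothetical worked example of the NNBench result math.
    public class NNBenchMath {
      public static void main(String[] args) {
        long successfulFileOps = 10000;  // summed "l:successfulFileOps"
        long totalTimeAL1      = 250000; // summed "l:totalTimeAL1" (ms)
        long totalTimeTPmS     = 300000; // summed "l:totalTimeTPmS" (ms)
        long mapStartTimeTPmS  = 1000;   // reduced "min:mapStartTimeTPmS"
        long mapEndTimeTPmS    = 61000;  // reduced "max:mapEndTimeTPmS"

        // Cluster TPS over the longest-running span of the job
        double longestMapTimeTPmS = mapEndTimeTPmS - mapStartTimeTPmS; // 60000 ms
        double totalTimeTPS = (1000.0 * successfulFileOps) / longestMapTimeTPmS;

        // Average latency per successful operation
        double avgLatency1 = (double) totalTimeAL1 / successfulFileOps; // 25 ms

        // Average execution time per successful operation
        double avgExecTime = (double) totalTimeTPmS / successfulFileOps; // 30 ms

        // create_write reports two transactions (create and close) per file
        System.out.println("TPS: Create/Write/Close: " + (int) (totalTimeTPS * 2));
        System.out.println("Avg exec time (ms): " + avgExecTime);
        System.out.println("Avg Lat (ms): Create/Write: " + avgLatency1);
      }
    }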