Author: cutting
Date: Thu Dec 7 10:12:19 2006
New Revision: 483588

URL: http://svn.apache.org/viewvc?view=rev&rev=483588
Log:
HADOOP-763. Change DFS namenode benchmark to not use MapReduce. Contributed by Nigel.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec 7 10:12:19 2006
@@ -8,6 +8,9 @@
  2. HADOOP-779. Fix contrib/streaming to work correctly with gzipped
     input files. (Hairong Kuang via cutting)
 
+ 3. HADOOP-763. Change DFS namenode benchmark to not use MapReduce.
+    (Nigel Daley via cutting)
+
 
 Release 0.9.0 - 2006-12-01
 

Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java?view=diff&rev=483588&r1=483587&r2=483588
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/NNBench.java Thu Dec 7 10:12:19 2006
@@ -20,206 +20,269 @@
 import java.io.IOException;
 import java.util.Date;
-import java.util.Iterator;
-import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.util.Progressable;
 
 /**
- * This program uses map/reduce to run a distributed job where there is
- * no interaction between the tasks. Each task creates a configurable
- * number of files. Each file has a configurable number of bytes
- * written to it, then it is closed, re-opened, and read from, and
- * re-closed. This program functions as a stress-test and benchmark
- * for namenode, especially when the number of bytes written to
- * each file is small.
+ * This program executes a specified operation that applies load to
+ * the NameNode. Possible operations include create/writing files,
+ * opening/reading files, renaming files, and deleting files.
  *
- * @author Milind Bhandarkar
+ * When run simultaneously on multiple nodes, this program functions
+ * as a stress-test and benchmark for namenode, especially when
+ * the number of bytes written to each file is small.
+ *
+ * @author Nigel Daley
  */
-public class NNBench extends MapReduceBase implements Reducer {
-
-  public static class Map extends MapReduceBase implements Mapper {
-    private FileSystem fileSys = null;
-    private int numBytesToWrite;
-    private Random random = new Random();
-    private String taskId = null;
-    private Path topDir = null;
-
-    private void randomizeBytes(byte[] data, int offset, int length) {
-      for(int i=offset + length - 1; i >= offset; --i) {
-        data[i] = (byte) random.nextInt(256);
+public class NNBench {
+  // variables initialized from command line arguments
+  private static long startTime = 0;
+  private static int numFiles = 0;
+  private static long bytesPerBlock = 1;
+  private static long blocksPerFile = 0;
+  private static long bytesPerFile = 1;
+  private static Path baseDir = null;
+
+  // variables initialized in main()
+  private static FileSystem fileSys = null;
+  private static Path taskDir = null;
+  private static String uniqueId = null;
+  private static byte[] buffer;
+
+  /**
+   * Returns when the current number of seconds from the epoch equals
+   * the command line argument given by <code>-startTime</code>.
+   * This allows multiple instances of this program, running on clock
+   * synchronized nodes, to start at roughly the same time.
+   */
+  static void barrier() {
+    long sleepTime;
+    while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException ex) {
+      }
     }
   }
   /**
-   * Given a number of files to create, create and open those files
-   * for both writing and reading a given number of bytes.
+   * Create and write to a given number of files. Repeat each remote
+   * operation until it succeeds (does not throw an exception).
+   *
+   * @return the number of exceptions caught
    */
-  public void map(WritableComparable key,
-                  Writable value,
-                  OutputCollector output,
-                  Reporter reporter) throws IOException {
-    int nFiles = ((IntWritable) value).get();
-    Path taskDir = new Path(topDir, taskId);
-    if (!fileSys.mkdirs(taskDir)) {
-      throw new IOException("Mkdirs failed to create " + taskDir.toString());
-    }
-    byte[] buffer = new byte[32768];
-    for (int index = 0; index < nFiles; index++) {
-      FSDataOutputStream out = fileSys.create(
-        new Path(taskDir, Integer.toString(index)));
-      int toBeWritten = numBytesToWrite;
+  static int createWrite() {
+    int exceptions = 0;
+    FSOutputStream out = null;
+    boolean success = false;
+    for (int index = 0; index < numFiles; index++) {
+      do { // create file until it succeeds
+        try {
+          out = fileSys.createRaw(
+            new Path(taskDir, "" + index), false, (short)1, bytesPerBlock);
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
+      long toBeWritten = bytesPerFile;
       while (toBeWritten > 0) {
-        int nbytes = Math.min(buffer.length, toBeWritten);
-        randomizeBytes(buffer, 0, nbytes);
+        int nbytes = (int) Math.min(buffer.length, toBeWritten);
         toBeWritten -= nbytes;
-        out.write(buffer, 0, nbytes);
-        reporter.setStatus("wrote " + (numBytesToWrite-toBeWritten) +
-                           " bytes for "+ index +"th file.");
+        try { // only try once
+          out.write(buffer, 0, nbytes);
+        } catch (IOException ioe) {
+          exceptions++;
+        }
       }
-      out.close();
+      do { // close file until it succeeds
+        try {
+          out.close();
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
     }
-    for (int index = 0; index < nFiles; index++) {
-      FSDataInputStream in = fileSys.open(
-        new Path(taskDir, Integer.toString(index)));
-      int toBeRead = numBytesToWrite;
-      while (toBeRead > 0) {
-        int nbytes = Math.min(buffer.length, toBeRead);
-        toBeRead -= nbytes;
-        in.read(buffer, 0, nbytes);
-        reporter.setStatus("read " + (numBytesToWrite-toBeRead) +
-                           " bytes for "+ index +"th file.");
+    return exceptions;
+  }
+
+  /**
+   * Open and read a given number of files.
+   *
+   * @return the number of exceptions caught
+   */
+  static int openRead() {
+    int exceptions = 0;
+    FSInputStream in = null;
+    for (int index = 0; index < numFiles; index++) {
+      try {
+        in = fileSys.openRaw(new Path(taskDir, "" + index));
+        long toBeRead = bytesPerFile;
+        while (toBeRead > 0) {
+          int nbytes = (int) Math.min(buffer.length, toBeRead);
+          toBeRead -= nbytes;
+          try { // only try once
+            in.read(buffer, 0, nbytes);
+          } catch (IOException ioe) {
+            exceptions++;
+          }
+        }
+        in.close();
+      } catch (IOException ioe) {
+        exceptions++;
       }
-      in.close();
     }
-    fileSys.delete(taskDir); // clean up after yourself
-  }
+    return exceptions;
+  }
+
+  /**
+   * Rename a given number of files. Repeat each remote
+   * operation until it succeeds (does not throw an exception).
+   *
+   * @return the number of exceptions caught
+   */
+  static int rename() {
+    int exceptions = 0;
+    boolean success = false;
+    for (int index = 0; index < numFiles; index++) {
+      do { // rename file until it succeeds
+        try {
+          boolean result = fileSys.renameRaw(
+            new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
+    }
+    return exceptions;
+  }
   /**
-   * Save the values out of the configuaration that we need to write
-   * the data.
+   * Delete a given number of files. Repeat each remote
+   * operation until it succeeds (does not throw an exception).
+   *
+   * @return the number of exceptions caught
    */
-  public void configure(JobConf job) {
-    try {
-      fileSys = FileSystem.get(job);
-    } catch (IOException e) {
-      throw new RuntimeException("Can't get default file system", e);
-    }
-    numBytesToWrite = job.getInt("test.nnbench.bytes_per_file", 0);
-    topDir = new Path(job.get("test.nnbench.topdir", "/nnbench"));
-    taskId = job.get("mapred.task.id", (new Long(random.nextLong())).toString());
-  }
-
-  }
-
-  public void reduce(WritableComparable key,
-                     Iterator values,
-                     OutputCollector output,
-                     Reporter reporter) throws IOException {
-    // nothing
-  }
-
+  static int delete() {
+    int exceptions = 0;
+    boolean success = false;
+    for (int index = 0; index < numFiles; index++) {
+      do { // delete file until it succeeds
+        try {
+          boolean result = fileSys.deleteRaw(new Path(taskDir, "A" + index));
+          success = true;
+        } catch (IOException ioe) { success=false; exceptions++; }
+      } while (!success);
    }
+    return exceptions;
+  }
+
   /**
-   * This is the main routine for launching a distributed namenode stress test.
-   * It runs 10 maps/node. The reduce doesn't do anything.
-   *
-   * @throws IOException
+   * This launches a given namenode operation (<code>-operation</code>),
+   * starting at a given time (<code>-startTime</code>). The files used
+   * by the openRead, rename, and delete operations are the same files
+   * created by the createWrite operation. Typically, the program
+   * would be run four times, once for each operation in this order:
+   * createWrite, openRead, rename, delete.
+   *
+   * <pre>
+   * Usage: nnbench
+   *   -operation <one of createWrite, openRead, rename, or delete>
+   *   -baseDir <base output/input DFS path>
+   *   -startTime <time to start, given in seconds from the epoch>
+   *   -numFiles <number of files to create, read, rename, or delete>
+   *   -blocksPerFile <number of blocks to create per file>
+   *   [-bytesPerBlock <number of bytes to write to each block, default is 1>]
+   * </pre>
+   *
+   * @throws IOException indicates a problem with test startup
    */
   public static void main(String[] args) throws IOException {
-    Configuration defaults = new Configuration();
-    if (args.length != 3) {
-      System.out.println("Usage: nnbench <out-dir> <filesPerMap> <bytesPerFile>");
-      return;
-    }
-    Path outDir = new Path(args[0]);
-    int filesPerMap = Integer.parseInt(args[1]);
-    int numBytesPerFile = Integer.parseInt(args[2]);
-
-    JobConf jobConf = new JobConf(defaults, NNBench.class);
-    jobConf.setJobName("nnbench");
-    jobConf.setInt("test.nnbench.bytes_per_file", numBytesPerFile);
-    jobConf.set("test.nnbench.topdir", args[0]);
-
-    // turn off speculative execution, because DFS doesn't handle
-    // multiple writers to the same file.
-    jobConf.setSpeculativeExecution(false);
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-
-    jobConf.setMapperClass(Map.class);
-    jobConf.setReducerClass(NNBench.class);
-
-    JobClient client = new JobClient(jobConf);
-    ClusterStatus cluster = client.getClusterStatus();
-    int numMaps = cluster.getTaskTrackers() *
-      jobConf.getInt("test.nnbench.maps_per_host", 10);
-    jobConf.setNumMapTasks(numMaps);
-    System.out.println("Running " + numMaps + " maps.");
-    jobConf.setNumReduceTasks(1);
-
-    Path tmpDir = new Path("random-work");
-    Path inDir = new Path(tmpDir, "in");
-    Path fakeOutDir = new Path(tmpDir, "out");
-    FileSystem fileSys = FileSystem.get(jobConf);
-    if (fileSys.exists(outDir)) {
-      System.out.println("Error: Output directory " + outDir +
-                         " already exists.");
-      return;
-    }
-    fileSys.delete(tmpDir);
-    if (!fileSys.mkdirs(inDir)) {
-      System.out.println("Error: Mkdirs failed to create " +
-                         inDir.toString());
-      return;
-    }
-
-    for(int i=0; i < numMaps; ++i) {
-      Path file = new Path(inDir, "part"+i);
-      SequenceFile.Writer writer = SequenceFile.createWriter(fileSys,
-        jobConf, file,
-        IntWritable.class, IntWritable.class,
-        CompressionType.NONE,
-        (Progressable)null);
-      writer.append(new IntWritable(0), new IntWritable(filesPerMap));
-      writer.close();
+    String version = "NameNodeBenchmark.0.3";
+    System.out.println(version);
+
+    String usage =
+      "Usage: nnbench " +
+        "-operation <one of createWrite, openRead, rename, or delete> " +
+        "-baseDir <base output/input DFS path> " +
+        "-startTime <time to start, given in seconds from the epoch> " +
+        "-numFiles <number of files to create> " +
+        "-blocksPerFile <number of blocks to create per file> " +
+        "[-bytesPerBlock <number of bytes to write to each block, default is 1>]";
+
+    String operation = null;
+    for (int i = 0; i < args.length; i++) { // parse command line
+      if (args[i].equals("-baseDir")) {
+        baseDir = new Path(args[++i]);
+      } else if (args[i].equals("-numFiles")) {
+        numFiles = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-blocksPerFile")) {
+        blocksPerFile = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-bytesPerBlock")) {
+        bytesPerBlock = Long.parseLong(args[++i]);
+      } else if (args[i].equals("-startTime")) {
+        startTime = Long.parseLong(args[++i]) * 1000;
+      } else if (args[i].equals("-operation")) {
+        operation = args[++i];
+      } else {
+        System.out.println(usage);
+        System.exit(-1);
+      }
     }
-    jobConf.setInputPath(inDir);
-    jobConf.setOutputPath(fakeOutDir);
+    bytesPerFile = bytesPerBlock * blocksPerFile;
-    // Uncomment to run locally in a single process
-    //job_conf.set("mapred.job.tracker", "local");
+    System.out.println("Inputs: ");
+    System.out.println("  operation: " + operation);
+    System.out.println("  baseDir: " + baseDir);
+    System.out.println("  startTime: " + startTime);
+    System.out.println("  numFiles: " + numFiles);
+    System.out.println("  blocksPerFile: " + blocksPerFile);
+    System.out.println("  bytesPerBlock: " + bytesPerBlock);
+
+    if (operation == null ||  // verify args
+        baseDir == null ||
+        numFiles < 1 ||
+        blocksPerFile < 1 ||
+        bytesPerBlock < 0)
+    {
+      System.err.println(usage);
+      System.exit(-1);
+    }
-    Date startTime = new Date();
+    JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
+    fileSys = FileSystem.get(jobConf);
+    uniqueId = java.net.InetAddress.getLocalHost().getHostName();
+    taskDir = new Path(baseDir, uniqueId);
+    // initialize buffer used for writing/reading file
+    buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
+
+    Date execTime;
+    Date endTime;
+    long duration;
+    int exceptions = 0;
+    barrier(); // wait for coordinated start time
+    execTime = new Date();
     System.out.println("Job started: " + startTime);
-    try {
-      JobClient.runJob(jobConf);
-      Date endTime = new Date();
-      System.out.println("Job ended: " + endTime);
-      System.out.println("The job took " +
-                         (endTime.getTime() - startTime.getTime()) /1000 + " seconds.");
-    } finally {
-      fileSys.delete(tmpDir);
+    if (operation.equals("createWrite")) {
+      if (!fileSys.mkdirs(taskDir)) {
+        throw new IOException("Mkdirs failed to create " + taskDir.toString());
+      }
+      exceptions = createWrite();
+    } else if (operation.equals("openRead")) {
+      exceptions = openRead();
+    } else if (operation.equals("rename")) {
+      exceptions = rename();
+    } else if (operation.equals("delete")) {
+      exceptions = delete();
+    } else {
+      System.err.println(usage);
+      System.exit(-1);
     }
+    endTime = new Date();
+    System.out.println("Job ended: " + endTime);
+    duration = (endTime.getTime() - execTime.getTime()) /1000;
+    System.out.println("The " + operation + " job took " + duration + " seconds.");
+    System.out.println("The job recorded " + exceptions + " exceptions.");
   }
 }
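Editor's note: the sketch below is a hypothetical, minimal single-node driver for trying the new benchmark; it is not part of this commit. It computes a start time a few seconds in the future and invokes NNBench.main() with the flags documented in the usage string above. The class name NNBenchSingleNodeExample, the path /benchmarks/NNBench, and the file counts are arbitrary example values; a real run would typically launch the same command on several clock-synchronized nodes, once per operation, in the order createWrite, openRead, rename, delete.

    // Hypothetical example only -- assumes Hadoop and NNBench are on the classpath.
    import org.apache.hadoop.examples.NNBench;

    public class NNBenchSingleNodeExample {
      public static void main(String[] unused) throws Exception {
        // -startTime is given in seconds from the epoch; start 10 seconds from now
        // so that other instances (if any) can reach the barrier first.
        long start = System.currentTimeMillis() / 1000 + 10;
        String[] args = {
          "-operation", "createWrite",        // later runs: openRead, rename, delete
          "-baseDir", "/benchmarks/NNBench",  // example DFS path
          "-startTime", Long.toString(start),
          "-numFiles", "100",
          "-blocksPerFile", "1",
          "-bytesPerBlock", "1"               // optional; defaults to 1
        };
        NNBench.main(args);
      }
    }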