Author: shv Date: Wed Jan 16 17:11:51 2008 New Revision: 612670 URL: http://svn.apache.org/viewvc?rev=612670&view=rev Log: HADOOP-2449. A return of the non-MR version of NNBench. Contributed by Sanjay Radia.
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBenchWithoutMR.java Modified: lucene/hadoop/trunk/CHANGES.txt Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=612670&r1=612669&r2=612670&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Jan 16 17:11:51 2008 @@ -245,6 +245,9 @@ Add support for accessing instrumentation statistics via JMX. (Sanjay radia via dhruba) + HADOOP-2449. A return of the non-MR version of NNBench. + (Sanjay Radia via shv) + OPTIMIZATIONS HADOOP-1898. Release the lock protecting the last time of the last stack Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBenchWithoutMR.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBenchWithoutMR.java?rev=612670&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBenchWithoutMR.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNBenchWithoutMR.java Wed Jan 16 17:11:51 2008 @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.dfs; + +import java.io.IOException; +import java.util.Date; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.StringUtils; + +/** + * This program executes a specified operation that applies load to + * the NameNode. Possible operations include create/writing files, + * opening/reading files, renaming files, and deleting files. + * + * When run simultaneously on multiple nodes, this program functions + * as a stress-test and benchmark for namenode, especially when + * the number of bytes written to each file is small. + * + * This version does not use the map reduce framework + * + */ +public class NNBenchWithoutMR { + + private static final Log LOG = LogFactory.getLog( + "org.apache.hadoop.dfs.NNBench"); + + // variable initialzed from command line arguments + private static long startTime = 0; + private static int numFiles = 0; + private static long bytesPerBlock = 1; + private static long blocksPerFile = 0; + private static long bytesPerFile = 1; + private static Path baseDir = null; + + // variables initialized in main() + private static FileSystem fileSys = null; + private static Path taskDir = null; + private static String uniqueId = null; + private static byte[] buffer; + private static long maxExceptionsPerFile = 200; + + /** + * Returns when the current number of seconds from the epoch equals + * the command line argument given by <code>-startTime</code>. + * This allows multiple instances of this program, running on clock + * synchronized nodes, to start at roughly the same time. + */ + static void barrier() { + long sleepTime; + while ((sleepTime = startTime - System.currentTimeMillis()) > 0) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException ex) { + } + } + } + + static private void handleException(String operation, Throwable e, + int singleFileExceptions) { + LOG.warn("Exception while " + operation + ": " + + StringUtils.stringifyException(e)); + if (singleFileExceptions >= maxExceptionsPerFile) { + throw new RuntimeException(singleFileExceptions + + " exceptions for a single file exceeds threshold. Aborting"); + } + } + + /** + * Create and write to a given number of files. Repeat each remote + * operation until is suceeds (does not throw an exception). + * + * @return the number of exceptions caught + */ + static int createWrite() { + int totalExceptions = 0; + FSDataOutputStream out = null; + boolean success = false; + for (int index = 0; index < numFiles; index++) { + int singleFileExceptions = 0; + do { // create file until is succeeds or max exceptions reached + try { + out = fileSys.create( + new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock); + success = true; + } catch (IOException ioe) { + success=false; + totalExceptions++; + handleException("creating file #" + index, ioe, ++singleFileExceptions); + } + } while (!success); + long toBeWritten = bytesPerFile; + while (toBeWritten > 0) { + int nbytes = (int) Math.min(buffer.length, toBeWritten); + toBeWritten -= nbytes; + try { // only try once + out.write(buffer, 0, nbytes); + } catch (IOException ioe) { + totalExceptions++; + handleException("writing to file #" + index, ioe, ++singleFileExceptions); + } + } + do { // close file until is succeeds + try { + out.close(); + success = true; + } catch (IOException ioe) { + success=false; + totalExceptions++; + handleException("closing file #" + index, ioe, ++singleFileExceptions); + } + } while (!success); + } + return totalExceptions; + } + + /** + * Open and read a given number of files. + * + * @return the number of exceptions caught + */ + static int openRead() { + int totalExceptions = 0; + FSDataInputStream in = null; + for (int index = 0; index < numFiles; index++) { + int singleFileExceptions = 0; + try { + in = fileSys.open(new Path(taskDir, "" + index), 512); + long toBeRead = bytesPerFile; + while (toBeRead > 0) { + int nbytes = (int) Math.min(buffer.length, toBeRead); + toBeRead -= nbytes; + try { // only try once + in.read(buffer, 0, nbytes); + } catch (IOException ioe) { + totalExceptions++; + handleException("reading from file #" + index, ioe, ++singleFileExceptions); + } + } + in.close(); + } catch (IOException ioe) { + totalExceptions++; + handleException("opening file #" + index, ioe, ++singleFileExceptions); + } + } + return totalExceptions; + } + + /** + * Rename a given number of files. Repeat each remote + * operation until is suceeds (does not throw an exception). + * + * @return the number of exceptions caught + */ + static int rename() { + int totalExceptions = 0; + boolean success = false; + for (int index = 0; index < numFiles; index++) { + int singleFileExceptions = 0; + do { // rename file until is succeeds + try { + boolean result = fileSys.rename( + new Path(taskDir, "" + index), new Path(taskDir, "A" + index)); + success = true; + } catch (IOException ioe) { + success=false; + totalExceptions++; + handleException("creating file #" + index, ioe, ++singleFileExceptions); + } + } while (!success); + } + return totalExceptions; + } + + /** + * Delete a given number of files. Repeat each remote + * operation until is suceeds (does not throw an exception). + * + * @return the number of exceptions caught + */ + static int delete() { + int totalExceptions = 0; + boolean success = false; + for (int index = 0; index < numFiles; index++) { + int singleFileExceptions = 0; + do { // delete file until is succeeds + try { + boolean result = fileSys.delete(new Path(taskDir, "A" + index)); + success = true; + } catch (IOException ioe) { + success=false; + totalExceptions++; + handleException("creating file #" + index, ioe, ++singleFileExceptions); + } + } while (!success); + } + return totalExceptions; + } + + /** + * This launches a given namenode operation (<code>-operation</code>), + * starting at a given time (<code>-startTime</code>). The files used + * by the openRead, rename, and delete operations are the same files + * created by the createWrite operation. Typically, the program + * would be run four times, once for each operation in this order: + * createWrite, openRead, rename, delete. + * + * <pre> + * Usage: nnbench + * -operation <one of createWrite, openRead, rename, or delete> + * -baseDir <base output/input DFS path> + * -startTime <time to start, given in seconds from the epoch> + * -numFiles <number of files to create, read, rename, or delete> + * -blocksPerFile <number of blocks to create per file> + * [-bytesPerBlock <number of bytes to write to each block, default is 1>] + * [-bytesPerChecksum <value for io.bytes.per.checksum>] + * </pre> + * + * @throws IOException indicates a problem with test startup + */ + public static void main(String[] args) throws IOException { + String version = "NameNodeBenchmark.0.3"; + System.out.println(version); + int bytesPerChecksum = -1; + + String usage = + "Usage: nnbench " + + " -operation <one of createWrite, openRead, rename, or delete> " + + " -baseDir <base output/input DFS path> " + + " -startTime <time to start, given in seconds from the epoch> " + + " -numFiles <number of files to create> " + + " -blocksPerFile <number of blocks to create per file> " + + " [-bytesPerBlock <number of bytes to write to each block, default is 1>] " + + " [-bytesPerChecksum <value for io.bytes.per.checksum>]" + + "Note: bytesPerBlock MUST be a multiple of bytesPerChecksum"; + + String operation = null; + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].equals("-baseDir")) { + baseDir = new Path(args[++i]); + } else if (args[i].equals("-numFiles")) { + numFiles = Integer.parseInt(args[++i]); + } else if (args[i].equals("-blocksPerFile")) { + blocksPerFile = Integer.parseInt(args[++i]); + } else if (args[i].equals("-bytesPerBlock")) { + bytesPerBlock = Long.parseLong(args[++i]); + } else if (args[i].equals("-bytesPerChecksum")) { + bytesPerChecksum = Integer.parseInt(args[++i]); + } else if (args[i].equals("-startTime")) { + startTime = Long.parseLong(args[++i]) * 1000; + } else if (args[i].equals("-operation")) { + operation = args[++i]; + } else { + System.out.println(usage); + System.exit(-1); + } + } + bytesPerFile = bytesPerBlock * blocksPerFile; + + JobConf jobConf = new JobConf(new Configuration(), NNBench.class); + + if ( bytesPerChecksum < 0 ) { // if it is not set in cmdline + bytesPerChecksum = jobConf.getInt("io.bytes.per.checksum", 512); + } + jobConf.set("io.bytes.per.checksum", Integer.toString(bytesPerChecksum)); + + System.out.println("Inputs: "); + System.out.println(" operation: " + operation); + System.out.println(" baseDir: " + baseDir); + System.out.println(" startTime: " + startTime); + System.out.println(" numFiles: " + numFiles); + System.out.println(" blocksPerFile: " + blocksPerFile); + System.out.println(" bytesPerBlock: " + bytesPerBlock); + System.out.println(" bytesPerChecksum: " + bytesPerChecksum); + + if (operation == null || // verify args + baseDir == null || + numFiles < 1 || + blocksPerFile < 1 || + bytesPerBlock < 0 || + bytesPerBlock % bytesPerChecksum != 0) + { + System.err.println(usage); + System.exit(-1); + } + + fileSys = FileSystem.get(jobConf); + uniqueId = java.net.InetAddress.getLocalHost().getHostName(); + taskDir = new Path(baseDir, uniqueId); + // initialize buffer used for writing/reading file + buffer = new byte[(int) Math.min(bytesPerFile, 32768L)]; + + Date execTime; + Date endTime; + long duration; + int exceptions = 0; + barrier(); // wait for coordinated start time + execTime = new Date(); + System.out.println("Job started: " + startTime); + if (operation.equals("createWrite")) { + if (!fileSys.mkdirs(taskDir)) { + throw new IOException("Mkdirs failed to create " + taskDir.toString()); + } + exceptions = createWrite(); + } else if (operation.equals("openRead")) { + exceptions = openRead(); + } else if (operation.equals("rename")) { + exceptions = rename(); + } else if (operation.equals("delete")) { + exceptions = delete(); + } else { + System.err.println(usage); + System.exit(-1); + } + endTime = new Date(); + System.out.println("Job ended: " + endTime); + duration = (endTime.getTime() - execTime.getTime()) /1000; + System.out.println("The " + operation + " job took " + duration + " seconds."); + System.out.println("The job recorded " + exceptions + " exceptions."); + } +}