Added: lucene/hadoop/trunk/src/c++/libhdfs/hdfs_write.c
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/libhdfs/hdfs_write.c?rev=410637&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/c++/libhdfs/hdfs_write.c (added)
+++ lucene/hadoop/trunk/src/c++/libhdfs/hdfs_write.c Wed May 31 12:08:38 2006
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+
+int main(int argc, char **argv) {
+
+    if (argc != 4) {
+        fprintf(stderr, "Usage: hdfs_write <filename> <filesize> <buffersize>\n");
+        exit(-1);
+    }
+
+    hdfsFS fs = hdfsConnect("default", 0);
+    if (!fs) {
+        fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
+        exit(-1);
+    }
+
+    const char* writeFileName = argv[1];
+    tSize fileTotalSize = strtoul(argv[2], NULL, 10);
+    tSize bufferSize = strtoul(argv[3], NULL, 10);
+
+    hdfsFile writeFile = hdfsOpenFile(fs, writeFileName, O_WRONLY, bufferSize, 0, 0);
+    if (!writeFile) {
+        fprintf(stderr, "Failed to open %s for writing!\n", writeFileName);
+        exit(-2);
+    }
+
+    // data to be written to the file
+    char* buffer = malloc(sizeof(char) * bufferSize);
+    if (buffer == NULL) {
+        fprintf(stderr, "Could not allocate buffer of size %d\n", bufferSize);
+        return -2;
+    }
+    int i = 0;
+    for (i = 0; i < bufferSize; ++i) {
+        buffer[i] = 'a' + (i % 26);
+    }
+
+    // write to the file, checking that each buffer is written in full
+    tSize nrRemaining;
+    for (nrRemaining = fileTotalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
+        int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining;
+        if (hdfsWrite(fs, writeFile, (void*)buffer, curSize) != curSize) {
+            fprintf(stderr, "Failed to write %d bytes to %s!\n", curSize, writeFileName);
+            exit(-3);
+        }
+    }
+
+    free(buffer);
+    hdfsCloseFile(fs, writeFile);
+    hdfsDisconnect(fs);
+
+    return 0;
+}
+
+/**
+ * vim: ts=4: sw=4: et:
+ */
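Note: the TestDFSCIO benchmark added below also execs a companion hdfs_read binary (see HDFS_READ) that is not part of this change. For orientation, here is a minimal sketch of what such a reader could look like, assuming only the libhdfs calls already used by hdfs_write.c plus hdfsRead, and assuming, as hdfs_write.c does for O_WRONLY, that hdfs.h pulls in the open flags; the real hdfs_read.c in the repository may differ.

    /* Hypothetical sketch of the companion reader; hdfs_read.c itself
     * is not included in this revision. */
    #include "hdfs.h"

    #include <stdio.h>
    #include <stdlib.h>

    int main(int argc, char **argv) {

        if (argc != 4) {
            fprintf(stderr, "Usage: hdfs_read <filename> <filesize> <buffersize>\n");
            exit(-1);
        }

        /* connect to the default (configured) namenode, as hdfs_write.c does */
        hdfsFS fs = hdfsConnect("default", 0);
        if (!fs) {
            fprintf(stderr, "Failed to connect to hdfs!\n");
            exit(-1);
        }

        const char* readFileName = argv[1];
        tSize fileTotalSize = strtoul(argv[2], NULL, 10);
        tSize bufferSize = strtoul(argv[3], NULL, 10);

        hdfsFile readFile = hdfsOpenFile(fs, readFileName, O_RDONLY, bufferSize, 0, 0);
        if (!readFile) {
            fprintf(stderr, "Failed to open %s for reading!\n", readFileName);
            exit(-2);
        }

        char* buffer = malloc(sizeof(char) * bufferSize);
        if (buffer == NULL) {
            exit(-3);
        }

        /* read the file back in bufferSize chunks until filesize bytes are consumed */
        tSize nrRemaining;
        for (nrRemaining = fileTotalSize; nrRemaining > 0; ) {
            tSize curSize = (bufferSize < nrRemaining) ? bufferSize : nrRemaining;
            tSize nrRead = hdfsRead(fs, readFile, (void*)buffer, curSize);
            if (nrRead <= 0) {
                fprintf(stderr, "Failed to read %d bytes from %s!\n", curSize, readFileName);
                exit(-4);
            }
            nrRemaining -= nrRead;
        }

        free(buffer);
        hdfsCloseFile(fs, readFile);
        hdfsDisconnect(fs);

        return 0;
    }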
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSCIO.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSCIO.java?rev=410637&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSCIO.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestDFSCIO.java Wed May 31 12:08:38 2006 @@ -0,0 +1,536 @@ + /** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.*; + +import junit.framework.TestCase; +import java.util.logging.*; +import java.util.Date; +import java.util.StringTokenizer; + +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.conf.*; + +/** + * Distributed i/o benchmark. + * <p> + * This test writes into or reads from a specified number of files. + * File size is specified as a parameter to the test. + * Each file is accessed in a separate map task. + * <p> + * The reducer collects the following statistics: + * <ul> + * <li>number of tasks completed</li> + * <li>number of bytes written/read</li> + * <li>execution time</li> + * <li>io rate</li> + * <li>io rate squared</li> + * </ul> + * + * Finally, the following information is appended to a local file + * <ul> + * <li>read or write test</li> + * <li>date and time the test finished</li> + * <li>number of files</li> + * <li>total number of bytes processed</li> + * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li> + * <li>average i/o rate in mb/sec per file</li> + * <li>standard i/o rate deviation</li> + * </ul> + * + * @author Konstantin Shvachko + */ +public class TestDFSCIO extends TestCase { + // Constants + private static final int TEST_TYPE_READ = 0; + private static final int TEST_TYPE_WRITE = 1; + private static final int TEST_TYPE_CLEANUP = 2; + private static final int DEFAULT_BUFFER_SIZE = 1000000; + private static final String BASE_FILE_NAME = "test_io_"; + private static final String DEFAULT_RES_FILE_NAME = "TestDFSCIO_results.log"; + + private static final Logger LOG = InputFormatBase.LOG; + private static Configuration fsConfig = new Configuration(); + private static final long MEGA = 0x100000; + private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSCIO"); + private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); + private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); + private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); + private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); + + private static Path HDFS_TEST_DIR = new Path("/tmp/TestDFSCIO"); + private static String HDFS_LIB_VERSION = System.getProperty("libhdfs.version", "1"); + private static String CHMOD = new String("/usr/bin/chmod"); + private static Path HDFS_SHLIB = new Path(HDFS_TEST_DIR + "/libhdfs.so." 
+ HDFS_LIB_VERSION ); + private static Path HDFS_READ = new Path(HDFS_TEST_DIR + "/hdfs_read"); + private static Path HDFS_WRITE = new Path(HDFS_TEST_DIR + "/hdfs_write"); + + /** + * Run the test with default parameters. + * + * @throws Exception + */ + public void testIOs() throws Exception { + testIOs(10, 10); + } + + /** + * Run the test with the specified parameters. + * + * @param fileSize file size + * @param nrFiles number of files + * @throws IOException + */ + public static void testIOs(int fileSize, int nrFiles) + throws IOException { + + FileSystem fs = FileSystem.get(fsConfig); + + createControlFile(fs, fileSize, nrFiles); + writeTest(fs); + readTest(fs); + } + + private static void createControlFile( + FileSystem fs, + int fileSize, // in MB + int nrFiles + ) throws IOException { + LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); + + fs.delete(CONTROL_DIR); + + for( int i=0; i < nrFiles; i++ ) { + String name = getFileName(i); + Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); + SequenceFile.Writer writer = null; + try { + writer = new SequenceFile.Writer(fs, controlFile, + UTF8.class, LongWritable.class); + writer.append(new UTF8(name), new LongWritable(fileSize)); + } catch(Exception e) { + throw new IOException(e.getLocalizedMessage()); + } finally { + if( writer != null ) + writer.close(); + writer = null; + } + } + LOG.info("created control files for: "+nrFiles+" files"); + } + + private static String getFileName( int fIdx ) { + return BASE_FILE_NAME + Integer.toString(fIdx); + } + + /** + * Write/Read mapper base class. + * <p> + * Collects the following statistics per task: + * <ul> + * <li>number of tasks completed</li> + * <li>number of bytes written/read</li> + * <li>execution time</li> + * <li>i/o rate</li> + * <li>i/o rate squared</li> + * </ul> + */ + private abstract static class IOStatMapper extends IOMapperBase { + IOStatMapper() { + super(fsConfig); + } + + void collectStats(OutputCollector output, + String name, + long execTime, + Object objSize ) throws IOException { + long totalSize = ((Long)objSize).longValue(); + float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); + LOG.info("Number of bytes processed = " + totalSize ); + LOG.info("Exec time = " + execTime ); + LOG.info("IO rate = " + ioRateMbSec ); + + output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1))); + output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize))); + output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime))); + output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000))); + output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); + } + } + + /** + * Write mapper class. 
+ */ + public static class WriteMapper extends IOStatMapper { + + public WriteMapper() { + super(); + for( int i=0; i < bufferSize; i++ ) + buffer[i] = (byte)('0' + i % 50); + } + + public Object doIO( Reporter reporter, + String name, + long totalSize + ) throws IOException { + // create file + totalSize *= MEGA; + + // create instance of local filesystem + FileSystem localFS = FileSystem.getNamed("local", fsConfig); + + try { + // native runtime + Runtime runTime = Runtime.getRuntime(); + + // copy the dso and executable from dfs and chmod them + synchronized (this) { + localFS.delete(HDFS_TEST_DIR); + if (!(localFS.mkdirs(HDFS_TEST_DIR))) { + throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); + } + } + + synchronized (this) { + if (!localFS.exists(HDFS_SHLIB)) { + FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig); + + String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); + Process process = runTime.exec(chmodCmd); + int exitStatus = process.waitFor(); + if (exitStatus != 0) { + throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); + } + } + } + + synchronized (this) { + if (!localFS.exists(HDFS_WRITE)) { + FileUtil.copy(fs, HDFS_WRITE, localFS, HDFS_WRITE, false, fsConfig); + + String chmodCmd = new String(CHMOD + " a+x " + HDFS_WRITE); + Process process = runTime.exec(chmodCmd); + int exitStatus = process.waitFor(); + if (exitStatus != 0) { + throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); + } + } + } + + // exec the C program + Path outFile = new Path(DATA_DIR, name); + String writeCmd = new String(HDFS_WRITE + " " + outFile + " " + totalSize + " " + bufferSize); + Process process = runTime.exec(writeCmd, null, new File("/tmp/")); + int exitStatus = process.waitFor(); + if (exitStatus != 0) { + throw new IOException(writeCmd + ": Failed with exitStatus: " + exitStatus); + } + } catch (InterruptedException interruptedException) { + reporter.setStatus(interruptedException.toString()); + } finally { + localFS.close(); + } + return new Long(totalSize); + } + } + + private static void writeTest(FileSystem fs) + throws IOException { + + fs.delete(DATA_DIR); + fs.delete(WRITE_DIR); + + runIOTest( WriteMapper.class, WRITE_DIR ); + } + + private static void runIOTest( Class mapperClass, + Path outputDir + ) throws IOException { + JobConf job = new JobConf( fsConfig, TestDFSCIO.class ); + + job.setInputPath(CONTROL_DIR); + job.setInputFormat(SequenceFileInputFormat.class); + job.setInputKeyClass(UTF8.class); + job.setInputValueClass(LongWritable.class); + + job.setMapperClass(mapperClass); + job.setReducerClass(AccumulatingReducer.class); + + job.setOutputPath(outputDir); + job.setOutputKeyClass(UTF8.class); + job.setOutputValueClass(UTF8.class); + job.setNumReduceTasks(1); + JobClient.runJob(job); + } + + /** + * Read mapper class. 
+ */
+  public static class ReadMapper extends IOStatMapper {
+
+    public ReadMapper() {
+      super();
+    }
+
+    public Object doIO( Reporter reporter,
+                        String name,
+                        long totalSize
+                        ) throws IOException {
+      totalSize *= MEGA;
+
+      // create instance of local filesystem
+      FileSystem localFS = FileSystem.getNamed("local", fsConfig);
+
+      try {
+        // native runtime
+        Runtime runTime = Runtime.getRuntime();
+
+        // copy the dso and executable from dfs
+        synchronized (this) {
+          localFS.delete(HDFS_TEST_DIR);
+          if (!(localFS.mkdirs(HDFS_TEST_DIR))) {
+            throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem");
+          }
+        }
+
+        synchronized (this) {
+          if (!localFS.exists(HDFS_SHLIB)) {
+            if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) {
+              throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem");
+            }
+
+            String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB);
+            Process process = runTime.exec(chmodCmd);
+            int exitStatus = process.waitFor();
+            if (exitStatus != 0) {
+              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
+            }
+          }
+        }
+
+        synchronized (this) {
+          if (!localFS.exists(HDFS_READ)) {
+            if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) {
+              throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem");
+            }
+
+            String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ);
+            Process process = runTime.exec(chmodCmd);
+            int exitStatus = process.waitFor();
+
+            if (exitStatus != 0) {
+              throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
+            }
+          }
+        }
+
+        // exec the C program
+        Path inFile = new Path(DATA_DIR, name);
+        String readCmd = new String(HDFS_READ + " " + inFile + " " + totalSize + " " +
+                                    bufferSize);
+        Process process = runTime.exec(readCmd, null, new File("/tmp/"));
+        int exitStatus = process.waitFor();
+
+        if (exitStatus != 0) {
+          throw new IOException(readCmd + ": Failed with exitStatus: " + exitStatus);
+        }
+      } catch (InterruptedException interruptedException) {
+        reporter.setStatus(interruptedException.toString());
+      } finally {
+        localFS.close();
+      }
+      return new Long(totalSize);
+    }
+  }
+
+  private static void readTest(FileSystem fs) throws IOException {
+    fs.delete(READ_DIR);
+    runIOTest( ReadMapper.class, READ_DIR );
+  }
+
+  private static void sequentialTest(
+                                     FileSystem fs,
+                                     int testType,
+                                     int fileSize,
+                                     int nrFiles
+                                     ) throws Exception {
+    IOStatMapper ioer = null;
+    if( testType == TEST_TYPE_READ )
+      ioer = new ReadMapper();
+    else if( testType == TEST_TYPE_WRITE )
+      ioer = new WriteMapper();
+    else
+      return;
+    for( int i=0; i < nrFiles; i++)
+      ioer.doIO(new Reporter() {
+                  public void setStatus(String status) throws IOException {}
+                },
+                BASE_FILE_NAME+Integer.toString(i),
+                MEGA*fileSize );
+  }
+
+  public static void main(String[] args) {
+    int testType = TEST_TYPE_READ;
+    int bufferSize = DEFAULT_BUFFER_SIZE;
+    int fileSize = 1;
+    int nrFiles = 1;
+    String resFileName = DEFAULT_RES_FILE_NAME;
+    boolean isSequential = false;
+
+    String version="TestDFSCIO.0.0.1";
+    String usage = "Usage: TestDFSCIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
+
+    System.out.println(version);
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+    for (int i = 0; i < args.length; i++) {       // parse command line
+      if (args[i].startsWith("-r")) {
+        testType = TEST_TYPE_READ;
+      } else if (args[i].startsWith("-w")) {
+        testType = TEST_TYPE_WRITE;
+      } else if (args[i].startsWith("-clean")) {
+        testType = TEST_TYPE_CLEANUP;
+      } else if (args[i].startsWith("-seq")) {
+        isSequential = true;
+      } else if (args[i].equals("-nrFiles")) {
+        nrFiles = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-fileSize")) {
+        fileSize = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-bufferSize")) {
+        bufferSize = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-resFile")) {
+        resFileName = args[++i];
+      }
+    }
+
+    LOG.info("nrFiles = " + nrFiles);
+    LOG.info("fileSize (MB) = " + fileSize);
+    LOG.info("bufferSize = " + bufferSize);
+
+    try {
+      fsConfig.setInt("test.io.file.buffer.size", bufferSize);
+      FileSystem fs = FileSystem.get(fsConfig);
+
+      if (testType != TEST_TYPE_CLEANUP) {
+        fs.delete(HDFS_TEST_DIR);
+        fs.mkdirs(HDFS_TEST_DIR);
+
+        //Copy the executables over to the remote filesystem
+        String hadoopHome = System.getenv("HADOOP_HOME");
+        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION),
+                             HDFS_SHLIB);
+        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ);
+        fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE);
+      }
+
+      if( isSequential ) {
+        long tStart = System.currentTimeMillis();
+        sequentialTest( fs, testType, fileSize, nrFiles );
+        long execTime = System.currentTimeMillis() - tStart;
+        String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
+        LOG.info( resultLine );
+        return;
+      }
+      if( testType == TEST_TYPE_CLEANUP ) {
+        cleanup( fs );
+        return;
+      }
+      createControlFile(fs, fileSize, nrFiles);
+      long tStart = System.currentTimeMillis();
+      if( testType == TEST_TYPE_WRITE )
+        writeTest(fs);
+      if( testType == TEST_TYPE_READ )
+        readTest(fs);
+      long execTime = System.currentTimeMillis() - tStart;
+
+      analyzeResult( fs, testType, execTime, resFileName );
+    } catch( Exception e ) {
+      System.err.println( e.getLocalizedMessage() );
+      System.exit(-1);
+    }
+  }
+
+  private static void analyzeResult( FileSystem fs,
+                                     int testType,
+                                     long execTime,
+                                     String resFileName
+                                     ) throws IOException {
+    Path reduceFile;
+    if( testType == TEST_TYPE_WRITE )
+      reduceFile = new Path( WRITE_DIR, "part-00000" );
+    else
+      reduceFile = new Path( READ_DIR, "part-00000" );
+    DataInputStream in;
+    in = new DataInputStream(fs.open( reduceFile ));
+
+    BufferedReader lines;
+    lines = new BufferedReader(new InputStreamReader(in));
+    long tasks = 0;
+    long size = 0;
+    long time = 0;
+    float rate = 0;
+    float sqrate = 0;
+    String line;
+    while( (line = lines.readLine()) != null ) {
+      StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%");
+      String attr = tokens.nextToken();
+      if( attr.endsWith(":tasks") )
+        tasks = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith(":size") )
+        size = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith(":time") )
+        time = Long.parseLong( tokens.nextToken() );
+      else if( attr.endsWith(":rate") )
+        rate = Float.parseFloat( tokens.nextToken() );
+      else if( attr.endsWith(":sqrate") )
+        sqrate = Float.parseFloat( tokens.nextToken() );
+    }
+
+    double med = rate / 1000 / tasks;
+    double stdDev = Math.sqrt( Math.abs(sqrate / 1000 / tasks - med*med) );
+    String resultLines[] = {
+      "----- TestDFSCIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
+                                     (testType == TEST_TYPE_READ) ? 
"read" : + "unknown"), + " Date & time: " + new Date(System.currentTimeMillis()), + " Number of files: " + tasks, + "Total MBytes processed: " + size/MEGA, + " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), + "Average IO rate mb/sec: " + med, + " Std IO rate deviation: " + stdDev, + " Test exec time sec: " + (float)execTime / 1000, + "" }; + + PrintStream res = new PrintStream( + new FileOutputStream( + new File(resFileName), true )); + for( int i = 0; i < resultLines.length; i++ ) { + LOG.info( resultLines[i] ); + res.println( resultLines[i] ); + } + } + + private static void cleanup( FileSystem fs ) throws Exception { + LOG.info( "Cleaning up test files" ); + fs.delete(new Path(TEST_ROOT_DIR)); + fs.delete(HDFS_TEST_DIR); + } +} Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=410637&r1=410636&r2=410637&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Wed May 31 12:08:38 2006 @@ -28,6 +28,7 @@ import org.apache.hadoop.ipc.TestIPC; import org.apache.hadoop.ipc.TestRPC; import org.apache.hadoop.fs.TestDFSIO; +import org.apache.hadoop.fs.TestDFSCIO; public class AllTestDriver { @@ -50,6 +51,7 @@ pgd.addClass("testsequencefileinputformat", TestSequenceFileInputFormat.class, "A test for sequence file input format."); pgd.addClass("testtextinputformat", TestTextInputFormat.class, "A test for text input format."); pgd.addClass("TestDFSIO", TestDFSIO.class, "Distributed i/o benchmark."); + pgd.addClass("TestDFSCIO", TestDFSCIO.class, "Distributed i/o benchmark of libhdfs."); pgd.addClass("DistributedFSCheck", TestDFSIO.class, "Distributed checkup of the file system consistency."); pgd.driver(argv); }