Author: acmurthy Date: Fri Jan 11 14:04:59 2008 New Revision: 611315 URL: http://svn.apache.org/viewvc?rev=611315&view=rev Log: HADOOP-1965. Interleave sort/spill in the map-task along with calls to the Mapper.map method. This is done by splitting the 'io.sort.mb' buffer into two halves and using one half for collecting map-outputs and the other half for sort/spill. Contributed by Amar Kamat.
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=611315&r1=611314&r2=611315&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 11 14:04:59 2008 @@ -202,6 +202,10 @@ add mapred.reduce.tasks.speculative.execution (Amareshwari Sri Ramadasu via acmurthy) + HADOOP-1965. Interleave sort/spill in teh map-task along with calls to the + Mapper.map method. This is done by splitting the 'io.sort.mb' buffer into + two and using one half for collecting map-outputs and the other half for + sort/spill. 
(Amar Kamat via acmurthy) OPTIMIZATIONS Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=611315&r1=611314&r2=611315&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jan 11 14:04:59 2008 @@ -270,7 +270,14 @@ private DataOutputBuffer keyValBuffer; //the buffer where key/val will //be stored before they are - //spilled to disk + //passed on to the pending buffer + private DataOutputBuffer pendingKeyvalBuffer; // the key value buffer used + // while spilling + // a lock used for sync sort-spill with collect + private final Object pendingKeyvalBufferLock = new Object(); + // since sort-spill and collect are done concurrently, exceptions are + // passed through shared variable + private volatile IOException sortSpillException; private int maxBufferSize; //the max amount of in-memory space after which //we will spill the keyValBuffer to disk private int numSpills; //maintains the no. 
of spills to disk done so far @@ -282,6 +289,7 @@ private Class valClass; private WritableComparator comparator; private BufferSorter []sortImpl; + private BufferSorter []pendingSortImpl; // sort impl for the pending buffer private SequenceFile.Writer writer; private FSDataOutputStream out; private FSDataOutputStream indexOut; @@ -296,7 +304,8 @@ this.partitions = job.getNumReduceTasks(); this.partitioner = (Partitioner)ReflectionUtils.newInstance( job.getPartitionerClass(), job); - maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024; + maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2; + this.sortSpillException = null; keyValBuffer = new DataOutputBuffer(); this.job = job; @@ -348,8 +357,8 @@ } @SuppressWarnings("unchecked") - public void collect(WritableComparable key, - Writable value) throws IOException { + public synchronized void collect(WritableComparable key, + Writable value) throws IOException { if (key.getClass() != keyClass) { throw new IOException("Type mismatch in key from map: expected " @@ -362,45 +371,88 @@ + value.getClass().getName()); } - synchronized (this) { - if (keyValBuffer == null) { - keyValBuffer = new DataOutputBuffer(); - } - //dump the key/value to buffer - int keyOffset = keyValBuffer.getLength(); - key.write(keyValBuffer); - int keyLength = keyValBuffer.getLength() - keyOffset; - value.write(keyValBuffer); - int valLength = keyValBuffer.getLength() - (keyOffset + keyLength); + // check if the earlier sort-spill generated an exception + if (sortSpillException != null) { + throw sortSpillException; + } - int partNumber = partitioner.getPartition(key, value, partitions); - sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength); + if (keyValBuffer == null) { + keyValBuffer = new DataOutputBuffer(); + sortImpl = new BufferSorter[partitions]; + for (int i = 0; i < partitions; i++) + sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance( + job.getClass("map.sort.class", MergeSorter.class, + 
BufferSorter.class), job); + } + + //dump the key/value to buffer + int keyOffset = keyValBuffer.getLength(); + key.write(keyValBuffer); + int keyLength = keyValBuffer.getLength() - keyOffset; + value.write(keyValBuffer); + int valLength = keyValBuffer.getLength() - (keyOffset + keyLength); + int partNumber = partitioner.getPartition(key, value, partitions); + sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength); - mapOutputRecordCounter.increment(1); - mapOutputByteCounter.increment(keyValBuffer.getLength() - keyOffset); + mapOutputRecordCounter.increment(1); + mapOutputByteCounter.increment(keyValBuffer.getLength() - keyOffset); - //now check whether we need to spill to disk - long totalMem = 0; - for (int i = 0; i < partitions; i++) - totalMem += sortImpl[i].getMemoryUtilized(); - if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) { - sortAndSpillToDisk(); - //we don't reuse the keyValBuffer. We want to maintain consistency - //in the memory model (for negligible performance loss). 
- keyValBuffer = null; - for (int i = 0; i < partitions; i++) { - sortImpl[i].close(); + //now check whether we need to spill to disk + long totalMem = 0; + for (int i = 0; i < partitions; i++) + totalMem += sortImpl[i].getMemoryUtilized(); + totalMem += keyValBuffer.getLength(); + if (totalMem >= maxBufferSize) { + // check if the earlier spill is pending + synchronized (pendingKeyvalBufferLock) { + // check if the spill is over, there could be a case where the + // sort-spill is yet to start and collect acquired the lock + while (pendingKeyvalBuffer != null) { + try { + // indicate that we are making progress + this.reporter.progress(); + pendingKeyvalBufferLock.wait(); // wait for the pending spill to + // start and finish sort-spill + } catch (InterruptedException ie) { + LOG.warn("Buffer interrupted while waiting for the writer", ie); + } } + // prepare for spilling + pendingKeyvalBuffer = keyValBuffer; + pendingSortImpl = sortImpl; + keyValBuffer = null; + sortImpl = null; } + + // check if the earlier sort-spill thread generated an exception + if (sortSpillException != null) { + throw sortSpillException; + } + + // Start the sort-spill thread. While the sort and spill takes place + // using the pending variables, the output collector can collect the + // key-value without getting blocked. Thus making key-value collection + // and sort-spill concurrent. + Thread bufferWriter = new Thread() { + public void run() { + synchronized (pendingKeyvalBufferLock) { + sortAndSpillToDisk(); + } + } + }; + bufferWriter.setDaemon(true); // to make sure that the buffer writer + // gets killed if collector is killed. 
+ bufferWriter.setName("SortSpillThread"); + bufferWriter.start(); } } //sort, combine and spill to disk - private void sortAndSpillToDisk() throws IOException { - synchronized (this) { + private void sortAndSpillToDisk() { + try { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions - long size = keyValBuffer.getLength() + + long size = pendingKeyvalBuffer.getLength() + partitions * APPROX_HEADER_LENGTH; Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), numSpills, size); @@ -414,9 +466,9 @@ //invoke the sort for (int i = 0; i < partitions; i++) { - sortImpl[i].setInputBuffer(keyValBuffer); - sortImpl[i].setProgressable(reporter); - RawKeyValueIterator rIter = sortImpl[i].sort(); + pendingSortImpl[i].setInputBuffer(pendingKeyvalBuffer); + pendingSortImpl[i].setProgressable(reporter); + RawKeyValueIterator rIter = pendingSortImpl[i].sort(); startPartition(i); if (rIter != null) { @@ -449,6 +501,14 @@ numSpills++; out.close(); indexOut.close(); + } catch (IOException ioe) { + sortSpillException = ioe; + } finally { // make sure that the collector never waits indefinitely + pendingKeyvalBuffer = null; + for (int i = 0; i < partitions; i++) { + pendingSortImpl[i].close(); + } + pendingKeyvalBufferLock.notify(); } } @@ -617,15 +677,46 @@ } } - public void flush() throws IOException + public synchronized void flush() throws IOException { //check whether the length of the key/value buffer is 0. If not, then //we need to spill that to disk. Note that we reset the key/val buffer //upon each spill (so a length > 0 means that we have not spilled yet) - synchronized (this) { - if (keyValBuffer != null && keyValBuffer.getLength() > 0) { + + // check if the earlier spill is pending + synchronized (pendingKeyvalBufferLock) { + // this could mean that either the sort-spill is over or is yet to + // start so make sure that the earlier sort-spill is over. 
+ while (pendingKeyvalBuffer != null) { + try { + // indicate that we are making progress + this.reporter.progress(); + pendingKeyvalBufferLock.wait(); + } catch (InterruptedException ie) { + LOG.info("Buffer interrupted while for the pending spill", ie); + } + } + } + + // check if the earlier sort-spill thread generated an exception + if (sortSpillException != null) { + throw sortSpillException; + } + + if (keyValBuffer != null && keyValBuffer.getLength() > 0) { + // prepare for next spill + synchronized (pendingKeyvalBufferLock) { + pendingKeyvalBuffer = keyValBuffer; + pendingSortImpl = sortImpl; + keyValBuffer = null; + sortImpl = null; sortAndSpillToDisk(); } + } + + // check if the last sort-spill thread generated an exception + if (sortSpillException != null) { + throw sortSpillException; } mergeParts(); } Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java?rev=611315&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java Fri Jan 11 14:04:59 2008 @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.ThreadedMapBenchmark.RandomInputFormat; + +import junit.framework.TestCase; +import java.io.*; +import java.util.*; + +/** + * TestCollect checks if the collect can handle simultaneous invocations. + */ +public class TestCollect extends TestCase +{ + final static Path OUTPUT_DIR = new Path("build/test/test.collect.output"); + static final int NUM_FEEDERS = 10; + static final int NUM_COLLECTS_PER_THREAD = 1000; + + /** + * Map is a Mapper that spawns threads which simultaneously call collect. + * Each thread has a specific range to write to the buffer and is unique to + * the thread. This is a synchronization test for the map's collect. 
+ */ + + static class Map + implements Mapper<Text, Text, IntWritable, IntWritable> { + + public void configure(JobConf job) { + } + + public void map(Text key, Text val, + final OutputCollector<IntWritable, IntWritable> out, + Reporter reporter) throws IOException { + // Class for calling collect in separate threads + class CollectFeeder extends Thread { + int id; // id for the thread + + public CollectFeeder(int id) { + this.id = id; + } + + public void run() { + for (int j = 1; j <= NUM_COLLECTS_PER_THREAD; j++) { + try { + out.collect(new IntWritable((id * NUM_COLLECTS_PER_THREAD) + j), + new IntWritable(0)); + } catch (IOException ioe) { } + } + } + } + + CollectFeeder [] feeders = new CollectFeeder[NUM_FEEDERS]; + + // start the feeders + for (int i = 0; i < NUM_FEEDERS; i++) { + feeders[i] = new CollectFeeder(i); + feeders[i].start(); + } + // wait for them to finish + for (int i = 0; i < NUM_FEEDERS; i++) { + try { + feeders[i].join(); + } catch (InterruptedException ie) { + throw new IOException(ie.toString()); + } + } + } + + public void close() { + } + } + + static class Reduce + implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { + + static int numSeen; + static int actualSum; + public void configure(JobConf job) { } + + public void reduce(IntWritable key, Iterator<IntWritable> val, + OutputCollector<IntWritable, IntWritable> out, + Reporter reporter) throws IOException { + actualSum += key.get(); // keep the running count of the seen values + numSeen++; // number of values seen so far + + // using '1+2+3+...n = n*(n+1)/2' to validate + int expectedSum = numSeen * (numSeen + 1) / 2; + if (expectedSum != actualSum) { + throw new IOException("Collect test failed!! 
Ordering mismatch."); + } + } + + public void close() { } + } + + public void configure(JobConf conf) throws IOException { + conf.setJobName("TestCollect"); + conf.setJarByClass(TestCollect.class); + + conf.setInputFormat(RandomInputFormat.class); // for self data generation + conf.setOutputKeyClass(IntWritable.class); + conf.setOutputValueClass(IntWritable.class); + conf.setOutputPath(OUTPUT_DIR); + + conf.setMapperClass(Map.class); + conf.setReducerClass(Reduce.class); + conf.setNumMapTasks(1); + conf.setNumReduceTasks(1); + } + + public void testCollect() throws IOException { + JobConf conf = new JobConf(); + configure(conf); + try { + JobClient.runJob(conf); + // check if all the values were seen by the reducer + if (Reduce.numSeen != (NUM_COLLECTS_PER_THREAD * NUM_FEEDERS)) { + throw new IOException("Collect test failed!! Total does not match."); + } + } catch (IOException ioe) { + throw ioe; + } finally { + FileSystem fs = FileSystem.get(conf); + fs.delete(OUTPUT_DIR); + } + } + + public static void main(String[] args) throws IOException { + new TestCollect().testCollect(); + } +} + Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java?rev=611315&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java Fri Jan 11 14:04:59 2008 @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.io.File; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.examples.RandomWriter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat; +import org.apache.hadoop.mapred.lib.IdentityMapper; +import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * Distributed threaded map benchmark. + * <p> + * This benchmark generates random data per map and tests the performance + * of having multiple spills (using multiple threads) over having just one + * spill. Following are the parameters that can be specified + * <li>File size per map. + * <li>Number of spills per map. + * <li>Number of maps per host. + * <p> + * Sort is used for benchmarking the performance. 
+ */ + +public class ThreadedMapBenchmark extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(ThreadedMapBenchmark.class); + private static Path BASE_DIR = + new Path(System.getProperty("test.build.data", + File.separator + "benchmarks" + File.separator + + "ThreadedMapBenchmark")); + private static Path INPUT_DIR = new Path(BASE_DIR, "input"); + private static Path OUTPUT_DIR = new Path(BASE_DIR, "output"); + private static final float FACTOR = 2.3f; // io.sort.mb set to + // (FACTOR * data_size) should + // result in only 1 spill + + static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN } + + /** + * A custom input format that creates virtual inputs of a single string + * for each map. Using [EMAIL PROTECTED] RandomWriter} code. + */ + public static class RandomInputFormat implements InputFormat<Text, Text> { + + public void validateInput(JobConf job) throws IOException { + } + + public InputSplit[] getSplits(JobConf job, + int numSplits) throws IOException { + InputSplit[] result = new InputSplit[numSplits]; + Path outDir = job.getOutputPath(); + for(int i=0; i < result.length; ++i) { + result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, + job); + } + return result; + } + + static class RandomRecordReader implements RecordReader<Text, Text> { + Path name; + public RandomRecordReader(Path p) { + name = p; + } + public boolean next(Text key, Text value) { + if (name != null) { + key.set(name.getName()); + name = null; + return true; + } + return false; + } + public Text createKey() { + return new Text(); + } + public Text createValue() { + return new Text(); + } + public long getPos() { + return 0; + } + public void close() {} + public float getProgress() { + return 0.0f; + } + } + + public RecordReader<Text, Text> getRecordReader(InputSplit split, + JobConf job, + Reporter reporter) + throws IOException { + return new RandomRecordReader(((FileSplit) split).getPath()); + } + } + + /** + * Generates random 
input data of given size with keys and values of given + * sizes. By default it generates 128mb input data with 10 byte keys and 10 + * byte values. + */ + public static class Map extends MapReduceBase + implements Mapper<WritableComparable, Writable, + BytesWritable, BytesWritable> { + + private long numBytesToWrite; + private int minKeySize; + private int keySizeRange; + private int minValueSize; + private int valueSizeRange; + private Random random = new Random(); + private BytesWritable randomKey = new BytesWritable(); + private BytesWritable randomValue = new BytesWritable(); + + private void randomizeBytes(byte[] data, int offset, int length) { + for(int i = offset + length - 1; i >= offset; --i) { + data[i] = (byte) random.nextInt(256); + } + } + + public void map(WritableComparable key, + Writable value, + OutputCollector<BytesWritable, BytesWritable> output, + Reporter reporter) throws IOException { + int itemCount = 0; + while (numBytesToWrite > 0) { + int keyLength = minKeySize + + (keySizeRange != 0 + ? random.nextInt(keySizeRange) + : 0); + randomKey.setSize(keyLength); + randomizeBytes(randomKey.get(), 0, randomKey.getSize()); + int valueLength = minValueSize + + (valueSizeRange != 0 + ? random.nextInt(valueSizeRange) + : 0); + randomValue.setSize(valueLength); + randomizeBytes(randomValue.get(), 0, randomValue.getSize()); + output.collect(randomKey, randomValue); + numBytesToWrite -= keyLength + valueLength; + reporter.incrCounter(Counters.BYTES_WRITTEN, 1); + reporter.incrCounter(Counters.RECORDS_WRITTEN, 1); + if (++itemCount % 200 == 0) { + reporter.setStatus("wrote record " + itemCount + ". 
" + + numBytesToWrite + " bytes left."); + } + } + reporter.setStatus("done with " + itemCount + " records."); + } + + @Override + public void configure(JobConf job) { + numBytesToWrite = job.getLong("test.tmb.bytes_per_map", + 128 * 1024 * 1024); + minKeySize = job.getInt("test.tmb.min_key", 10); + keySizeRange = job.getInt("test.tmb.max_key", 10) - minKeySize; + minValueSize = job.getInt("test.tmb.min_value", 10); + valueSizeRange = job.getInt("test.tmb.max_value", 10) - minValueSize; + } +} + + /** + * Generate input data for the benchmark + */ + public static void generateInputData(int dataSizePerMap, + int numSpillsPerMap, + int numMapsPerHost, + JobConf masterConf) + throws Exception { + JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class); + job.setJobName("threaded-map-benchmark-random-writer"); + job.setJarByClass(ThreadedMapBenchmark.class); + job.setInputFormat(RandomInputFormat.class); + job.setOutputFormat(SequenceFileOutputFormat.class); + + job.setMapperClass(Map.class); + job.setReducerClass(IdentityReducer.class); + + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(BytesWritable.class); + + JobClient client = new JobClient(job); + ClusterStatus cluster = client.getClusterStatus(); + long totalDataSize = dataSizePerMap * numMapsPerHost + * cluster.getTaskTrackers(); + job.set("test.tmb.bytes_per_map", + String.valueOf(dataSizePerMap * 1024 * 1024)); + job.setNumReduceTasks(0); // none reduce + job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers()); + job.setOutputPath(INPUT_DIR); + + FileSystem fs = FileSystem.get(job); + fs.delete(BASE_DIR); + + LOG.info("Generating random input for the benchmark"); + LOG.info("Total data : " + totalDataSize + " mb"); + LOG.info("Data per map: " + dataSizePerMap + " mb"); + LOG.info("Number of spills : " + numSpillsPerMap); + LOG.info("Number of maps per host : " + numMapsPerHost); + LOG.info("Number of hosts : " + cluster.getTaskTrackers()); + + JobClient.runJob(job); // 
generates the input for the benchmark + } + + /** + * This is the main routine for launching the benchmark. It generates random + * input data. The input is non-splittable. Sort is used for benchmarking. + * This benchmark reports the effect of having multiple sort and spill + * cycles over a single sort and spill. + * + * @throws IOException + */ + public int run (String[] args) throws Exception { + LOG.info("Starting the benchmark for threaded spills"); + String version = "ThreadedMapBenchmark.0.0.1"; + System.out.println(version); + + String usage = + "Usage: threadedmapbenchmark " + + "[-dataSizePerMap <data size (in mb) per map, default is 128 mb>] " + + "[-numSpillsPerMap <number of spills per map, default is 2>] " + + "[-numMapsPerHost <number of maps per host, default is 1>]"; + + int dataSizePerMap = 128; // in mb + int numSpillsPerMap = 2; + int numMapsPerHost = 1; + JobConf masterConf = new JobConf(getConf()); + + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].equals("-dataSizePerMap")) { + dataSizePerMap = Integer.parseInt(args[++i]); + } else if (args[i].equals("-numSpillsPerMap")) { + numSpillsPerMap = Integer.parseInt(args[++i]); + } else if (args[i].equals("-numMapsPerHost")) { + numMapsPerHost = Integer.parseInt(args[++i]); + } else { + System.err.println(usage); + System.exit(-1); + } + } + + if (dataSizePerMap < 1 || // verify arguments + numSpillsPerMap < 1 || + numMapsPerHost < 1) + { + System.err.println(usage); + System.exit(-1); + } + + FileSystem fs = null; + try { + // using random-writer to generate the input data + generateInputData(dataSizePerMap, numSpillsPerMap, numMapsPerHost, + masterConf); + + // configure job for sorting + JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class); + job.setJobName("threaded-map-benchmark-unspilled"); + job.setJarByClass(ThreadedMapBenchmark.class); + + job.setInputFormat(NonSplitableSequenceFileInputFormat.class); + 
job.setOutputFormat(SequenceFileOutputFormat.class); + + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(BytesWritable.class); + + job.setMapperClass(IdentityMapper.class); + job.setReducerClass(IdentityReducer.class); + + job.addInputPath(INPUT_DIR); + job.setOutputPath(OUTPUT_DIR); + + JobClient client = new JobClient(job); + ClusterStatus cluster = client.getClusterStatus(); + job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers()); + job.setNumReduceTasks(1); + + // set io.sort.mb to avoid spill + int ioSortMb = (int)Math.ceil(FACTOR * dataSizePerMap); + job.set("io.sort.mb", String.valueOf(ioSortMb)); + fs = FileSystem.get(job); + + LOG.info("Running sort with 1 spill per map"); + long startTime = System.currentTimeMillis(); + JobClient.runJob(job); + long endTime = System.currentTimeMillis(); + + LOG.info("Total time taken : " + String.valueOf(endTime - startTime) + + " millisec"); + fs.delete(OUTPUT_DIR); + + // set io.sort.mb to have multiple spills + JobConf spilledJob = new JobConf(job, ThreadedMapBenchmark.class); + ioSortMb = (int)Math.ceil(FACTOR + * Math.ceil((double)dataSizePerMap + / numSpillsPerMap)); + spilledJob.set("io.sort.mb", String.valueOf(ioSortMb)); + spilledJob.setJobName("threaded-map-benchmark-spilled"); + spilledJob.setJarByClass(ThreadedMapBenchmark.class); + + LOG.info("Running sort with " + numSpillsPerMap + " spills per map"); + startTime = System.currentTimeMillis(); + JobClient.runJob(spilledJob); + endTime = System.currentTimeMillis(); + + LOG.info("Total time taken : " + String.valueOf(endTime - startTime) + + " millisec"); + } finally { + if (fs != null) { + fs.delete(BASE_DIR); + } + } + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new ThreadedMapBenchmark(), args); + System.exit(res); + } +} Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=611315&r1=611314&r2=611315&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Fri Jan 11 14:04:59 2008 @@ -38,6 +38,7 @@ import org.apache.hadoop.io.TestSetFile; import org.apache.hadoop.ipc.TestIPC; import org.apache.hadoop.ipc.TestRPC; +import org.apache.hadoop.mapred.ThreadedMapBenchmark; public class AllTestDriver { @@ -47,6 +48,9 @@ public static void main(String argv[]){ ProgramDriver pgd = new ProgramDriver(); try { + pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, + "A map/reduce benchmark that compares the performance " + + "of maps with multiple spills over maps with 1 spill"); pgd.addClass("mrbench", MRBench.class, "A map/reduce benchmark that can create many small jobs"); pgd.addClass("nnbench", NNBench.class, "A benchmark that stresses the namenode."); pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check.");