Author: acmurthy
Date: Fri Jan 11 14:04:59 2008
New Revision: 611315

URL: http://svn.apache.org/viewvc?rev=611315&view=rev
Log:
HADOOP-1965. Interleave sort/spill in teh map-task along with calls to the 
Mapper.map method. This is done by splitting the 'io.sort.mb' buffer into two 
and using one half for collecting map-outputs and the other half for 
sort/spill. Contributed by Amar Kamat.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java
    
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=611315&r1=611314&r2=611315&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jan 11 14:04:59 2008
@@ -202,6 +202,10 @@
       add mapred.reduce.tasks.speculative.execution
     (Amareshwari Sri Ramadasu via acmurthy) 
       
+    HADOOP-1965. Interleave sort/spill in teh map-task along with calls to the
+    Mapper.map method. This is done by splitting the 'io.sort.mb' buffer into
+    two and using one half for collecting map-outputs and the other half for
+    sort/spill. (Amar Kamat via acmurthy)
 
   OPTIMIZATIONS
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=611315&r1=611314&r2=611315&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java 
(original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jan 
11 14:04:59 2008
@@ -270,7 +270,14 @@
 
     private DataOutputBuffer keyValBuffer; //the buffer where key/val will
                                            //be stored before they are 
-                                           //spilled to disk
+                                           //passed on to the pending buffer
+    private DataOutputBuffer pendingKeyvalBuffer; // the key value buffer used
+                                                  // while spilling
+    // a lock used for sync sort-spill with collect
+    private final Object pendingKeyvalBufferLock = new Object();
+    // since sort-spill and collect are done concurrently, exceptions are 
+    // passed through shared variable
+    private volatile IOException sortSpillException; 
     private int maxBufferSize; //the max amount of in-memory space after which
                                //we will spill the keyValBuffer to disk
     private int numSpills; //maintains the no. of spills to disk done so far
@@ -282,6 +289,7 @@
     private Class valClass;
     private WritableComparator comparator;
     private BufferSorter []sortImpl;
+    private BufferSorter []pendingSortImpl; // sort impl for the pending buffer
     private SequenceFile.Writer writer;
     private FSDataOutputStream out;
     private FSDataOutputStream indexOut;
@@ -296,7 +304,8 @@
       this.partitions = job.getNumReduceTasks();
       this.partitioner = (Partitioner)ReflectionUtils.newInstance(
                                                                   
job.getPartitionerClass(), job);
-      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
+      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2;
+      this.sortSpillException = null;
       keyValBuffer = new DataOutputBuffer();
 
       this.job = job;
@@ -348,8 +357,8 @@
     }
     
     @SuppressWarnings("unchecked")
-    public void collect(WritableComparable key,
-                        Writable value) throws IOException {
+    public synchronized void collect(WritableComparable key,
+                                     Writable value) throws IOException {
       
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
@@ -362,45 +371,88 @@
                               + value.getClass().getName());
       }
       
-      synchronized (this) {
-        if (keyValBuffer == null) {
-          keyValBuffer = new DataOutputBuffer();
-        }
-        //dump the key/value to buffer
-        int keyOffset = keyValBuffer.getLength(); 
-        key.write(keyValBuffer);
-        int keyLength = keyValBuffer.getLength() - keyOffset;
-        value.write(keyValBuffer);
-        int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
+      // check if the earlier sort-spill generated an exception
+      if (sortSpillException != null) {
+        throw sortSpillException;
+      }
       
-        int partNumber = partitioner.getPartition(key, value, partitions);
-        sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
+      if (keyValBuffer == null) {
+        keyValBuffer = new DataOutputBuffer();
+        sortImpl = new BufferSorter[partitions];
+        for (int i = 0; i < partitions; i++)
+          sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
+                            job.getClass("map.sort.class", MergeSorter.class, 
+                                         BufferSorter.class), job);
+      }
+      
+      //dump the key/value to buffer
+      int keyOffset = keyValBuffer.getLength(); 
+      key.write(keyValBuffer);
+      int keyLength = keyValBuffer.getLength() - keyOffset;
+      value.write(keyValBuffer);
+      int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
+      int partNumber = partitioner.getPartition(key, value, partitions);
+      sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-        mapOutputRecordCounter.increment(1);
-        mapOutputByteCounter.increment(keyValBuffer.getLength() - keyOffset);
+      mapOutputRecordCounter.increment(1);
+      mapOutputByteCounter.increment(keyValBuffer.getLength() - keyOffset);
 
-        //now check whether we need to spill to disk
-        long totalMem = 0;
-        for (int i = 0; i < partitions; i++)
-          totalMem += sortImpl[i].getMemoryUtilized();
-        if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
-          sortAndSpillToDisk();
-          //we don't reuse the keyValBuffer. We want to maintain consistency
-          //in the memory model (for negligible performance loss).
-          keyValBuffer = null;
-          for (int i = 0; i < partitions; i++) {
-            sortImpl[i].close();
+      //now check whether we need to spill to disk
+      long totalMem = 0;
+      for (int i = 0; i < partitions; i++)
+        totalMem += sortImpl[i].getMemoryUtilized();
+      totalMem += keyValBuffer.getLength();
+      if (totalMem  >= maxBufferSize) {
+        // check if the earlier spill is pending
+        synchronized (pendingKeyvalBufferLock) {
+          // check if the spill is over, there could be a case where the 
+          // sort-spill is yet to start and collect acquired the lock
+          while (pendingKeyvalBuffer != null) {
+            try {
+              // indicate that we are making progress
+              this.reporter.progress();
+              pendingKeyvalBufferLock.wait(); // wait for the pending spill to
+                                              // start and finish sort-spill
+            } catch (InterruptedException ie) {
+              LOG.warn("Buffer interrupted while waiting for the writer", ie);
+            }
           }
+          // prepare for spilling
+          pendingKeyvalBuffer = keyValBuffer;
+          pendingSortImpl = sortImpl;
+          keyValBuffer = null;
+          sortImpl = null;
         }
+
+        // check if the earlier sort-spill thread generated an exception
+        if (sortSpillException != null) {
+          throw sortSpillException;
+        }
+        
+        // Start the sort-spill thread. While the sort and spill takes place 
+        // using the pending variables, the output collector can collect the 
+        // key-value without getting blocked. Thus making key-value collection 
+        // and sort-spill concurrent.
+        Thread bufferWriter = new Thread() {
+          public void run() {
+            synchronized (pendingKeyvalBufferLock) {
+              sortAndSpillToDisk();
+            }
+          }
+        };
+        bufferWriter.setDaemon(true); // to make sure that the buffer writer 
+                                      // gets killed if collector is killed.
+        bufferWriter.setName("SortSpillThread");
+        bufferWriter.start();
       }
     }
     
     //sort, combine and spill to disk
-    private void sortAndSpillToDisk() throws IOException {
-      synchronized (this) {
+    private void sortAndSpillToDisk() {
+      try {
         //approximate the length of the output file to be the length of the
         //buffer + header lengths for the partitions
-        long size = keyValBuffer.getLength() + 
+        long size = pendingKeyvalBuffer.getLength() + 
                     partitions * APPROX_HEADER_LENGTH;
         Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
                                       numSpills, size);
@@ -414,9 +466,9 @@
           
         //invoke the sort
         for (int i = 0; i < partitions; i++) {
-          sortImpl[i].setInputBuffer(keyValBuffer);
-          sortImpl[i].setProgressable(reporter);
-          RawKeyValueIterator rIter = sortImpl[i].sort();
+          pendingSortImpl[i].setInputBuffer(pendingKeyvalBuffer);
+          pendingSortImpl[i].setProgressable(reporter);
+          RawKeyValueIterator rIter = pendingSortImpl[i].sort();
           
           startPartition(i);
           if (rIter != null) {
@@ -449,6 +501,14 @@
         numSpills++;
         out.close();
         indexOut.close();
+      } catch (IOException ioe) {
+        sortSpillException = ioe;
+      } finally { // make sure that the collector never waits indefinitely
+        pendingKeyvalBuffer = null;
+        for (int i = 0; i < partitions; i++) {
+          pendingSortImpl[i].close();
+        }
+        pendingKeyvalBufferLock.notify();
       }
     }
     
@@ -617,15 +677,46 @@
       }
     }
 
-    public void flush() throws IOException 
+    public synchronized void flush() throws IOException 
     {
       //check whether the length of the key/value buffer is 0. If not, then
       //we need to spill that to disk. Note that we reset the key/val buffer
       //upon each spill (so a length > 0 means that we have not spilled yet)
-      synchronized (this) {
-        if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
+      
+      // check if the earlier spill is pending
+      synchronized (pendingKeyvalBufferLock) {
+        // this could mean that either the sort-spill is over or is yet to 
+        // start so make sure that the earlier sort-spill is over.
+        while (pendingKeyvalBuffer != null) {
+          try {
+            // indicate that we are making progress
+            this.reporter.progress();
+            pendingKeyvalBufferLock.wait();
+          } catch (InterruptedException ie) {
+            LOG.info("Buffer interrupted while for the pending spill", ie);
+          }
+        }
+      }
+      
+      // check if the earlier sort-spill thread generated an exception
+      if (sortSpillException != null) {
+        throw sortSpillException;
+      }
+      
+      if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
+        // prepare for next spill
+        synchronized (pendingKeyvalBufferLock) {
+          pendingKeyvalBuffer = keyValBuffer;
+          pendingSortImpl = sortImpl;
+          keyValBuffer = null;
+          sortImpl = null;
           sortAndSpillToDisk();
         }
+      }  
+      
+      // check if the last sort-spill thread generated an exception
+      if (sortSpillException != null) {
+        throw sortSpillException;
       }
       mergeParts();
     }

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java?rev=611315&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java 
(added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java Fri 
Jan 11 14:04:59 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.ThreadedMapBenchmark.RandomInputFormat;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/** 
+ * TestCollect checks if the collect can handle simultaneous invocations.
+ */
+public class TestCollect extends TestCase 
+{
+  final static Path OUTPUT_DIR = new Path("build/test/test.collect.output");
+  static final int NUM_FEEDERS = 10;
+  static final int NUM_COLLECTS_PER_THREAD = 1000;
+  
+  /** 
+   * Map is a Mapper that spawns threads which simultaneously call collect. 
+   * Each thread has a specific range to write to the buffer and is unique to 
+   * the thread. This is a synchronization test for the map's collect.
+   */
+   
+  static class Map
+    implements Mapper<Text, Text, IntWritable, IntWritable> {
+    
+    public void configure(JobConf job) {
+    }
+    
+    public void map(Text key, Text val,
+                    final OutputCollector<IntWritable, IntWritable> out,
+                    Reporter reporter) throws IOException {
+      // Class for calling collect in separate threads
+      class CollectFeeder extends Thread {
+        int id; // id for the thread
+        
+        public CollectFeeder(int id) {
+          this.id = id;
+        }
+        
+        public void run() {
+          for (int j = 1; j <= NUM_COLLECTS_PER_THREAD; j++) {
+            try {
+              out.collect(new IntWritable((id * NUM_COLLECTS_PER_THREAD) + j), 
+                                          new IntWritable(0));
+            } catch (IOException ioe) { }
+          }
+        }
+      }
+      
+      CollectFeeder [] feeders = new CollectFeeder[NUM_FEEDERS];
+      
+      // start the feeders
+      for (int i = 0; i < NUM_FEEDERS; i++) {
+        feeders[i] = new CollectFeeder(i);
+        feeders[i].start();
+      }
+      // wait for them to finish
+      for (int i = 0; i < NUM_FEEDERS; i++) {
+        try {
+          feeders[i].join();
+        } catch (InterruptedException ie) {
+          throw new IOException(ie.toString());
+        }
+      }
+    }
+    
+    public void close() {
+    }
+  }
+  
+  static class Reduce
+  implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+  
+    static int numSeen;
+    static int actualSum;
+    public void configure(JobConf job) { }
+
+    public void reduce(IntWritable key, Iterator<IntWritable> val,
+                       OutputCollector<IntWritable, IntWritable> out,
+                       Reporter reporter) throws IOException {
+      actualSum += key.get(); // keep the running count of the seen values
+      numSeen++; // number of values seen so far
+      
+      // using '1+2+3+...n =  n*(n+1)/2' to validate
+      int expectedSum = numSeen * (numSeen + 1) / 2;
+      if (expectedSum != actualSum) {
+        throw new IOException("Collect test failed!! Ordering mismatch.");
+      }
+    }
+
+    public void close() { }
+  }
+  
+  public void configure(JobConf conf) throws IOException {
+    conf.setJobName("TestCollect");
+    conf.setJarByClass(TestCollect.class);
+    
+    conf.setInputFormat(RandomInputFormat.class); // for self data generation
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(IntWritable.class);
+    conf.setOutputPath(OUTPUT_DIR);
+    
+    conf.setMapperClass(Map.class);
+    conf.setReducerClass(Reduce.class);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+  }
+  
+  public void testCollect() throws IOException {
+    JobConf conf = new JobConf();
+    configure(conf);
+    try {
+      JobClient.runJob(conf);
+      // check if all the values were seen by the reducer
+      if (Reduce.numSeen != (NUM_COLLECTS_PER_THREAD * NUM_FEEDERS)) {
+        throw new IOException("Collect test failed!! Total does not match.");
+      }
+    } catch (IOException ioe) {
+      throw ioe;
+    } finally {
+      FileSystem fs = FileSystem.get(conf);
+      fs.delete(OUTPUT_DIR);
+    }
+  }
+  
+  public static void main(String[] args) throws IOException {
+    new TestCollect().testCollect();
+  }
+}
+

Added: 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java?rev=611315&view=auto
==============================================================================
--- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java 
(added)
+++ 
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java 
Fri Jan 11 14:04:59 2008
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.File;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import 
org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Distributed threaded map benchmark.
+ * <p>
+ * This benchmark generates random data per map and tests the performance 
+ * of having multiple spills (using multiple threads) over having just one 
+ * spill. Following are the parameters that can be specified
+ * <li>File size per map.
+ * <li>Number of spills per map. 
+ * <li>Number of maps per host.
+ * <p>
+ * Sort is used for benchmarking the performance. 
+ */
+
+public class ThreadedMapBenchmark extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(ThreadedMapBenchmark.class);
+  private static Path BASE_DIR =
+    new Path(System.getProperty("test.build.data", 
+                                File.separator + "benchmarks" + File.separator 
+                                + "ThreadedMapBenchmark"));
+  private static Path INPUT_DIR = new Path(BASE_DIR, "input");
+  private static Path OUTPUT_DIR = new Path(BASE_DIR, "output");
+  private static final float FACTOR = 2.3f; // io.sort.mb set to 
+                                            // (FACTOR * data_size) should 
+                                            // result in only 1 spill
+
+  static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
+  
+  /**
+   * A custom input format that creates virtual inputs of a single string
+   * for each map. Using [EMAIL PROTECTED] RandomWriter} code. 
+   */
+  public static class RandomInputFormat implements InputFormat<Text, Text> {
+    
+    public void validateInput(JobConf job) throws IOException {
+    }
+
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) throws IOException {
+      InputSplit[] result = new InputSplit[numSplits];
+      Path outDir = job.getOutputPath();
+      for(int i=0; i < result.length; ++i) {
+        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
+                                  job);
+      }
+      return result;
+    }
+
+    static class RandomRecordReader implements RecordReader<Text, Text> {
+      Path name;
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+      public boolean next(Text key, Text value) {
+        if (name != null) {
+          key.set(name.getName());
+          name = null;
+          return true;
+        }
+        return false;
+      }
+      public Text createKey() {
+        return new Text();
+      }
+      public Text createValue() {
+        return new Text();
+      }
+      public long getPos() {
+        return 0;
+      }
+      public void close() {}
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
+                                                    JobConf job, 
+                                                    Reporter reporter) 
+    throws IOException {
+      return new RandomRecordReader(((FileSplit) split).getPath());
+    }
+  }
+
+  /**
+   * Generates random input data of given size with keys and values of given 
+   * sizes. By default it generates 128mb input data with 10 byte keys and 10 
+   * byte values.
+   */
+  public static class Map extends MapReduceBase
+  implements Mapper<WritableComparable, Writable,
+                    BytesWritable, BytesWritable> {
+  
+  private long numBytesToWrite;
+  private int minKeySize;
+  private int keySizeRange;
+  private int minValueSize;
+  private int valueSizeRange;
+  private Random random = new Random();
+  private BytesWritable randomKey = new BytesWritable();
+  private BytesWritable randomValue = new BytesWritable();
+  
+  private void randomizeBytes(byte[] data, int offset, int length) {
+    for(int i = offset + length - 1; i >= offset; --i) {
+      data[i] = (byte) random.nextInt(256);
+    }
+  }
+  
+  public void map(WritableComparable key, 
+                  Writable value,
+                  OutputCollector<BytesWritable, BytesWritable> output, 
+                  Reporter reporter) throws IOException {
+    int itemCount = 0;
+    while (numBytesToWrite > 0) {
+      int keyLength = minKeySize 
+                      + (keySizeRange != 0 
+                         ? random.nextInt(keySizeRange) 
+                         : 0);
+      randomKey.setSize(keyLength);
+      randomizeBytes(randomKey.get(), 0, randomKey.getSize());
+      int valueLength = minValueSize 
+                        + (valueSizeRange != 0 
+                           ? random.nextInt(valueSizeRange) 
+                           : 0);
+      randomValue.setSize(valueLength);
+      randomizeBytes(randomValue.get(), 0, randomValue.getSize());
+      output.collect(randomKey, randomValue);
+      numBytesToWrite -= keyLength + valueLength;
+      reporter.incrCounter(Counters.BYTES_WRITTEN, 1);
+      reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
+      if (++itemCount % 200 == 0) {
+        reporter.setStatus("wrote record " + itemCount + ". " 
+                           + numBytesToWrite + " bytes left.");
+      }
+    }
+    reporter.setStatus("done with " + itemCount + " records.");
+  }
+  
+  @Override
+  public void configure(JobConf job) {
+    numBytesToWrite = job.getLong("test.tmb.bytes_per_map",
+                                  128 * 1024 * 1024);
+    minKeySize = job.getInt("test.tmb.min_key", 10);
+    keySizeRange = job.getInt("test.tmb.max_key", 10) - minKeySize;
+    minValueSize = job.getInt("test.tmb.min_value", 10);
+    valueSizeRange = job.getInt("test.tmb.max_value", 10) - minValueSize;
+  }
+}
+
+  /**
+   * Generate input data for the benchmark
+   */
+  public static void generateInputData(int dataSizePerMap, 
+                                       int numSpillsPerMap, 
+                                       int numMapsPerHost, 
+                                       JobConf masterConf) 
+  throws Exception { 
+    JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
+    job.setJobName("threaded-map-benchmark-random-writer");
+    job.setJarByClass(ThreadedMapBenchmark.class);
+    job.setInputFormat(RandomInputFormat.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    
+    job.setMapperClass(Map.class);
+    job.setReducerClass(IdentityReducer.class);
+    
+    job.setOutputKeyClass(BytesWritable.class);
+    job.setOutputValueClass(BytesWritable.class);
+    
+    JobClient client = new JobClient(job);
+    ClusterStatus cluster = client.getClusterStatus();
+    long totalDataSize = dataSizePerMap * numMapsPerHost 
+                         * cluster.getTaskTrackers();
+    job.set("test.tmb.bytes_per_map", 
+            String.valueOf(dataSizePerMap * 1024 * 1024));
+    job.setNumReduceTasks(0); // none reduce
+    job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
+    job.setOutputPath(INPUT_DIR);
+    
+    FileSystem fs = FileSystem.get(job);
+    fs.delete(BASE_DIR);
+    
+    LOG.info("Generating random input for the benchmark");
+    LOG.info("Total data : " + totalDataSize + " mb");
+    LOG.info("Data per map: " + dataSizePerMap + " mb");
+    LOG.info("Number of spills : " + numSpillsPerMap);
+    LOG.info("Number of maps per host : " + numMapsPerHost);
+    LOG.info("Number of hosts : " + cluster.getTaskTrackers());
+    
+    JobClient.runJob(job); // generates the input for the benchmark
+  }
+
+  /**
+   * This is the main routine for launching the benchmark. It generates random 
+   * input data. The input is non-splittable. Sort is used for benchmarking. 
+   * This benchmark reports the effect of having multiple sort and spill 
+   * cycles over a single sort and spill. 
+   * 
+   * @throws IOException 
+   */
+  public int run (String[] args) throws Exception {
+    LOG.info("Starting the benchmark for threaded spills");
+    String version = "ThreadedMapBenchmark.0.0.1";
+    System.out.println(version);
+    
+    String usage = 
+      "Usage: threadedmapbenchmark " +
+      "[-dataSizePerMap <data size (in mb) per map, default is 128 mb>] " + 
+      "[-numSpillsPerMap <number of spills per map, default is 2>] " +
+      "[-numMapsPerHost <number of maps per host, default is 1>]";
+    
+    int dataSizePerMap = 128; // in mb
+    int numSpillsPerMap = 2;
+    int numMapsPerHost = 1;
+    JobConf masterConf = new JobConf(getConf());
+    
+    for (int i = 0; i < args.length; i++) { // parse command line
+      if (args[i].equals("-dataSizePerMap")) {
+        dataSizePerMap = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-numSpillsPerMap")) {
+        numSpillsPerMap = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-numMapsPerHost")) {
+        numMapsPerHost = Integer.parseInt(args[++i]);
+      } else {
+        System.err.println(usage);
+        System.exit(-1);
+      }
+    }
+    
+    if (dataSizePerMap <  1 ||  // verify arguments
+        numSpillsPerMap < 1 ||
+        numMapsPerHost < 1)
+      {
+        System.err.println(usage);
+        System.exit(-1);
+      }
+    
+    FileSystem fs = null;
+    try {
+      // using random-writer to generate the input data
+      generateInputData(dataSizePerMap, numSpillsPerMap, numMapsPerHost, 
+                        masterConf);
+      
+      // configure job for sorting
+      JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
+      job.setJobName("threaded-map-benchmark-unspilled");
+      job.setJarByClass(ThreadedMapBenchmark.class);
+
+      job.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+      job.setOutputFormat(SequenceFileOutputFormat.class);
+      
+      job.setOutputKeyClass(BytesWritable.class);
+      job.setOutputValueClass(BytesWritable.class);
+      
+      job.setMapperClass(IdentityMapper.class);        
+      job.setReducerClass(IdentityReducer.class);
+      
+      job.addInputPath(INPUT_DIR);
+      job.setOutputPath(OUTPUT_DIR);
+      
+      JobClient client = new JobClient(job);
+      ClusterStatus cluster = client.getClusterStatus();
+      job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
+      job.setNumReduceTasks(1);
+      
+      // set io.sort.mb to avoid spill
+      int ioSortMb = (int)Math.ceil(FACTOR * dataSizePerMap);
+      job.set("io.sort.mb", String.valueOf(ioSortMb));
+      fs = FileSystem.get(job);
+      
+      LOG.info("Running sort with 1 spill per map");
+      long startTime = System.currentTimeMillis();
+      JobClient.runJob(job);
+      long endTime = System.currentTimeMillis();
+      
+      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
+               + " millisec");
+      fs.delete(OUTPUT_DIR);
+      
+      // set io.sort.mb to have multiple spills
+      JobConf spilledJob = new JobConf(job, ThreadedMapBenchmark.class);
+      ioSortMb = (int)Math.ceil(FACTOR 
+                                * Math.ceil((double)dataSizePerMap 
+                                            / numSpillsPerMap));
+      spilledJob.set("io.sort.mb", String.valueOf(ioSortMb));
+      spilledJob.setJobName("threaded-map-benchmark-spilled");
+      spilledJob.setJarByClass(ThreadedMapBenchmark.class);
+      
+      LOG.info("Running sort with " + numSpillsPerMap + " spills per map");
+      startTime = System.currentTimeMillis();
+      JobClient.runJob(spilledJob);
+      endTime = System.currentTimeMillis();
+      
+      LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
+               + " millisec");
+    } finally {
+      if (fs != null) {
+        fs.delete(BASE_DIR);
+      }
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new ThreadedMapBenchmark(), args);
+    System.exit(res);
+  }
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java
URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=611315&r1=611314&r2=611315&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java 
(original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Fri 
Jan 11 14:04:59 2008
@@ -38,6 +38,7 @@
 import org.apache.hadoop.io.TestSetFile;
 import org.apache.hadoop.ipc.TestIPC;
 import org.apache.hadoop.ipc.TestRPC;
+import org.apache.hadoop.mapred.ThreadedMapBenchmark;
 
 public class AllTestDriver {
   
@@ -47,6 +48,9 @@
   public static void main(String argv[]){
     ProgramDriver pgd = new ProgramDriver();
     try {
+      pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, 
+                   "A map/reduce benchmark that compares the performance " + 
+                   "of maps with multiple spills over maps with 1 spill");
       pgd.addClass("mrbench", MRBench.class, "A map/reduce benchmark that can 
create many small jobs");
       pgd.addClass("nnbench", NNBench.class, "A benchmark that stresses the 
namenode.");
       pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check.");


Reply via email to