Author: ddas
Date: Fri Dec 14 11:40:41 2007
New Revision: 604275

URL: http://svn.apache.org/viewvc?rev=604275&view=rev
Log:
HADOOP-1965.  Reverted the patch due to the problem reported in HADOOP-2419

Removed:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=604275&r1=604274&r2=604275&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 14 11:40:41 2007
@@ -122,9 +122,6 @@
     HADOOP-1898.  Release the lock protecting the last time of the last stack
     dump while the dump is happening. (Amareshwari Sri Ramadasu via omalley)
 
-    HADOOP-1965. Makes the sortAndSpill in MapTask a separate thread.
-    (Amar Kamat via ddas)
-
     HADOOP-1900. Makes the heartbeat and task event queries interval 
     dependent on the cluster size.  (Amareshwari Sri Ramadasu via ddas)
 

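For context on what is being reverted: HADOOP-1965 split io.sort.mb in half and handed each filled key/value buffer to a daemon "SortSpillThread", so collection and sort-spill could run concurrently, with IOExceptions passed back through a shared sortSpillException field. Below is a minimal, self-contained sketch of that double-buffer handoff; the class name, the string-based records, and the writeToDisk() helper are hypothetical stand-ins, not Hadoop's actual MapTask code.

    import java.io.IOException;

    // Hypothetical sketch of the reverted HADOOP-1965 pattern; records are
    // modeled as plain strings instead of serialized key/value pairs.
    public class ConcurrentSpillSketch {
      private static final int HALF_BUFFER = 50 * 1024 * 1024; // half of io.sort.mb
      private StringBuilder keyValBuffer = new StringBuilder();
      private StringBuilder pendingKeyvalBuffer;  // non-null while a spill is in flight
      private IOException sortSpillException;     // surfaced on the next collect()

      public synchronized void collect(String record) throws IOException {
        if (sortSpillException != null) {
          throw sortSpillException;               // report the previous spill's failure
        }
        keyValBuffer.append(record).append('\n');
        if (keyValBuffer.length() < HALF_BUFFER) {
          return;
        }
        while (pendingKeyvalBuffer != null) {     // at most one spill outstanding
          try {
            wait();                               // wait for the pending spill to finish
          } catch (InterruptedException ie) {
            // keep waiting, as the reverted code did
          }
        }
        pendingKeyvalBuffer = keyValBuffer;       // hand off and keep collecting
        keyValBuffer = new StringBuilder();
        Thread bufferWriter = new Thread() {
          public void run() {
            sortAndSpillToDisk();
          }
        };
        bufferWriter.setDaemon(true);             // dies if the collector dies
        bufferWriter.setName("SortSpillThread");
        bufferWriter.start();
      }

      private void sortAndSpillToDisk() {
        try {
          writeToDisk(pendingKeyvalBuffer);       // hypothetical sort + combine + write
        } catch (IOException ioe) {
          synchronized (this) {
            sortSpillException = ioe;             // passed back via the shared field
          }
        } finally {
          synchronized (this) {                   // never leave the collector waiting
            pendingKeyvalBuffer = null;
            notify();
          }
        }
      }

      private void writeToDisk(CharSequence buf) throws IOException {
        // stand-in for the real per-partition sort and spill
      }
    }

The MapTask.java hunks below remove exactly these moving parts: the pending buffer, the spill thread, and the cross-thread exception handoff.
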
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=604275&r1=604274&r2=604275&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Dec 14 11:40:41 2007
@@ -255,13 +255,7 @@
 
     private DataOutputBuffer keyValBuffer; //the buffer where key/val will
                                            //be stored before they are 
-                                           //passed on to the pending buffer
-    private DataOutputBuffer pendingKeyvalBuffer; // the key value buffer used 
-                                                  // while spilling
-    private IOException sortSpillException; //since sort-spill and collect are
-                                            //done concurrently, exceptions are
-                                            //passed through shared variables
-    private final Object sortSpillExceptionLock = new Object();
+                                           //spilled to disk
     private int maxBufferSize; //the max amount of in-memory space after which
                                //we will spill the keyValBuffer to disk
     private int numSpills; //maintains the no. of spills to disk done so far
@@ -273,7 +267,6 @@
     private Class valClass;
     private WritableComparator comparator;
     private BufferSorter []sortImpl;
-    private BufferSorter []pendingSortImpl; // sort impl for the pending buffer
     private SequenceFile.Writer writer;
     private FSDataOutputStream out;
     private FSDataOutputStream indexOut;
@@ -283,10 +276,7 @@
       this.partitions = job.getNumReduceTasks();
      this.partitioner = (Partitioner)ReflectionUtils.newInstance(
                                                                  job.getPartitionerClass(), job);
-      // using one half the total buffer for collecting key-value pairs and 
-      // the other half for sort-spill thus making the two tasks concurrent
-      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2;
-      this.sortSpillException = null;
+      maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
       keyValBuffer = new DataOutputBuffer();
 
       this.job = job;
@@ -348,149 +338,94 @@
                               + value.getClass().getName());
       }
       
-      // check if the earlier sort-spill generated an exception
-      synchronized (sortSpillExceptionLock) {
-        if (sortSpillException != null) {
-          throw sortSpillException;
+      synchronized (this) {
+        if (keyValBuffer == null) {
+          keyValBuffer = new DataOutputBuffer();
         }
-      }
-      if (keyValBuffer == null) {
-        keyValBuffer = new DataOutputBuffer();
-        sortImpl = new BufferSorter[partitions];
-        for (int i = 0; i < partitions; i++)
-          sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
-                                job.getClass("map.sort.class", 
-                                             MergeSorter.class, 
-                                             BufferSorter.class), job);
-      }
-      //dump the key/value to buffer
-      int keyOffset = keyValBuffer.getLength(); 
-      key.write(keyValBuffer);
-      int keyLength = keyValBuffer.getLength() - keyOffset;
-      value.write(keyValBuffer);
-      int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
-      int partNumber = partitioner.getPartition(key, value, partitions);
-      sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
-      reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
-      reporter.incrCounter(MAP_OUTPUT_BYTES, 
-                           (keyValBuffer.getLength() - keyOffset));
+        //dump the key/value to buffer
+        int keyOffset = keyValBuffer.getLength(); 
+        key.write(keyValBuffer);
+        int keyLength = keyValBuffer.getLength() - keyOffset;
+        value.write(keyValBuffer);
+        int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
+      
+        int partNumber = partitioner.getPartition(key, value, partitions);
+        sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-      //now check whether we need to spill to disk
-      long totalMem = 0;
-      for (int i = 0; i < partitions; i++)
-        totalMem += sortImpl[i].getMemoryUtilized();
-      totalMem += keyValBuffer.getLength();
-      if (totalMem  >= maxBufferSize) {
-        // check if the earlier spill is pending
-        synchronized (this) {
-          while (pendingKeyvalBuffer != null) {
-            try {            
-              wait(); // wait for the pending spill to finish
-            } catch (InterruptedException ie) {
-              LOG.warn("Buffer interrupted while waiting for the writer", ie);
-            }
-          }
-        }
-        // check if the earlier sort-spill thread generated an exception
-        synchronized (sortSpillExceptionLock) {
-          if (sortSpillException != null) {
-            throw sortSpillException;
-          }
-        }
-        // prepare for spilling
-        synchronized (this) {
-          pendingKeyvalBuffer = keyValBuffer;
-          pendingSortImpl = sortImpl;
+        reporter.incrCounter(MAP_OUTPUT_RECORDS, 1);
+        reporter.incrCounter(MAP_OUTPUT_BYTES,
+                             (keyValBuffer.getLength() - keyOffset));
+
+        //now check whether we need to spill to disk
+        long totalMem = 0;
+        for (int i = 0; i < partitions; i++)
+          totalMem += sortImpl[i].getMemoryUtilized();
+        if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
+          sortAndSpillToDisk();
+          //we don't reuse the keyValBuffer. We want to maintain consistency
+          //in the memory model (for negligible performance loss).
           keyValBuffer = null;
-          sortImpl = null;
-        }
-        // Start the sort-spill thread. While the sort and spill takes place 
-        // using the pending variables, the output collector can collect the 
-        // key-value without getting blocked. Thus making key-value collection 
-        // and sort-spill concurrent.
-        Thread bufferWriter = new Thread() {
-          public void run() {
-            sortAndSpillToDisk();
+          for (int i = 0; i < partitions; i++) {
+            sortImpl[i].close();
           }
-        };
-        bufferWriter.setDaemon(true); // to make sure that the buffer writer 
-                                      // gets killed if collector gets killed.
-        bufferWriter.setName("SortSpillThread");
-        bufferWriter.start();
+        }
       }
     }
     
     //sort, combine and spill to disk
-    private void sortAndSpillToDisk() {
-      try {
+    private void sortAndSpillToDisk() throws IOException {
+      synchronized (this) {
         //approximate the length of the output file to be the length of the
         //buffer + header lengths for the partitions
-        synchronized (this) {
-          long size = pendingKeyvalBuffer.getLength() 
-                      + partitions  * APPROX_HEADER_LENGTH;
-          Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
-                                                             numSpills, size);
-          //we just create the FSDataOutputStream object here.
-          out = localFs.create(filename);
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-                                       getTaskId(), numSpills, 
-                                       partitions * 16);
-          indexOut = localFs.create(indexFilename);
-          LOG.debug("opened " 
-                    + mapOutputFile.getSpillFile(getTaskId(), 
-                                                 numSpills).getName());
-          //invoke the sort
-          for (int i = 0; i < partitions; i++) {
-            pendingSortImpl[i].setInputBuffer(pendingKeyvalBuffer);
-            pendingSortImpl[i].setProgressable(reporter);
-            RawKeyValueIterator rIter = pendingSortImpl[i].sort();
-                                               
-            startPartition(i);
-            if (rIter != null) {
-              //invoke the combiner if one is defined
-              if (job.getCombinerClass() != null) {
-                //We instantiate and close the combiner for each partition. 
-                //This is required for streaming where the combiner runs as a 
-                //separate process and we want to make sure that the combiner 
-                //process has got all the input key/val, processed, and output 
-                //the result key/vals before we write the partition header in 
-                //the output file.
-                Reducer combiner = (Reducer)ReflectionUtils.newInstance(
-                                                    job.getCombinerClass(), 
-                                                    job);
-                // make collector
-                OutputCollector combineCollector = new OutputCollector() {
+        long size = keyValBuffer.getLength() + 
+                    partitions * APPROX_HEADER_LENGTH;
+        Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), 
+                                      numSpills, size);
+        //we just create the FSDataOutputStream object here.
+        out = localFs.create(filename);
+        Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
+                             getTaskId(), numSpills, partitions * 16);
+        indexOut = localFs.create(indexFilename);
+        LOG.debug("opened "+
+                  mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
+          
+        //invoke the sort
+        for (int i = 0; i < partitions; i++) {
+          sortImpl[i].setInputBuffer(keyValBuffer);
+          sortImpl[i].setProgressable(reporter);
+          RawKeyValueIterator rIter = sortImpl[i].sort();
+          
+          startPartition(i);
+          if (rIter != null) {
+            //invoke the combiner if one is defined
+            if (job.getCombinerClass() != null) {
+              //we instantiate and close the combiner for each partition. This
+              //is required for streaming where the combiner runs as a separate
+              //process and we want to make sure that the combiner process has
+              //got all the input key/val, processed, and output the result 
+              //key/vals before we write the partition header in the output file
+              Reducer combiner = (Reducer)ReflectionUtils.newInstance(
+                                                  job.getCombinerClass(), job);
+              // make collector
+              OutputCollector combineCollector = new OutputCollector() {
                   public void collect(WritableComparable key, Writable value)
-                  throws IOException {
+                    throws IOException {
                     synchronized (this) {
                       writer.append(key, value);
                     }
                   }
                 };
-                combineAndSpill(rIter, combiner, combineCollector);
-                combiner.close();
-              }
-              else //just spill the sorted data
-                spill(rIter);
+              combineAndSpill(rIter, combiner, combineCollector);
+              combiner.close();
             }
-            endPartition(i);
-          }
-          numSpills++;
-          out.close();
-          indexOut.close();
-        }
-      } catch (IOException ioe) {
-        synchronized (sortSpillExceptionLock) {
-          sortSpillException = ioe;
-        }
-      } finally { // make sure that the collector never waits indefinitely
-        synchronized (this) {
-          pendingKeyvalBuffer = null;
-          for (int i = 0; i < partitions; i++) {
-            pendingSortImpl[i].close();
+            else //just spill the sorted data
+              spill(rIter);
           }
-          this.notify();
+          endPartition(i);
         }
+        numSpills++;
+        out.close();
+        indexOut.close();
       }
     }
     
@@ -664,35 +599,9 @@
       //check whether the length of the key/value buffer is 0. If not, then
       //we need to spill that to disk. Note that we reset the key/val buffer
       //upon each spill (so a length > 0 means that we have not spilled yet)
-      
-      // check if the earlier spill is pending
       synchronized (this) {
-        while (pendingKeyvalBuffer != null) {
-          try {
-            wait();
-          } catch (InterruptedException ie) {
-            LOG.info("Buffer interrupted while for the pending spill", ie);
-          }
-        }
-      }
-      // check if the earlier sort-spill thread generated an exception
-      synchronized (sortSpillExceptionLock) {
-        if (sortSpillException != null) {
-          throw sortSpillException;
-        }
-      }
-      // prepare for next spill
-      if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
-        synchronized (this) {
-          pendingKeyvalBuffer = keyValBuffer;
-          pendingSortImpl = sortImpl;
-        }
-        sortAndSpillToDisk();
-        // check if the last sort-spill thread generated an exception
-        synchronized (sortSpillExceptionLock) {
-          if (sortSpillException != null) {
-            throw sortSpillException;
-          }
+        if (keyValBuffer != null && keyValBuffer.getLength() > 0) {
+          sortAndSpillToDisk();
         }
       }
       mergeParts();

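For contrast, here is the restored flow that the MapTask.java hunks above reinstate, in the same hypothetical terms: the spill happens inline inside collect(), under the same lock, with the full io.sort.mb buffer, so map() blocks until each spill completes.

    import java.io.IOException;

    // Hypothetical sketch of the restored (pre-HADOOP-1965) behavior.
    public class SynchronousSpillSketch {
      private static final int FULL_BUFFER = 100 * 1024 * 1024; // all of io.sort.mb
      private StringBuilder keyValBuffer = new StringBuilder();

      public synchronized void collect(String record) throws IOException {
        keyValBuffer.append(record).append('\n');
        if (keyValBuffer.length() >= FULL_BUFFER) {
          sortAndSpillToDisk();                 // map() blocks for the whole spill
          keyValBuffer = new StringBuilder();   // fresh buffer rather than reuse,
                                                // mirroring the comment in the diff
        }
      }

      private synchronized void sortAndSpillToDisk() throws IOException {
        // stand-in for the real per-partition sort, optional combine, and write
      }
    }

The tradeoff is throughput for simplicity: spills no longer overlap with collection, but there is also no cross-thread buffer handoff to go wrong, which is presumably what HADOOP-2419 hit (the ticket details are not included in this mail).
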
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=604275&r1=604274&r2=604275&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Fri Dec 14 11:40:41 2007
@@ -36,7 +36,6 @@
 import org.apache.hadoop.io.TestSequenceFile;
 import org.apache.hadoop.ipc.TestIPC;
 import org.apache.hadoop.ipc.TestRPC;
-import org.apache.hadoop.mapred.ThreadedMapBenchmark;
 
 public class AllTestDriver {
   
@@ -46,9 +45,6 @@
   public static void main(String argv[]){
     ProgramDriver pgd = new ProgramDriver();
     try {
-      pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, 
-                   "A map/reduce benchmark that compares the performance " + 
-                   "of maps with multiple spills over maps with 1 spill");
       pgd.addClass("mrbench", MRBench.class, "A map/reduce benchmark that can 
create many small jobs");
       pgd.addClass("nnbench", NNBench.class, "A benchmark that stresses the 
namenode.");
       pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check.");

