Author: ddas Date: Fri Dec 14 11:40:41 2007 New Revision: 604275 URL: http://svn.apache.org/viewvc?rev=604275&view=rev Log: HADOOP-1965. Reverted the patch that made the sortAndSpill in MapTask a separate thread, due to the problem reported in HADOOP-2419.
Removed: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=604275&r1=604274&r2=604275&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 14 11:40:41 2007 @@ -122,9 +122,6 @@ HADOOP-1898. Release the lock protecting the last time of the last stack dump while the dump is happening. (Amareshwari Sri Ramadasu via omalley) - HADOOP-1965. Makes the sortAndSpill in MapTask a separate thread. - (Amar Kamat via ddas) - HADOOP-1900. Makes the heartbeat and task event queries interval dependent on the cluster size. (Amareshwari Sri Ramadasu via ddas) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=604275&r1=604274&r2=604275&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Dec 14 11:40:41 2007 @@ -255,13 +255,7 @@ private DataOutputBuffer keyValBuffer; //the buffer where key/val will //be stored before they are - //passed on to the pending buffer - private DataOutputBuffer pendingKeyvalBuffer; // the key value buffer used - // while spilling - private IOException sortSpillException; //since sort-spill and collect are - //done concurrently, exceptions are - //passed through shared variables - private final Object sortSpillExceptionLock = new Object(); + //spilled to disk private int maxBufferSize; //the max 
amount of in-memory space after which //we will spill the keyValBuffer to disk private int numSpills; //maintains the no. of spills to disk done so far @@ -273,7 +267,6 @@ private Class valClass; private WritableComparator comparator; private BufferSorter []sortImpl; - private BufferSorter []pendingSortImpl; // sort impl for the pending buffer private SequenceFile.Writer writer; private FSDataOutputStream out; private FSDataOutputStream indexOut; @@ -283,10 +276,7 @@ this.partitions = job.getNumReduceTasks(); this.partitioner = (Partitioner)ReflectionUtils.newInstance( job.getPartitionerClass(), job); - // using one half the total buffer for collecting key-value pairs and - // the other half for sort-spill thus making the two tasks concurrent - maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2; - this.sortSpillException = null; + maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024; keyValBuffer = new DataOutputBuffer(); this.job = job; @@ -348,149 +338,94 @@ + value.getClass().getName()); } - // check if the earlier sort-spill generated an exception - synchronized (sortSpillExceptionLock) { - if (sortSpillException != null) { - throw sortSpillException; + synchronized (this) { + if (keyValBuffer == null) { + keyValBuffer = new DataOutputBuffer(); } - } - if (keyValBuffer == null) { - keyValBuffer = new DataOutputBuffer(); - sortImpl = new BufferSorter[partitions]; - for (int i = 0; i < partitions; i++) - sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance( - job.getClass("map.sort.class", - MergeSorter.class, - BufferSorter.class), job); - } - //dump the key/value to buffer - int keyOffset = keyValBuffer.getLength(); - key.write(keyValBuffer); - int keyLength = keyValBuffer.getLength() - keyOffset; - value.write(keyValBuffer); - int valLength = keyValBuffer.getLength() - (keyOffset + keyLength); - int partNumber = partitioner.getPartition(key, value, partitions); - sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength); - 
reporter.incrCounter(MAP_OUTPUT_RECORDS, 1); - reporter.incrCounter(MAP_OUTPUT_BYTES, - (keyValBuffer.getLength() - keyOffset)); + //dump the key/value to buffer + int keyOffset = keyValBuffer.getLength(); + key.write(keyValBuffer); + int keyLength = keyValBuffer.getLength() - keyOffset; + value.write(keyValBuffer); + int valLength = keyValBuffer.getLength() - (keyOffset + keyLength); + + int partNumber = partitioner.getPartition(key, value, partitions); + sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength); - //now check whether we need to spill to disk - long totalMem = 0; - for (int i = 0; i < partitions; i++) - totalMem += sortImpl[i].getMemoryUtilized(); - totalMem += keyValBuffer.getLength(); - if (totalMem >= maxBufferSize) { - // check if the earlier spill is pending - synchronized (this) { - while (pendingKeyvalBuffer != null) { - try { - wait(); // wait for the pending spill to finish - } catch (InterruptedException ie) { - LOG.warn("Buffer interrupted while waiting for the writer", ie); - } - } - } - // check if the earlier sort-spill thread generated an exception - synchronized (sortSpillExceptionLock) { - if (sortSpillException != null) { - throw sortSpillException; - } - } - // prepare for spilling - synchronized (this) { - pendingKeyvalBuffer = keyValBuffer; - pendingSortImpl = sortImpl; + reporter.incrCounter(MAP_OUTPUT_RECORDS, 1); + reporter.incrCounter(MAP_OUTPUT_BYTES, + (keyValBuffer.getLength() - keyOffset)); + + //now check whether we need to spill to disk + long totalMem = 0; + for (int i = 0; i < partitions; i++) + totalMem += sortImpl[i].getMemoryUtilized(); + if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) { + sortAndSpillToDisk(); + //we don't reuse the keyValBuffer. We want to maintain consistency + //in the memory model (for negligible performance loss). keyValBuffer = null; - sortImpl = null; - } - // Start the sort-spill thread. 
While the sort and spill takes place - // using the pending variables, the output collector can collect the - // key-value without getting blocked. Thus making key-value collection - // and sort-spill concurrent. - Thread bufferWriter = new Thread() { - public void run() { - sortAndSpillToDisk(); + for (int i = 0; i < partitions; i++) { + sortImpl[i].close(); } - }; - bufferWriter.setDaemon(true); // to make sure that the buffer writer - // gets killed if collector gets killed. - bufferWriter.setName("SortSpillThread"); - bufferWriter.start(); + } } } //sort, combine and spill to disk - private void sortAndSpillToDisk() { - try { + private void sortAndSpillToDisk() throws IOException { + synchronized (this) { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions - synchronized (this) { - long size = pendingKeyvalBuffer.getLength() - + partitions * APPROX_HEADER_LENGTH; - Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), - numSpills, size); - //we just create the FSDataOutputStream object here. - out = localFs.create(filename); - Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( - getTaskId(), numSpills, - partitions * 16); - indexOut = localFs.create(indexFilename); - LOG.debug("opened " - + mapOutputFile.getSpillFile(getTaskId(), - numSpills).getName()); - //invoke the sort - for (int i = 0; i < partitions; i++) { - pendingSortImpl[i].setInputBuffer(pendingKeyvalBuffer); - pendingSortImpl[i].setProgressable(reporter); - RawKeyValueIterator rIter = pendingSortImpl[i].sort(); - - startPartition(i); - if (rIter != null) { - //invoke the combiner if one is defined - if (job.getCombinerClass() != null) { - //We instantiate and close the combiner for each partition. 
- //This is required for streaming where the combiner runs as a - //separate process and we want to make sure that the combiner - //process has got all the input key/val, processed, and output - //the result key/vals before we write the partition header in - //the output file. - Reducer combiner = (Reducer)ReflectionUtils.newInstance( - job.getCombinerClass(), - job); - // make collector - OutputCollector combineCollector = new OutputCollector() { + long size = keyValBuffer.getLength() + + partitions * APPROX_HEADER_LENGTH; + Path filename = mapOutputFile.getSpillFileForWrite(getTaskId(), + numSpills, size); + //we just create the FSDataOutputStream object here. + out = localFs.create(filename); + Path indexFilename = mapOutputFile.getSpillIndexFileForWrite( + getTaskId(), numSpills, partitions * 16); + indexOut = localFs.create(indexFilename); + LOG.debug("opened "+ + mapOutputFile.getSpillFile(getTaskId(), numSpills).getName()); + + //invoke the sort + for (int i = 0; i < partitions; i++) { + sortImpl[i].setInputBuffer(keyValBuffer); + sortImpl[i].setProgressable(reporter); + RawKeyValueIterator rIter = sortImpl[i].sort(); + + startPartition(i); + if (rIter != null) { + //invoke the combiner if one is defined + if (job.getCombinerClass() != null) { + //we instantiate and close the combiner for each partition. 
This + //is required for streaming where the combiner runs as a separate + //process and we want to make sure that the combiner process has + //got all the input key/val, processed, and output the result + //key/vals before we write the partition header in the output file + Reducer combiner = (Reducer)ReflectionUtils.newInstance( + job.getCombinerClass(), job); + // make collector + OutputCollector combineCollector = new OutputCollector() { public void collect(WritableComparable key, Writable value) - throws IOException { + throws IOException { synchronized (this) { writer.append(key, value); } } }; - combineAndSpill(rIter, combiner, combineCollector); - combiner.close(); - } - else //just spill the sorted data - spill(rIter); + combineAndSpill(rIter, combiner, combineCollector); + combiner.close(); } - endPartition(i); - } - numSpills++; - out.close(); - indexOut.close(); - } - } catch (IOException ioe) { - synchronized (sortSpillExceptionLock) { - sortSpillException = ioe; - } - } finally { // make sure that the collector never waits indefinitely - synchronized (this) { - pendingKeyvalBuffer = null; - for (int i = 0; i < partitions; i++) { - pendingSortImpl[i].close(); + else //just spill the sorted data + spill(rIter); } - this.notify(); + endPartition(i); } + numSpills++; + out.close(); + indexOut.close(); } } @@ -664,35 +599,9 @@ //check whether the length of the key/value buffer is 0. If not, then //we need to spill that to disk. 
Note that we reset the key/val buffer //upon each spill (so a length > 0 means that we have not spilled yet) - - // check if the earlier spill is pending synchronized (this) { - while (pendingKeyvalBuffer != null) { - try { - wait(); - } catch (InterruptedException ie) { - LOG.info("Buffer interrupted while for the pending spill", ie); - } - } - } - // check if the earlier sort-spill thread generated an exception - synchronized (sortSpillExceptionLock) { - if (sortSpillException != null) { - throw sortSpillException; - } - } - // prepare for next spill - if (keyValBuffer != null && keyValBuffer.getLength() > 0) { - synchronized (this) { - pendingKeyvalBuffer = keyValBuffer; - pendingSortImpl = sortImpl; - } - sortAndSpillToDisk(); - // check if the last sort-spill thread generated an exception - synchronized (sortSpillExceptionLock) { - if (sortSpillException != null) { - throw sortSpillException; - } + if (keyValBuffer != null && keyValBuffer.getLength() > 0) { + sortAndSpillToDisk(); } } mergeParts(); Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java?rev=604275&r1=604274&r2=604275&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/test/AllTestDriver.java Fri Dec 14 11:40:41 2007 @@ -36,7 +36,6 @@ import org.apache.hadoop.io.TestSequenceFile; import org.apache.hadoop.ipc.TestIPC; import org.apache.hadoop.ipc.TestRPC; -import org.apache.hadoop.mapred.ThreadedMapBenchmark; public class AllTestDriver { @@ -46,9 +45,6 @@ public static void main(String argv[]){ ProgramDriver pgd = new ProgramDriver(); try { - pgd.addClass("threadedmapbench", ThreadedMapBenchmark.class, - "A map/reduce benchmark that compares the performance " + - "of maps with multiple spills over 
maps with 1 spill"); pgd.addClass("mrbench", MRBench.class, "A map/reduce benchmark that can create many small jobs"); pgd.addClass("nnbench", NNBench.class, "A benchmark that stresses the namenode."); pgd.addClass("mapredtest", TestMapRed.class, "A map/reduce test check.");