Author: omalley
Date: Mon Aug 13 22:27:23 2007
New Revision: 565628

URL: http://svn.apache.org/viewvc?view=rev&rev=565628
Log:
HADOOP-1698. Fixes performance problems in the map output sorting with many reducers. Contributed by Devaraj Das.

Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=565628&r1=565627&r2=565628 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Aug 13 22:27:23 2007 @@ -520,6 +520,9 @@ 152. HADOOP-1629. Added a upgrade test for HADOOP-1134. (Raghu Angadi via nigel) +153. HADOOP-1698. Fix performance problems on map output sorting for jobs + with large numbers of reduces. (Devaraj Das via omalley) + Release 0.13.0 - 2007-06-08 1. HADOOP-1047. Fix TestReplication to succeed more reliably. Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=565628&r1=565627&r2=565628 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Aug 13 22:27:23 2007 @@ -648,9 +648,10 @@ long lastSyncPos; // position of last sync byte[] sync; // 16 random bytes { - try { // use hash of uid + host + try { MessageDigest digester = MessageDigest.getInstance("MD5"); - digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes()); + long time = System.currentTimeMillis(); + digester.update((new UID()+"@"+time).getBytes()); sync = digester.digest(); } catch (Exception e) { throw new RuntimeException(e); Modified: 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?view=diff&rev=565628&r1=565627&r2=565628 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Mon Aug 13 22:27:23 2007 @@ -50,16 +50,21 @@ //4 for indices into startOffsets array in the //pointers array (ignored the partpointers list itself) static private final int BUFFERED_KEY_VAL_OVERHEAD = 16; + static private final int INITIAL_ARRAY_SIZE = 5; + //we maintain the max lengths of the key/val that we encounter. During + //iteration of the sorted results, we will create a DataOutputBuffer to + //return the keys. The max size of the DataOutputBuffer will be the max + //keylength that we encounter. Expose this value to model memory more + //accurately. + private int maxKeyLength = 0; + private int maxValLength = 0; + //Reference to the Progressable object for sending KeepAlive private Progressable reporter; //Implementation of methods of the SorterBase interface // public void configure(JobConf conf) { - startOffsets = new int[1024]; - keyLengths = new int[1024]; - valueLengths = new int[1024]; - pointers = new int[1024]; comparator = conf.getOutputKeyComparator(); } @@ -70,10 +75,16 @@ public void addKeyValue(int recordOffset, int keyLength, int valLength) { //Add the start offset of the key in the startOffsets array and the //length in the keyLengths array. 
- if (count == startOffsets.length) + if (startOffsets == null || count == startOffsets.length) grow(); startOffsets[count] = recordOffset; keyLengths[count] = keyLength; + if (keyLength > maxKeyLength) { + maxKeyLength = keyLength; + } + if (valLength > maxValLength) { + maxValLength = valLength; + } valueLengths[count] = valLength; pointers[count] = count; count++; @@ -85,14 +96,30 @@ } public long getMemoryUtilized() { - return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD; + //the total length of the arrays + the max{Key,Val}Length (this will be the + //max size of the DataOutputBuffers during the iteration of the sorted + //keys). + if (startOffsets != null) { + return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD + + maxKeyLength + maxValLength; + } + else { //nothing from this yet + return 0; + } } public abstract RawKeyValueIterator sort(); public void close() { - //just set count to 0; we reuse the arrays + //set count to 0; also, we don't reuse the arrays since we want to maintain + //consistency in the memory model count = 0; + startOffsets = null; + keyLengths = null; + valueLengths = null; + pointers = null; + maxKeyLength = 0; + maxValLength = 0; } //A compare method that references the keyValBuffer through the indirect //pointers @@ -106,7 +133,11 @@ } private void grow() { - int newLength = startOffsets.length * 3/2; + int currLength = 0; + if (startOffsets != null) { + currLength = startOffsets.length; + } + int newLength = (int)(currLength * 1.1) + 1; startOffsets = grow(startOffsets, newLength); keyLengths = grow(keyLengths, newLength); valueLengths = grow(valueLengths, newLength); @@ -115,7 +146,9 @@ private int[] grow(int[] old, int newLength) { int[] result = new int[newLength]; - System.arraycopy(old, 0, result, 0, old.length); + if(old != null) { + System.arraycopy(old, 0, result, 0, old.length); + } return result; } } //BasicTypeSorterBase Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=565628&r1=565627&r2=565628 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Aug 13 22:27:23 2007 @@ -330,6 +330,9 @@ } synchronized (this) { + if (keyValBuffer == null) { + keyValBuffer = new DataOutputBuffer(); + } //dump the key/value to buffer int keyOffset = keyValBuffer.getLength(); key.write(keyValBuffer); @@ -350,7 +353,9 @@ totalMem += sortImpl[i].getMemoryUtilized(); if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) { sortAndSpillToDisk(); - keyValBuffer.reset(); + //we don't reuse the keyValBuffer. We want to maintain consistency + //in the memory model (for negligible performance loss). + keyValBuffer = null; for (int i = 0; i < partitions; i++) { sortImpl[i].close(); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java?view=diff&rev=565628&r1=565627&r2=565628 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java Mon Aug 13 22:27:23 2007 @@ -58,4 +58,12 @@ public int compare (IntWritable i, IntWritable j) { return super.compare(i.get(), j.get()); } + + /** Add the extra memory that will be utilized by the sort method */ + public long getMemoryUtilized() { + //this is memory that will be actually utilized (considering the temp + //array that will be allocated by the sort() method (mergesort)) + return super.getMemoryUtilized() + super.count * 4; + } + }