Author: cutting
Date: Mon Feb 26 12:35:57 2007
New Revision: 511993

URL: http://svn.apache.org/viewvc?view=rev&rev=511993
Log:
HADOOP-1027.  Fix problems with in-memory merging during shuffle and
re-enable this optimization.  Contributed by Devaraj.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Feb 26 12:35:57 2007
@@ -141,6 +141,9 @@
 41. HADOOP-1040.  Update RandomWriter example to use counters and
     user-defined input and output formats.  (omalley via cutting)
 
+42. HADOOP-1027.  Fix problems with in-memory merging during shuffle
+    and re-enable this optimization.  (Devaraj Das via cutting)
+
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Mon Feb 26 12:35:57 2007
@@ -132,7 +132,7 @@
 
 <property>
   <name>fs.inmemory.size.mb</name>
-  <value>0</value>
+  <value>75</value>
   <description>The size of the in-memory filesystem instance in MB</description>
 </property>
 
@@ -556,6 +556,17 @@
   /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of:
         -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc
+  </description>
+</property>
+
+<property>
+  <name>mapred.inmem.merge.threshold</name>
+  <value>1000</value>
+  <description>The threshold, in terms of the number of files, for the
+  in-memory merge process. When we accumulate threshold number of files we
+  initiate the in-memory merge and spill to disk. A value of 0 or less
+  indicates that we don't want a threshold and instead depend only on the
+  ramfs's memory consumption to trigger the merge.
   </description>
 </property>
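For reference, a minimal sketch of how a job could override these two settings
through the standard JobConf API. The property names are the ones defined
above; the class name and the values shown are illustrative only, not part of
this change:

    import org.apache.hadoop.mapred.JobConf;

    public class ShuffleTuningExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf(ShuffleTuningExample.class);
        // Size of the ramfs instance that holds copied map outputs, in MB.
        conf.setInt("fs.inmemory.size.mb", 75);
        // Number of in-memory map outputs that triggers a merge/spill to disk;
        // 0 or a negative value falls back to the memory-usage trigger alone.
        conf.setInt("mapred.inmem.merge.threshold", 1000);
        System.out.println("merge threshold = " +
            conf.getInt("mapred.inmem.merge.threshold", 1000));
      }
    }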
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Mon Feb 26 12:35:57 2007
@@ -98,10 +98,13 @@
     private FileAttributes fAttr;
     
     public InMemoryInputStream(Path f) throws IOException {
-      fAttr = pathToFileAttribs.get(getPath(f));
-      if (fAttr == null) throw new FileNotFoundException("File " + f +
-                                                         " does not exist");
-      din.reset(fAttr.data, 0, fAttr.size);
+      synchronized (InMemoryFileSystem.this) {
+        fAttr = pathToFileAttribs.get(getPath(f));
+        if (fAttr == null) {
+          throw new FileNotFoundException("File " + f + " does not exist");
+        }
+        din.reset(fAttr.data, 0, fAttr.size);
+      }
     }
     
     public long getPos() throws IOException {
@@ -214,12 +217,16 @@
     
     public void close() throws IOException {
       super.close();
-      if (pathToFileAttribs != null)
-        pathToFileAttribs.clear();
-      pathToFileAttribs = null;
-      if (tempFileAttribs != null)
-        tempFileAttribs.clear();
-      tempFileAttribs = null;
+      synchronized (this) {
+        if (pathToFileAttribs != null) {
+          pathToFileAttribs.clear();
+        }
+        pathToFileAttribs = null;
+        if (tempFileAttribs != null) {
+          tempFileAttribs.clear();
+        }
+        tempFileAttribs = null;
+      }
     }
     
     /**
@@ -236,6 +243,9 @@
 
   public boolean renameRaw(Path src, Path dst) throws IOException {
     synchronized (this) {
+      if (exists(dst)) {
+        throw new IOException ("Path " + dst + " already exists");
+      }
       FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
       if (fAttr == null) return false;
       pathToFileAttribs.put(getPath(dst), fAttr);
@@ -256,7 +266,9 @@
   }
 
   public boolean exists(Path f) throws IOException {
-    return pathToFileAttribs.containsKey(getPath(f));
+    synchronized (this) {
+      return pathToFileAttribs.containsKey(getPath(f));
+    }
   }
   
   /**
@@ -267,7 +279,9 @@
   }
 
   public long getLength(Path f) throws IOException {
-    return pathToFileAttribs.get(getPath(f)).size;
+    synchronized (this) {
+      return pathToFileAttribs.get(getPath(f)).size;
+    }
   }
   
   /**
@@ -363,13 +377,18 @@
   public Path[] getFiles(PathFilter filter) {
     synchronized (this) {
       List <String> closedFilesList = new ArrayList();
-      Set paths = pathToFileAttribs.keySet();
-      if (paths == null || paths.isEmpty()) return new Path[0];
-      Iterator iter = paths.iterator();
-      while (iter.hasNext()) {
-        String f = (String)iter.next();
-        if (filter.accept(new Path(f)))
-          closedFilesList.add(f);
+      synchronized (pathToFileAttribs) {
+        Set paths = pathToFileAttribs.keySet();
+        if (paths == null || paths.isEmpty()) {
+          return new Path[0];
+        }
+        Iterator iter = paths.iterator();
+        while (iter.hasNext()) {
+          String f = (String)iter.next();
+          if (filter.accept(new Path(f))) {
+            closedFilesList.add(f);
+          }
+        }
       }
       String [] names = 
        closedFilesList.toArray(new String[closedFilesList.size()]);
@@ -381,6 +400,10 @@
     }
   }
   
+  public int getNumFiles(PathFilter filter) {
+    return getFiles(filter).length;
+  }
+
   public int getFSSize() {
     return fsSize;
   }
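The new getNumFiles(PathFilter) accessor simply counts the paths that
getFiles() would return, so the shuffle can gate the in-memory merge on a file
count. For illustration, a small PathFilter of the kind passed to these
methods; the "map_" prefix check is an assumption for this sketch, not the
actual MAP_OUTPUT_FILTER used by ReduceTaskRunner:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Hypothetical filter: accept only paths that look like copied map outputs.
    public class MapOutputPathFilter implements PathFilter {
      public boolean accept(Path path) {
        // Assumption for this sketch: map output files carry a "map_" prefix.
        return path.getName().startsWith("map_");
      }
    }

A caller could then test inMemFileSys.getNumFiles(new MapOutputPathFilter())
against a threshold, which is the shape of the check added to ReduceTaskRunner
below.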
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Feb 26 12:35:57 2007
@@ -2381,14 +2381,37 @@
       do {
         //get the factor for this pass of merge
         factor = getPassFactor(passNo, numSegments);
-        //extract the smallest 'factor' number of segment pointers from the
-        //TreeMap
-        SegmentDescriptor[] mStream = getSegmentDescriptors(factor);
-        
+        List <SegmentDescriptor> segmentsToMerge = new ArrayList();
+        int segmentsConsidered = 0;
+        int numSegmentsToConsider = factor;
+        while (true) {
+          //extract the smallest 'factor' number of segment pointers from the
+          //TreeMap. Call cleanup on the empty segments (no key/value data)
+          SegmentDescriptor[] mStream = 
+            getSegmentDescriptors(numSegmentsToConsider);
+          for (int i = 0; i < mStream.length; i++) {
+            if (mStream[i].nextRawKey()) {
+              segmentsToMerge.add(mStream[i]);
+              segmentsConsidered++;
+            }
+            else {
+              mStream[i].cleanup();
+              numSegments--; //we ignore this segment for the merge
+            }
+          }
+          //if we have the desired number of segments
+          //or looked at all available segments, we break
+          if (segmentsConsidered == factor || 
+              sortedSegmentSizes.size() == 0) {
+            break;
+          }
+            
+          numSegmentsToConsider = factor - segmentsConsidered;
+        }
         //feed the streams to the priority queue
-        initialize(mStream.length); clear();
-        for (int i = 0; i < mStream.length; i++) {
-          if (mStream[i].nextRawKey()) put(mStream[i]);
+        initialize(segmentsToMerge.size()); clear();
+        for (int i = 0; i < segmentsToMerge.size(); i++) {
+          put(segmentsToMerge.get(i));
         }
         //if we have lesser number of segments remaining, then just return the
         //iterator, else do another single level merge
@@ -2396,10 +2419,13 @@
           //calculate the length of the remaining segments. Required for
           //calculating the merge progress
           long totalBytes = 0;
-          for (int i = 0; i < numSegments; i++)
-            totalBytes += mStream[i].segmentLength;
+          for (int i = 0; i < segmentsToMerge.size(); i++) {
+            totalBytes += segmentsToMerge.get(i).segmentLength;
+          }
           if (totalBytes != 0) //being paranoid
             progPerByte = 1.0f / (float)totalBytes;
+          //reset factor to what it originally was
+          factor = origFactor;
           return this;
         } else {
           //we want to spread the creation of temp files on multiple disks if
@@ -2410,8 +2436,8 @@
                                               tmpFilename.toString());
           LOG.info("writing intermediate results to " + outputFile);
           Writer writer = cloneFileAttributes(
-            fs.makeQualified(mStream[0].segmentPathName), 
-            fs.makeQualified(outputFile), null);
+            fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
+            fs.makeQualified(outputFile), null);
           writer.sync = null; //disable sync for temp files
           writeFile(this, writer);
           writer.close();
@@ -2420,17 +2446,6 @@
           //queue
           this.close();
           
-          //this is required to handle the corner case where we have empty
-          //map outputs to merge. The empty map outputs will just have the
-          //sequence file header; they won't be inserted in the priority
-          //queue. Thus, they won't be deleted in the regular process where
-          //cleanup happens when a stream is popped off (when the key/value
-          //from that stream has been iterated over) from the queue.
-          for (int i = 0; i < mStream.length; i++) {
-            if (mStream[i].in != null) //true if cleanup didn't happen
-              mStream[i].cleanup();
-          }
-          
           SegmentDescriptor tempSegment = 
            new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
           //put the segment back in the TreeMap
@@ -2528,8 +2543,12 @@
      */
     public boolean nextRawKey() throws IOException {
       if (in == null) {
+        int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+        if (fs.getUri().getScheme().startsWith("ramfs")) {
+          bufferSize = conf.getInt("io.bytes.per.checksum", 512);
+        }
         Reader reader = new Reader(fs, segmentPathName, 
-            conf.getInt("io.file.buffer.size", 4096), segmentOffset, 
+            bufferSize, segmentOffset, 
             segmentLength, conf);
         
         //sometimes we ignore syncs especially for temp merge files
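The new gathering loop above keeps requesting more segments whenever some of
the candidates turn out to be empty (header-only) map outputs, so a merge pass
still operates on up to 'factor' real segments and empty segments are cleaned
up as soon as they are seen. A standalone sketch of the same top-up pattern
over plain lists, using hypothetical names (Segment, gather) rather than the
actual SequenceFile.Sorter internals:

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    public class TopUpGatherExample {
      // Hypothetical stand-in for a sorted run; empty == header-only output.
      static class Segment {
        final int records;
        Segment(int records) { this.records = records; }
        boolean hasData() { return records > 0; }
      }

      // Keep taking candidates until we have 'factor' non-empty segments or
      // the pool is exhausted, discarding empty ones along the way (the real
      // code calls SegmentDescriptor.cleanup() for those).
      static List<Segment> gather(Queue<Segment> pool, int factor) {
        List<Segment> toMerge = new ArrayList<Segment>();
        int wanted = factor;
        while (wanted > 0 && !pool.isEmpty()) {
          for (int i = 0; i < wanted && !pool.isEmpty(); i++) {
            Segment s = pool.poll();
            if (s.hasData()) {
              toMerge.add(s);
            }
          }
          wanted = factor - toMerge.size();
        }
        return toMerge;
      }

      public static void main(String[] args) {
        Queue<Segment> pool = new LinkedList<Segment>();
        int[] sizes = {0, 3, 0, 5, 2, 0, 7};
        for (int n : sizes) pool.add(new Segment(n));
        System.out.println("non-empty segments gathered: " +
            gather(pool, 4).size());
      }
    }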
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=511993&r1=511992&r2=511993
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon Feb 26 12:35:57 2007
@@ -115,6 +115,11 @@
   private volatile boolean mergeInProgress = false;
   
   /**
+   * When we accumulate merge_threshold number of files in ram, we merge/spill
+   */
+  private int mergeThreshold = 500;
+  
+  /**
    * The threads for fetching the files.
    */
   private MapOutputCopier[] copiers = null;
@@ -316,8 +321,11 @@
                    " output from " + loc.getHost() + ".");
           //Create a thread to do merges. Synchronize access/update to 
           //mergeInProgress
-          if (!mergeInProgress && inMemFileSys.getPercentUsed() >= 
-              MAX_INMEM_FILESYS_USE) {
+          if (!mergeInProgress && 
+              (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE ||
+               (mergeThreshold > 0 && 
+                inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >= mergeThreshold))&&
+              mergeThrowable == null) {
             LOG.info(reduceId + " InMemoryFileSystem " + 
                      inMemFileSys.getUri().toString() +
                      " is " + inMemFileSys.getPercentUsed() + 
@@ -383,6 +391,7 @@
     this.copyResults = new ArrayList(100);    
     this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
     this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
+    this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
 
     //we want to distinguish inmem fs instances for different reduces. Hence,
     //append a unique string in the uri for the inmem fs name
@@ -619,20 +628,6 @@
         }
       }
       
-      if (mergeThrowable != null) {
-        //set the task state to FAILED
-        TaskTracker tracker = ReduceTaskRunner.this.getTracker();
-        TaskTracker.TaskInProgress tip = 
-          tracker.runningTasks.get(reduceTask.getTaskId());
-        tip.runstate = TaskStatus.State.FAILED;
-        try {
-          tip.cleanup();
-        } catch (Throwable ie2) {
-          // Ignore it, we are just trying to cleanup.
-        }
-        inMemFileSys.close();
-      }
-      
       //Do a merge of in-memory files (if there are any)
       if (!killed && mergeThrowable == null) {
         try {
@@ -644,6 +639,11 @@
                    " Copying of all map outputs complete. " + 
" + "Initiating the last merge on the remaining files in " + inMemFileSys.getUri()); + if (mergeThrowable != null) { + //this could happen if the merge that + //was in progress threw an exception + throw mergeThrowable; + } //initiate merge Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER); if (inMemClosedFiles.length == 0) { @@ -651,16 +651,28 @@ inMemFileSys.getUri()); return numCopied == numOutputs; } - RawKeyValueIterator rIter = - sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, - new Path(reduceTask.getTaskId())); //name this output file same as the name of the first file that is //there in the current list of inmem files (this is guaranteed to be //absent on the disk currently. So we don't overwrite a prev. - //created spill) + //created spill). Also we need to create the output file now since + //it is not guaranteed that this file will be present after merge + //is called (we delete empty sequence files as soon as we see them + //in the merge method) SequenceFile.Writer writer = sorter.cloneFileAttributes( inMemFileSys.makeQualified(inMemClosedFiles[0]), localFileSys.makeQualified(inMemClosedFiles[0]), null); + + RawKeyValueIterator rIter = null; + try { + rIter = sorter.merge(inMemClosedFiles, true, inMemClosedFiles.length, + new Path(reduceTask.getTaskId())); + } catch (Exception e) { + //make sure that we delete the ondisk file that we created earlier + //when we invoked cloneFileAttributes + writer.close(); + localFileSys.delete(inMemClosedFiles[0]); + throw new IOException (StringUtils.stringifyException(e)); + } sorter.writeFile(rIter, writer); writer.close(); LOG.info(reduceTask.getTaskId() + @@ -668,14 +680,15 @@ " files in InMemoryFileSystem complete." + " Local file is " + inMemClosedFiles[0]); } catch (Throwable t) { - LOG.warn("Merge of the inmemory files threw an exception: " + + LOG.warn(reduceTask.getTaskId() + + " Final merge of the inmemory files threw an exception: " + StringUtils.stringifyException(t)); - inMemFileSys.close(); return false; } } return mergeThrowable == null && numCopied == numOutputs && !killed; } finally { + inMemFileSys.close(); pingTimer.interrupt(); } } @@ -780,15 +793,27 @@ //output files to merge to get the benefit of in-memory merge if (inMemClosedFiles.length >= (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) { - RawKeyValueIterator rIter = sorter.merge(inMemClosedFiles, true, - inMemClosedFiles.length, new Path(reduceTask.getTaskId())); //name this output file same as the name of the first file that is //there in the current list of inmem files (this is guaranteed to be //absent on the disk currently. So we don't overwrite a prev. - //created spill) + //created spill). 
+         //created spill). Also we need to create the output file now since
+         //it is not guaranteed that this file will be present after merge
+         //is called (we delete empty sequence files as soon as we see them
+         //in the merge method)
          SequenceFile.Writer writer = sorter.cloneFileAttributes(
                    inMemFileSys.makeQualified(inMemClosedFiles[0]), 
                    localFileSys.makeQualified(inMemClosedFiles[0]), null);
+         RawKeyValueIterator rIter;
+         try {
+           rIter = sorter.merge(inMemClosedFiles, true, 
+                                inMemClosedFiles.length, new Path(reduceTask.getTaskId()));
+         } catch (Exception e) { 
+           //make sure that we delete the ondisk file that we created earlier
+           //when we invoked cloneFileAttributes
+           writer.close();
+           localFileSys.delete(inMemClosedFiles[0]);
+           throw new IOException (StringUtils.stringifyException(e));
+         }
          sorter.writeFile(rIter, writer);
          writer.close();
          LOG.info(reduceTask.getTaskId() + 
@@ -801,7 +826,8 @@
                   inMemFileSys.getUri());
         }
       } catch (Throwable t) {
-        LOG.warn("Merge of the inmemory files threw an exception: " + 
+        LOG.warn(reduceTask.getTaskId() +
+                 " Intermediate Merge of the inmemory files threw an exception: " + 
                  StringUtils.stringifyException(t));
         ReduceTaskRunner.this.mergeThrowable = t;
       }
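Both merge sites now follow the same recovery pattern: the on-disk output file
is created before merge() runs (the first in-memory input may be deleted inside
merge()), and if the merge throws, the half-written local file is closed and
removed before the error is propagated. A self-contained sketch of that
cleanup pattern with hypothetical names (runMerge and the output path are
illustrative, not ReduceTaskRunner APIs):

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;

    public class MergeCleanupExample {
      // Hypothetical merge step; an exception here stands in for a failed
      // in-memory merge.
      static void runMerge(Writer out) throws IOException {
        out.write("merged records would go here\n");
      }

      public static void main(String[] args) throws IOException {
        File output = new File("merge-output.tmp"); // illustrative path
        Writer writer = new FileWriter(output);     // created before merging
        try {
          runMerge(writer);
        } catch (IOException e) {
          // On failure, close and delete the partially written on-disk file
          // before rethrowing, mirroring the cleanup added above.
          writer.close();
          output.delete();
          throw e;
        }
        writer.close();
        System.out.println("merge output written to " + output.getPath());
      }
    }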