Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=667040&r1=667039&r2=667040&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 12 03:42:31 2008 @@ -422,11 +422,6 @@ */ private volatile boolean exitLocalFSMerge = false; - /** - * A flag to indicate when to exit InMemMerge - */ - private volatile boolean exitInMemMerge = false; - /** * When we accumulate mergeThreshold number of files in ram, we merge/spill */ @@ -712,42 +707,60 @@ class ShuffleRamManager implements RamManager { /* Maximum percentage of the in-memory limit that a single shuffle can * consume*/ - private static final float MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT = 0.25f; + private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f; - private boolean closed = false; + /* Maximum percentage of shuffle-threads which can be stalled + * simultaneously after which a merge is triggered. */ + private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f; - volatile private int numClosed = 0; - volatile private int size = 0; private final int maxSize; private final int maxSingleShuffleLimit; + private int size = 0; + private Object dataAvailable = new Object(); - private volatile int fullSize = 0; + private int fullSize = 0; + private int numPendingRequests = 0; + private int numRequiredMapOutputs = 0; + private int numClosed = 0; + private boolean closed = false; public ShuffleRamManager(Configuration conf) { maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024; - maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_PERCENT); + maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION); LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit); } - public synchronized boolean reserve(int requestedSize, InputStream in) { + public synchronized boolean reserve(int requestedSize, InputStream in) + throws InterruptedException { + // Wait till the request can be fulfilled... while ((size + requestedSize) > maxSize) { - try { - // Close the connection - if (in != null) { - try { - in.close(); - } catch (IOException ie) { - LOG.info("Failed to close connection with: " + ie); - } finally { - in = null; - } + + // Close the input... + if (in != null) { + try { + in.close(); + } catch (IOException ie) { + LOG.info("Failed to close connection with: " + ie); + } finally { + in = null; } - - // Wait for memory to free up - wait(); - } catch (InterruptedException ie) {} + } + + // Track pending requests + synchronized (dataAvailable) { + ++numPendingRequests; + dataAvailable.notify(); + } + + // Wait for memory to free up + wait(); + + // Track pending requests + synchronized (dataAvailable) { + --numPendingRequests; + } } size += requestedSize; @@ -767,20 +780,25 @@ notifyAll(); } - public void waitForDataToMerge() { + public boolean waitForDataToMerge() throws InterruptedException { + boolean done = false; synchronized (dataAvailable) { while (!closed && (getPercentUsed() < MAX_INMEM_FILESYS_USE || - getReservedFiles() < + numClosed < (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION) ) && - (mergeThreshold <= 0 || getReservedFiles() < mergeThreshold)) { - try { - dataAvailable.wait(); - } catch (InterruptedException ie) {} + (mergeThreshold <= 0 || numClosed < mergeThreshold) + && + (numPendingRequests < + numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION && + numPendingRequests < numRequiredMapOutputs)) { + dataAvailable.wait(); } + done = closed; } + return done; } public void closeInMemoryFile(int requestedSize) { @@ -791,6 +809,13 @@ } } + public void setNumCopiedMapOutputs(int numRequiredMapOutputs) { + synchronized (dataAvailable) { + this.numRequiredMapOutputs = numRequiredMapOutputs; + dataAvailable.notify(); + } + } + public void close() { synchronized (dataAvailable) { closed = true; @@ -799,14 +824,10 @@ } } - float getPercentUsed() { + private float getPercentUsed() { return (float)fullSize/maxSize; } - - int getReservedFiles() { - return numClosed; - } - + int getMemoryLimit() { return maxSize; } @@ -978,7 +999,8 @@ } // Note that we successfully copied the map-output - copiedMapOutputs.add(loc.getTaskId()); + noteCopiedMapOutput(loc.getTaskId()); + return bytes; } @@ -1004,12 +1026,22 @@ } // Note that we successfully copied the map-output - copiedMapOutputs.add(loc.getTaskId()); + noteCopiedMapOutput(loc.getTaskId()); } return bytes; } + /** + * Save the map taskid whose output we just copied. + * This function assumes that it has been synchronized on ReduceTask.this. + * + * @param taskId map taskid + */ + private void noteCopiedMapOutput(TaskID taskId) { + copiedMapOutputs.add(taskId); + ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size()); + } /** * Get the map output into a local file (either in the inmemory fs or on the @@ -1248,6 +1280,7 @@ this.shuffleClientMetrics = new ShuffleClientMetrics(conf); this.umbilical = umbilical; this.reduceTask = ReduceTask.this; + this.scheduledCopies = new ArrayList<MapOutputLocation>(100); this.copyResults = new ArrayList<CopyResult>(100); this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5); @@ -1304,7 +1337,6 @@ @SuppressWarnings("unchecked") public boolean fetchOutputs() throws IOException { - final int numOutputs = reduceTask.getNumMaps(); List<MapOutputLocation> knownOutputs = new ArrayList<MapOutputLocation>(numCopiers); int totalFailures = 0; @@ -1316,7 +1348,7 @@ LocalFSMerger localFSMergerThread = null; InMemFSMergeThread inMemFSMergeThread = null; - for (int i = 0; i < numOutputs; i++) { + for (int i = 0; i < numMaps; i++) { copyPhase.addPhase(); // add sub-phase per file } @@ -1346,7 +1378,7 @@ IntWritable fromEventId = new IntWritable(0); // loop until we get all required outputs - while (copiedMapOutputs.size() < numOutputs && mergeThrowable == null) { + while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) { currentTime = System.currentTimeMillis(); boolean logNow = false; @@ -1356,7 +1388,7 @@ } if (logNow) { LOG.info(reduceTask.getTaskID() + " Need another " - + (numOutputs - copiedMapOutputs.size()) + " map output(s) " + + (numMaps - copiedMapOutputs.size()) + " map output(s) " + "where " + numInFlight + " is already in progress"); } @@ -1503,7 +1535,7 @@ float transferRate = mbs/secsSinceStart; copyPhase.startNextPhase(); - copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + copyPhase.setStatus("copy (" + numCopied + " of " + numMaps + " at " + mbpsFormat.format(transferRate) + " MB/s)"); @@ -1640,7 +1672,6 @@ mapOutputFilesOnDisk.notify(); } - exitInMemMerge = true; ramManager.close(); //Do a merge of in-memory files (if there are any) @@ -1648,9 +1679,13 @@ try { // Wait for the on-disk merge to complete localFSMergerThread.join(); + LOG.info("Interleaved on-disk merge complete: " + + mapOutputFilesOnDisk.size() + " files left."); //wait for an ongoing merge (if it is in flight) to complete inMemFSMergeThread.join(); + LOG.info("In-memory merge complete: " + + mapOutputsFilesInMemory.size() + " files left."); } catch (Throwable t) { LOG.warn(reduceTask.getTaskID() + " Final merge of the inmemory files threw an exception: " + @@ -1662,7 +1697,7 @@ return false; } } - return mergeThrowable == null && copiedMapOutputs.size() == numOutputs; + return mergeThrowable == null && copiedMapOutputs.size() == numMaps; } private List<Segment<K, V>> createInMemorySegments() { @@ -1908,10 +1943,11 @@ public void run() { LOG.info(reduceTask.getTaskID() + " Thread started: " + getName()); try { - while (!exitInMemMerge) { - ramManager.waitForDataToMerge(); + boolean exit = false; + do { + exit = ramManager.waitForDataToMerge(); doInMemMerge(); - } + } while (!exit); } catch (Throwable t) { LOG.warn(reduceTask.getTaskID() + " Merge of the inmemory files threw an exception: " @@ -1923,7 +1959,6 @@ @SuppressWarnings("unchecked") private void doInMemMerge() throws IOException{ if (mapOutputsFilesInMemory.size() == 0) { - LOG.info("Noting to merge... "); return; } @@ -1953,12 +1988,16 @@ RawKeyValueIterator rIter = null; final Reporter reporter = getReporter(umbilical); try { + LOG.info("Initiating in-memory merge with " + noInMemorySegments + + " segments..."); + rIter = Merger.merge(conf, localFileSys, (Class<K>)conf.getMapOutputKeyClass(), (Class<V>)conf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(reduceTask.getTaskID().toString()), conf.getOutputKeyComparator(), reporter); + if (null == combinerClass) { Merger.writeFile(rIter, writer, reporter); } else { @@ -1966,6 +2005,12 @@ combineAndSpill(rIter, reduceCombineInputCounter); } writer.close(); + + LOG.info(reduceTask.getTaskID() + + " Merge of the " + noInMemorySegments + + " files in-memory complete." + + " Local file is " + outputPath + " of size " + + localFileSys.getFileStatus(outputPath).getLen()); } catch (Exception e) { //make sure that we delete the ondisk file that we created //earlier when we invoked cloneFileAttributes @@ -1973,12 +2018,8 @@ throw (IOException)new IOException ("Intermedate merge failed").initCause(e); } - LOG.info(reduceTask.getTaskID() + - " Merge of the " + noInMemorySegments + - " files in-memory complete." + - " Local file is " + outputPath + " of size " + - localFileSys.getFileStatus(outputPath).getLen()); - + + // Note the output of the merge FileStatus status = localFileSys.getFileStatus(outputPath); synchronized (mapOutputFilesOnDisk) { addToMapOutputFilesOnDisk(status);
