Author: ddas Date: Thu Jan 3 04:15:53 2008 New Revision: 608462 URL: http://svn.apache.org/viewvc?rev=608462&view=rev Log: HADOOP-1719. Improves the utilization of shuffle copier threads. Contributed by Amar Kamat.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=608462&r1=608461&r2=608462&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 3 04:15:53 2008 @@ -144,6 +144,9 @@ performance. Make NullWritable implement Comparable. Make TextOutputFormat treat NullWritable like null. (omalley) + HADOOP-1719. Improves the utilization of shuffle copier threads. + (Amar Kamat via ddas) + OPTIMIZATIONS HADOOP-1898. Release the lock protecting the last time of the last stack Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=608462&r1=608461&r2=608462&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jan 3 04:15:53 2008 @@ -959,21 +959,30 @@ // MapOutputLocations as values knownOutputs.addAll(retryFetches); - // The call getMapCompletionEvents will update fromEventId to - // used for the next call to getMapCompletionEvents - int currentNumKnownMaps = knownOutputs.size(); - int currentNumObsoleteMapIds = obsoleteMapIds.size(); - getMapCompletionEvents(fromEventId, knownOutputs); + // ensure we have enough to keep us busy + boolean busy = isBusy(numInFlight, numCopiers, lowThreshold, + uniqueHosts.size(), probe_sample_size, + numOutputs - numCopied); + if (!busy) { + // The call getMapCompletionEvents will update fromEventId to + // used for the next call to getMapCompletionEvents + int currentNumKnownMaps = knownOutputs.size(); + int currentNumObsoleteMapIds = obsoleteMapIds.size(); + getMapCompletionEvents(fromEventId, knownOutputs); - LOG.info(reduceTask.getTaskId() + ": " + + LOG.info(reduceTask.getTaskId() + ": " + "Got " + (knownOutputs.size()-currentNumKnownMaps) + " new map-outputs & " + (obsoleteMapIds.size()-currentNumObsoleteMapIds) + " obsolete map-outputs from tasktracker and " + retryFetches.size() + " map-outputs from previous failures" ); - + } else { + LOG.info(" Busy enough - did not query the tasktracker for " + + "new map outputs. Have "+ retryFetches.size() + + " map outputs from previous failures"); + } // clear the "failed" fetches hashmap retryFetches.clear(); } @@ -1181,16 +1190,10 @@ numInFlight--; } - boolean busy = true; - // ensure we have enough to keep us busy - if (numInFlight < lowThreshold && (numOutputs-numCopied) > - probe_sample_size) { - busy = false; - } //Check whether we have more CopyResult to check. If there is none, - //and we are not busy enough, break + //break synchronized (copyResults) { - if (copyResults.size() == 0 && !busy) { + if (copyResults.size() == 0) { break; } } @@ -1276,12 +1279,30 @@ } } + /** Added a check for whether #uniqueHosts < #copiers, and if so conclude + * we are not busy enough. The logic is that we fetch only one map output + * at a time from any given host and uniqueHosts keep a track of that. + * As soon as we add a host to uniqueHosts, a 'copy' from that is + * scheduled as well. Thus, when the size of uniqueHosts is >= numCopiers, + * it means that all copiers are busy. Although the converse is not true + * (e.g. in the case where we have more copiers than the number of hosts + * in the cluster), but it should generally be useful to do this check. + **/ + private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold, + int uniqueHostsSize, int probeSampleSize, + int remainCopy) { + if ((numInFlight < lowThreshold && remainCopy > probeSampleSize) || + uniqueHostsSize < numCopiers) { + return false; + } + return true; + } private CopyResult getCopyResult() { synchronized (copyResults) { - while (copyResults.isEmpty()) { + if (copyResults.isEmpty()) { try { - copyResults.wait(); + copyResults.wait(2000); // wait for 2 sec } catch (InterruptedException e) { } } if (copyResults.isEmpty()) {