Author: cutting
Date: Wed Mar 7 15:13:21 2007
New Revision: 515840

URL: http://svn.apache.org/viewvc?view=rev&rev=515840
Log:
HADOOP-1077. Fix a race condition fetching map outputs that could hang reduces. Contributed by Devaraj.

Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=515840&r1=515839&r2=515840 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Mar 7 15:13:21 2007 @@ -18,6 +18,8 @@ 4. HADOOP-1060. Fix an IndexOutOfBoundsException in the JobTracker that could cause jobs to hang. (Arun C Murthy via cutting) + 5. HADOOP-1077. Fix a race condition fetching map outputs that could + hang reduces. (Devaraj Das via cutting) Release 0.12.0 - 2007-03-02 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=515840&r1=515839&r2=515840 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Mar 7 15:13:21 2007 @@ -144,6 +144,12 @@ */ private Map<Integer, MapOutputLocation> retryFetches = new HashMap(); + /** + * a TreeSet for needed map outputs + */ + private Set <Integer> neededOutputs = + Collections.synchronizedSet(new TreeSet<Integer>()); + /** Represents the result of an attempt to copy a map output */ private class CopyResult { @@ -152,7 +158,10 @@ // the size of the file copied, -1 if the transfer failed private final long size; - + + //a flag signifying whether a copy result is obsolete + private static final int OBSOLETE = -2; + CopyResult(MapOutputLocation loc, long size) { this.loc = loc; this.size = size; @@ -160,6 +169,9 @@ public int getMapId() { return loc.getMapId(); } public boolean getSuccess() { return size >= 0; } + public 
boolean isObsolete() { + return size == OBSOLETE; + } public long getSize() { return size; } public String getHost() { return loc.getHost(); } public MapOutputLocation getLocation() { return loc; } @@ -284,7 +296,9 @@ */ private long copyOutput(MapOutputLocation loc ) throws IOException, InterruptedException { - + if (!neededOutputs.contains(loc.getMapId())) { + return CopyResult.OBSOLETE; + } String reduceId = reduceTask.getTaskId(); LOG.info(reduceId + " Copying " + loc.getMapTaskId() + " output from " + loc.getHost() + "."); @@ -297,16 +311,28 @@ tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics, tmpFilename, reduceTask.getPartition(), STALLED_COPY_TIMEOUT); + if (!neededOutputs.contains(loc.getMapId())) { + if (tmpFilename != null) { + FileSystem fs = tmpFilename.getFileSystem(conf); + fs.delete(tmpFilename); + } + return CopyResult.OBSOLETE; + } if (tmpFilename == null) throw new IOException("File " + finalFilename + "-" + id + " not created"); long bytes = -1; // lock the ReduceTaskRunner while we do the rename synchronized (ReduceTaskRunner.this) { - // if we can't rename the file, something is broken (and IOException - // will be thrown). This file could have been created in the inmemory + // This file could have been created in the inmemory // fs or the localfs. So need to get the filesystem owning the path. FileSystem fs = tmpFilename.getFileSystem(conf); + if (!neededOutputs.contains(loc.getMapId())) { + fs.delete(tmpFilename); + return CopyResult.OBSOLETE; + } + // if we can't rename the file, something is broken (and IOException + // will be thrown). 
if (!fs.rename(tmpFilename, finalFilename)) { fs.delete(tmpFilename); throw new IOException("failure to rename map output " + tmpFilename); @@ -332,6 +358,7 @@ mergeInProgress = true; m.start(); } + neededOutputs.remove(loc.getMapId()); } return bytes; } @@ -424,7 +451,6 @@ this.mapOutputFile.removeAll(reduceTask.getTaskId()); final int numOutputs = reduceTask.getNumMaps(); - List neededOutputs = new ArrayList(numOutputs); Map<Integer, MapOutputLocation> knownOutputs = new HashMap<Integer, MapOutputLocation>(); int numInFlight = 0, numCopied = 0; @@ -484,23 +510,12 @@ List <MapOutputLocation> locs = queryJobTracker(fromEventId, jobClient); - // remove discovered outputs from needed list - // and put them on the known list - int gotLocs = (locs == null ? 0 : locs.size()); + // put discovered them on the known list for (int i=0; i < locs.size(); i++) { - // check whether we actually need an output. It could happen - // that a map task that successfully ran earlier got lost, but - // if we already have copied the output of that unfortunate task - // we need not copy it again from the new TT (we will ignore - // the event for the new rescheduled execution) - if(neededOutputs.remove(new Integer(locs.get(i).getMapId()))) { - knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i)); - } - else gotLocs--; //we don't need this output - + knownOutputs.put(new Integer(locs.get(i).getMapId()), locs.get(i)); } LOG.info(reduceTask.getTaskId() + - " Got " + gotLocs + + " Got " + locs.size() + " new map outputs from jobtracker and " + retryFetches.size() + " map outputs from previous failures"); // clear the "failed" fetches hashmap @@ -575,9 +590,13 @@ copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs + " at " + mbpsFormat.format(transferRate) + " MB/s)"); + } else if (cr.isObsolete()) { + //ignore + LOG.info(reduceTask.getTaskId() + + " Ignoring obsolete copy result for Map Task: " + + cr.getLocation().getMapTaskId() + " from host: " + + cr.getHost()); } 
else { - // this copy failed, put it back onto neededOutputs - neededOutputs.add(new Integer(cr.getMapId())); retryFetches.put(new Integer(cr.getMapId()), cr.getLocation()); // wait a random amount of time for next contact @@ -600,7 +619,6 @@ if (cr.getHost().equals(loc.getHost())) { retryFetches.put(new Integer(loc.getMapId()), loc); locIt.remove(); - neededOutputs.add(new Integer(loc.getMapId())); } } }