Author: cutting Date: Mon Nov 20 15:35:06 2006 New Revision: 477407 URL: http://svn.apache.org/viewvc?view=rev&rev=477407 Log: HADOOP-723. Fix a race condition during the shuffle. Contributed by Owen.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=477407&r1=477406&r2=477407 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Nov 20 15:35:06 2006 @@ -94,6 +94,9 @@ 28. HADOOP-725. In DFS, optimize block placement algorithm, previously a performance bottleneck. (Milind Bhandarkar via cutting) +29. HADOOP-723. In MapReduce, fix a race condition during the + shuffle, which resulted in FileNotFoundExceptions. (omalley via cutting) + Release 0.8.0 - 2006-11-03 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=477407&r1=477406&r2=477407 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Mon Nov 20 15:35:06 2006 @@ -100,28 +100,41 @@ * @param localFilename the filename to write the data into * @param reduce the reduce id to get for * @param pingee a status object that wants to know when we make progress + * @param timeout number of ms for connection and read timeout * @throws IOException when something goes wrong */ public long getFile(FileSystem fileSys, Path localFilename, int reduce, - Progressable pingee) throws IOException { + Progressable pingee, + int timeout) throws IOException, InterruptedException { boolean good = false; long totalBytes = 0; + Thread currentThread = Thread.currentThread(); URL path = new URL(toString() + 
"&reduce=" + reduce); try { URLConnection connection = path.openConnection(); + if (timeout > 0) { + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + } InputStream input = connection.getInputStream(); try { OutputStream output = fileSys.create(localFilename); try { byte[] buffer = new byte[64 * 1024]; + if (currentThread.isInterrupted()) { + throw new InterruptedException(); + } int len = input.read(buffer); while (len > 0) { totalBytes += len; output.write(buffer, 0 ,len); if (pingee != null) { pingee.progress(); + } + if (currentThread.isInterrupted()) { + throw new InterruptedException(); } len = input.read(buffer); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=477407&r1=477406&r2=477407 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Mon Nov 20 15:35:06 2006 @@ -28,6 +28,8 @@ /** Runs a reduce task. 
*/ class ReduceTaskRunner extends TaskRunner { + /** Number of ms before timing out a copy */ + private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000; /** * for cleaning up old map outputs @@ -137,11 +139,14 @@ } } + private static int nextMapOutputCopierId = 0; + /** Copies map outputs as they become available */ private class MapOutputCopier extends Thread { private PingTimer pingTimer = new PingTimer(); private MapOutputLocation currentLocation = null; + private int id = nextMapOutputCopierId++; public MapOutputCopier() { } @@ -192,8 +197,8 @@ * The thread exits when it is interrupted by the {@link ReduceTaskRunner} */ public void run() { - try { - while (true) { + while (true) { + try { MapOutputLocation loc = null; long size = -1; @@ -215,8 +220,13 @@ LOG.warn(StringUtils.stringifyException(e)); } finish(size); + } catch (InterruptedException e) { + return; // ALL DONE + } catch (Throwable th) { + LOG.error("Map output copy failure: " + + StringUtils.stringifyException(th)); } - } catch (InterruptedException e) { } // ALL DONE! + } } /** Copies a a map output from a remote host, using raw RPC.
@@ -224,44 +234,48 @@ * @param pingee a status object to ping as we make progress * @return the size of the copied file * @throws IOException if there is an error copying the file + * @throws InterruptedException if the copier should give up */ private long copyOutput(MapOutputLocation loc, - Progressable pingee) - throws IOException { + Progressable pingee + ) throws IOException, InterruptedException { String reduceId = reduceTask.getTaskId(); LOG.info(reduceId + " Copying " + loc.getMapTaskId() + " output from " + loc.getHost() + "."); - - try { - // this copies the map output file - Path filename = conf.getLocalPath(reduceId + "/map_" + - loc.getMapId() + ".out"); - long bytes = loc.getFile(localFileSys, filename, - reduceTask.getPartition(), pingee); - - LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() + - " output from " + loc.getHost() + "."); - - return bytes; - } - catch (IOException e) { - LOG.warn(reduceTask.getTaskId() + " failed to copy " + loc.getMapTaskId() + - " output from " + loc.getHost() + "."); - throw e; + // the place where the file should end up + Path finalFilename = conf.getLocalPath(reduceId + "/map_" + + loc.getMapId() + ".out"); + // a working filename that will be unique to this attempt + Path tmpFilename = new Path(finalFilename + "-" + id); + // this copies the map output file + long bytes = loc.getFile(localFileSys, tmpFilename, + reduceTask.getPartition(), pingee, + STALLED_COPY_TIMEOUT); + // lock the ReduceTaskRunner while we do the rename + synchronized (ReduceTaskRunner.this) { + // if we can't rename the file, something is broken + if (!(new File(tmpFilename.toString()). 
+ renameTo(new File(finalFilename.toString())))) { + localFileSys.delete(tmpFilename); + throw new IOException("failure to rename map output " + tmpFilename); + } } + LOG.info(reduceTask.getTaskId() + " done copying " + loc.getMapTaskId() + + " output from " + loc.getHost() + "."); + + return bytes; } } private class MapCopyLeaseChecker extends Thread { - private static final long STALLED_COPY_TIMEOUT = 3 * 60 * 1000; private static final long STALLED_COPY_CHECK = 60 * 1000; private long lastStalledCheck = 0; public void run() { - try { - while (true) { + while (true) { + try { long currentTime = System.currentTimeMillis(); if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) { lastStalledCheck = currentTime; @@ -288,9 +302,13 @@ } else { Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime); } + } catch (InterruptedException ie) { + return; + } catch (Throwable th) { + LOG.error("MapCopyLeaseChecker error: " + + StringUtils.stringifyException(th)); } - } catch (InterruptedException ie) {} - + } } }