Author: cutting
Date: Fri Dec 1 14:26:30 2006
New Revision: 481429

URL: http://svn.apache.org/viewvc?view=rev&rev=481429
Log:
HADOOP-750. Fix a potential race condition during the mapreduce shuffle. Contributed by Owen.

Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=481429&r1=481428&r2=481429 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 1 14:26:30 2006 @@ -143,6 +143,9 @@ 42. HADOOP-430. Stop datanode's HTTP server when registration with namenode fails. (Wendy Chien via cutting) +43. HADOOP-750. Fix a potential race condition during mapreduce + shuffle. (omalley via cutting) + Release 0.8.0 - 2006-11-03 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=481429&r1=481428&r2=481429 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Dec 1 14:26:30 2006 @@ -121,24 +121,14 @@ } private class PingTimer implements Progressable { - private long pingTime; - - public synchronized void reset() { - pingTime = 0; - } - - public synchronized long getLastPing() { - return pingTime; - } + Task task = getTask(); + TaskTracker tracker = getTracker(); public void progress() { - synchronized (this) { - pingTime = System.currentTimeMillis(); - getTask().reportProgress(getTracker()); - } + task.reportProgress(tracker); } } - + private static int nextMapOutputCopierId = 0; /** Copies map outputs as they become available */ @@ -149,14 +139,8 @@ private int id = nextMapOutputCopierId++; public MapOutputCopier() { - } - - /** - * Get the last time that this copier made 
progress. - * @return the System.currentTimeMillis when this copier last made progress - */ - public long getLastProgressTime() { - return pingTimer.getLastPing(); + setName("MapOutputCopier " + reduceTask.getTaskId() + "." + id); + LOG.debug(getName() + " created"); } /** @@ -185,6 +169,7 @@ private synchronized void finish(long size) { if (currentLocation != null) { + LOG.debug(getName() + " finishing " + currentLocation + " = " + size); synchronized (copyResults) { copyResults.add(new CopyResult(currentLocation, size)); copyResults.notify(); @@ -211,15 +196,14 @@ try { start(loc); - pingTimer.progress(); size = copyOutput(loc, pingTimer); - pingTimer.reset(); } catch (IOException e) { LOG.warn(reduceTask.getTaskId() + " copy failed: " + loc.getMapTaskId() + " from " + loc.getHost()); LOG.warn(StringUtils.stringifyException(e)); + } finally { + finish(size); } - finish(size); } catch (InterruptedException e) { return; // ALL DONE } catch (Throwable th) { @@ -268,49 +252,6 @@ } } - - private class MapCopyLeaseChecker extends Thread { - private static final long STALLED_COPY_CHECK = 60 * 1000; - private long lastStalledCheck = 0; - - public void run() { - while (true) { - try { - long currentTime = System.currentTimeMillis(); - if (currentTime - lastStalledCheck > STALLED_COPY_CHECK) { - lastStalledCheck = currentTime; - synchronized (copiers) { - for(int i=0; i < copiers.length; ++i) { - if (copiers[i] == null) { - break; - } - long lastProgress = copiers[i].getLastProgressTime(); - if (lastProgress != 0 && - currentTime - lastProgress > STALLED_COPY_TIMEOUT) { - LOG.warn("Map output copy stalled on " + - copiers[i].getLocation()); - // mark the current file as failed - copiers[i].fail(); - // tell the thread to stop - copiers[i].interrupt(); - // create a replacement thread - copiers[i] = new MapOutputCopier(); - copiers[i].start(); - } - } - } - } else { - Thread.sleep(lastStalledCheck + STALLED_COPY_CHECK - currentTime); - } - } catch (InterruptedException ie) 
{ - return; - } catch (Throwable th) { - LOG.error("MapCopyLeaseChecker error: " + - StringUtils.stringifyException(th)); - } - } - } - } public ReduceTaskRunner(Task task, TaskTracker tracker, JobConf conf) throws IOException { @@ -352,7 +293,6 @@ DecimalFormat mbpsFormat = new DecimalFormat("0.00"); Random backoff = new Random(); final Progress copyPhase = getTask().getProgress().phase(); - MapCopyLeaseChecker leaseChecker = null; for (int i = 0; i < numOutputs; i++) { neededOutputs.add(new Integer(i)); @@ -367,8 +307,6 @@ copiers[i] = new MapOutputCopier(); copiers[i].start(); } - leaseChecker = new MapCopyLeaseChecker(); - leaseChecker.start(); // start the clock for bandwidth measurement long startTime = System.currentTimeMillis(); @@ -450,6 +388,7 @@ } catch (InterruptedException e) { } // IGNORE while (!killed && numInFlight > 0) { + LOG.debug(reduceTask.getTaskId() + " numInFlight = " + numInFlight); CopyResult cr = getCopyResult(); if (cr != null) { @@ -506,7 +445,6 @@ } // all done, inform the copiers to exit - leaseChecker.interrupt(); synchronized (copiers) { synchronized (scheduledCopies) { for (int i=0; i < copiers.length; i++) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=481429&r1=481428&r2=481429 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Fri Dec 1 14:26:30 2006 @@ -33,7 +33,7 @@ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner"); - boolean killed = false; + volatile boolean killed = false; private Process process; private Task t; private TaskTracker tracker;