Author: cutting
Date: Fri Jun 1 13:52:09 2007
New Revision: 543607

URL: http://svn.apache.org/viewvc?view=rev&rev=543607
Log:
HADOOP-1431.  Fix so that sort progress reporting during map runs only
while sorting, so that stuck maps are correctly terminated.
Contributed by Arun & Devaraj.
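For reference, the overall pattern this patch moves MapTask to can be sketched in isolation as below. This is an illustrative sketch only, not the committed code: the class name SortSpillSketch, the sortAndSpillToDisk() placeholder body, and the PROGRESS_INTERVAL value are hypothetical, and reportProgress() merely stands in for the umbilical heartbeat call. The point is that the progress-reporter daemon thread is now created and started only when a sort/spill actually begins, and is interrupted in a finally block as soon as it ends, so a map stuck in user code no longer receives free heartbeats.

// Illustrative sketch (not the committed code) of the heartbeat pattern
// described above; all names here are placeholders.
public class SortSpillSketch {

  private static final long PROGRESS_INTERVAL = 1000;  // assumed interval, in ms

  // Build and start a daemon heartbeat thread, analogous to
  // MapTask.createProgressThread() after this patch.
  private Thread createProgressThread() {
    Thread sortProgress = new Thread() {
      public void run() {
        try {
          while (!isInterrupted()) {
            reportProgress();                  // stand-in for the umbilical heartbeat
            Thread.sleep(PROGRESS_INTERVAL);
          }
        } catch (InterruptedException ie) {
          // interrupted once the sort/spill has finished; just exit
        }
      }
    };
    sortProgress.setName("Sort progress reporter (sketch)");
    sortProgress.setDaemon(true);
    sortProgress.start();                      // started here, as in the patch
    return sortProgress;
  }

  // Heartbeats flow only while the sort/spill is actually running.
  public void spill() {
    Thread progress = createProgressThread();
    try {
      sortAndSpillToDisk();                    // the potentially long sort/spill
    } finally {
      progress.interrupt();                    // stop reporting as soon as it ends
    }
  }

  private void reportProgress() {
    System.out.println("sort progress heartbeat");
  }

  private void sortAndSpillToDisk() {
    // placeholder for the real sort-and-spill work
  }
}

Making the reporter a daemon thread also ensures it cannot keep the child JVM alive if the task exits abnormally between start() and interrupt().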
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 1 13:52:09 2007
@@ -511,6 +511,10 @@
 130. HADOOP-1332.  Fix so that TaskTracker exits reliably during unit
      tests on Windows.  (omalley via cutting)
 
+131. HADOOP-1431.  Fix so that sort progress reporting during map runs
+     only while sorting, so that stuck maps are correctly terminated.
+     (Devaraj Das and Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Jun 1 13:52:09 2007
@@ -163,6 +163,7 @@
           throws IOException {
 
           setProgress(getProgress());
+          reportProgress(umbilical);
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
           if (ret) {
@@ -178,22 +179,16 @@
       }
     };
 
-    Thread sortProgress = createProgressThread(umbilical);
     MapRunnable runner =
       (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     try {
-      sortProgress.start();
       runner.run(in, collector, reporter);
       collector.flush();
     } finally {
      //close
      in.close();                               // close input
      collector.close();
-      sortProgress.interrupt();
-      try {
-        sortProgress.join();
-      } catch (InterruptedException ie){ }
    }
    done(umbilical);
  }
@@ -220,6 +215,7 @@
       };
     sortProgress.setName("Sort progress reporter for task "+getTaskId());
     sortProgress.setDaemon(true);
+    sortProgress.start();
     return sortProgress;
   }
 
@@ -273,6 +269,7 @@
     private Partitioner partitioner;
     private JobConf job;
     private Reporter reporter;
+    final private TaskUmbilicalProtocol umbilical;
     private DataOutputBuffer keyValBuffer; //the buffer where key/val will
                                            //be stored before they are
@@ -302,6 +299,7 @@
 
       this.job = job;
       this.reporter = reporter;
+      this.umbilical = umbilical;
       this.comparator = job.getOutputKeyComparator();
       this.keyClass = job.getMapOutputKeyClass();
       this.valClass = job.getMapOutputValueClass();
@@ -324,6 +322,7 @@
                        job.getClass("map.sort.class", MergeSorter.class, BufferSorter.class), job);
       }
+      
     private void startPartition(int partNumber) throws IOException {
       //We create the sort output as multiple sequence files within a spilled
       //file. So we create a writer for each partition.
@@ -376,10 +375,20 @@
         for (int i = 0; i < partitions; i++)
           totalMem += sortImpl[i].getMemoryUtilized();
         if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
-          sortAndSpillToDisk();
-          keyValBuffer.reset();
-          for (int i = 0; i < partitions; i++)
-            sortImpl[i].close();
+          
+          // Start the progress thread
+          Thread progress = createProgressThread(umbilical);
+          
+          try {
+            sortAndSpillToDisk();
+            keyValBuffer.reset();
+            for (int i = 0; i < partitions; i++) {
+              sortImpl[i].close();
+            }
+          } finally {
+            // Stop the progress thread
+            progress.interrupt();
+          }
         }
       }
     }
@@ -597,13 +606,22 @@
     }
     
     public void flush() throws IOException {
-      //check whether the length of the key/value buffer is 0. If not, then
-      //we need to spill that to disk. Note that we reset the key/val buffer
-      //upon each spill (so a length > 0 means that we have not spilled yet)
-      if (keyValBuffer.getLength() > 0) {
-        sortAndSpillToDisk();
+      
+      // Start the progress thread
+      Thread progress = createProgressThread(umbilical);
+      
+      try {
+        //check whether the length of the key/value buffer is 0. If not, then
+        //we need to spill that to disk. Note that we reset the key/val buffer
+        //upon each spill (so a length > 0 means that we have not spilled yet)
+        if (keyValBuffer.getLength() > 0) {
+          sortAndSpillToDisk();
+        }
+        mergeParts();
+      } finally {
+        // Stop the progress thread
+        progress.interrupt();
       }
-      mergeParts();
     }
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=543607&r1=543606&r2=543607
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 1 13:52:09 2007
@@ -37,6 +37,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,7 +79,7 @@
   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
 
   private int numMaps;
-  private boolean sortComplete;
+  AtomicBoolean sortComplete = new AtomicBoolean(false);
 
   private ReduceCopier reduceCopier;
 
   {
@@ -283,7 +284,7 @@
     // spawn a thread to give sort progress heartbeats
     Thread sortProgress = new Thread() {
         public void run() {
-          while (!sortComplete) {
+          while (!sortComplete.get()) {
            try {
              reportProgress(umbilical);
              Thread.sleep(PROGRESS_INTERVAL);
@@ -298,6 +299,7 @@
           }
         }
       };
+    sortProgress.setDaemon(true);
    sortProgress.setName("Sort progress reporter for task "+getTaskId());
 
    Path tempDir = new Path(getTaskId()); 
@@ -317,7 +319,7 @@
                    !conf.getKeepFailedTaskFiles()); // sort
 
    } finally {
-      sortComplete = true;
+      sortComplete.set(true);
    }
 
    sortPhase.complete();                         // sort is complete
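The ReduceTask side of the change is easier to see in a standalone form. The sketch below is not the committed code: the class name SortCompleteFlagSketch, the sleep that stands in for the actual sort, and the interval value are assumptions. It illustrates the two properties the patch adds: the completion flag is an AtomicBoolean, so the write performed in the finally block of the sorting thread is guaranteed visible to the heartbeat thread, and the heartbeat thread is a daemon, so it can never keep the task JVM alive on its own.

import java.util.concurrent.atomic.AtomicBoolean;

// Standalone sketch (not the committed code) of the ReduceTask pattern above;
// names and the simulated sort are placeholders.
public class SortCompleteFlagSketch {

  private static final long PROGRESS_INTERVAL = 500;      // assumed interval, in ms
  private final AtomicBoolean sortComplete = new AtomicBoolean(false);

  public void runSortWithHeartbeats() throws InterruptedException {
    Thread sortProgress = new Thread() {
      public void run() {
        while (!sortComplete.get()) {                      // cross-thread visible read
          try {
            System.out.println("sort progress heartbeat"); // stand-in for reportProgress(umbilical)
            Thread.sleep(PROGRESS_INTERVAL);
          } catch (InterruptedException ie) {
            return;
          }
        }
      }
    };
    sortProgress.setDaemon(true);                          // mirrors the added setDaemon(true)
    sortProgress.setName("Sort progress reporter (sketch)");
    sortProgress.start();

    try {
      Thread.sleep(2000);                                  // placeholder for the actual sort
    } finally {
      sortComplete.set(true);                              // mirrors sortComplete.set(true) in finally
    }
    sortProgress.join();                                   // reporter exits once the flag flips
  }

  public static void main(String[] args) throws InterruptedException {
    new SortCompleteFlagSketch().runSortWithHeartbeats();
  }
}

With a plain non-volatile boolean there is no such visibility guarantee, so the reporter could in principle keep looping and heartbeating after the sort had finished or failed.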