Author: cutting Date: Thu Jan 4 10:49:27 2007 New Revision: 492677 URL: http://svn.apache.org/viewvc?view=rev&rev=492677 Log: HADOOP-846. Report progress during entire map. Contributed by Devaraj.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=492677&r1=492676&r2=492677 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 4 10:49:27 2007 @@ -181,6 +181,10 @@ distributions. Also add contrib and example documentation to distributed javadoc, in separate sections. (Nigel Daley via cutting) +52. HADOOP-846. Report progress during entire map, as sorting of + intermediate outputs may happen at any time, potentially causing + task timeouts. (Devaraj Das via cutting) + Release 0.9.2 - 2006-12-15 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=492677&r1=492676&r2=492677 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Jan 4 10:49:27 2007 @@ -161,7 +161,7 @@ public synchronized boolean next(Writable key, Writable value) throws IOException { - reportProgress(umbilical, getProgress()); + setProgress(getProgress()); long beforePos = getPos(); boolean ret = rawIn.next(key, value); myMetrics.mapInput(getPos() - beforePos); @@ -174,13 +174,13 @@ } }; + Thread sortProgress = createProgressThread(umbilical); MapRunnable runner = (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job); try { + sortProgress.start(); runner.run(in, collector, reporter); // run the map - } finally { - in.close(); // close input //check whether the length of the key/value buffer is 0. If not, then //we need to spill that to disk. Note that we reset the key/val buffer //upon each spill (so a length > 0 means that we have not spilled yet) @@ -189,12 +189,40 @@ } //merge the partitions from the spilled files and create one output collector.mergeParts(); + } finally { //close + in.close(); // close input collector.close(); + sortProgress.interrupt(); } done(umbilical); } + private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) { + //spawn a thread to give merge progress heartbeats + Thread sortProgress = new Thread() { + public void run() { + LOG.info("Started thread: " + getName()); + while (true) { + try { + reportProgress(umbilical); + Thread.sleep(PROGRESS_INTERVAL); + } catch (InterruptedException e) { + return; + } catch (Throwable e) { + LOG.info("Thread Exception in " + + "reporting sort progress\n" + + StringUtils.stringifyException(e)); + continue; + } + } + } + }; + sortProgress.setName("Sort progress reporter for task "+getTaskId()); + sortProgress.setDaemon(true); + return sortProgress; + } + public void setConf(Configuration conf) { if (conf instanceof JobConf) { this.conf = (JobConf) conf; @@ -298,7 +326,6 @@ int partNumber = partitioner.getPartition(key, value, partitions); sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength); - reportProgress(umbilical); myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset); //now check whether we need to spill to disk @@ -348,7 +375,6 @@ throws IOException { synchronized (this) { writer.append(key, value); - reportProgress(umbilical); } } }; @@ -374,7 +400,6 @@ while (values.more()) { combiner.reduce(values.getKey(), values, combineCollector, reporter); values.nextKey(); - reportProgress(umbilical); } } @@ -402,7 +427,6 @@ value.readFields(valIn); writer.append(key, value); - reportProgress(umbilical); } } @@ -435,34 +459,12 @@ compressionType, codec); finalIndexOut.writeLong(segmentStart); finalIndexOut.writeLong(finalOut.getPos() - segmentStart); - reportProgress(umbilical); } finalOut.close(); finalIndexOut.close(); return; } - //spawn a thread to give merge progress heartbeats - Thread sortProgress = new Thread() { - public void run() { - while (true) { - try { - reportProgress(umbilical); - Thread.sleep(PROGRESS_INTERVAL); - } catch (InterruptedException e) { - return; - } catch (Throwable e) { - LOG.info("Thread Exception in " + - "reporting sort progress\n" + - StringUtils.stringifyException(e)); - continue; - } - } - } - }; - sortProgress.setName("Sort progress reporter for task "+getTaskId()); - sortProgress.setDaemon(true); - sortProgress.start(); - try { + { Path [] filename = new Path[numSpills]; Path [] indexFileName = new Path[numSpills]; FSDataInputStream in[] = new FSDataInputStream[numSpills]; @@ -514,8 +516,6 @@ in[i].close(); localFs.delete(filename[i]); indexIn[i].close(); localFs.delete(indexFileName[i]); } - } finally { - sortProgress.interrupt(); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=492677&r1=492676&r2=492677 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Jan 4 10:49:27 2007 @@ -142,7 +142,7 @@ /** The number of milliseconds between progress reports. */ public static final int PROGRESS_INTERVAL = 1000; - private transient Progress taskProgress = new Progress(); + private volatile Progress taskProgress = new Progress(); private transient long nextProgressTime = System.currentTimeMillis() + PROGRESS_INTERVAL; @@ -165,9 +165,13 @@ }; } + public void setProgress(float progress) { + taskProgress.set(progress); + } + public void reportProgress(TaskUmbilicalProtocol umbilical, float progress) throws IOException { - taskProgress.set(progress); + setProgress(progress); reportProgress(umbilical); }