Author: cutting
Date: Fri Dec 15 15:31:39 2006
New Revision: 487715

URL: http://svn.apache.org/viewvc?view=rev&rev=487715
Log:
HADOOP-813.  Fix map output sorting to report progress.  Contributed by Devaraj.
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=487715&r1=487714&r2=487715
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 15 15:31:39 2006
@@ -101,6 +101,10 @@
 28. HADOOP-824. Rename DFSShell to be FsShell, since it applies
     generically to all FileSystem implementations. (cutting)
 
+29. HADOOP-813. Fix map output sorting to report progress, so that
+    sorts which take longer than the task timeout do not fail.
+    (Devaraj Das via cutting)
+
 Release 0.9.2 - 2006-12-15

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=487715&r1=487714&r2=487715
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Dec 15 15:31:39 2006
@@ -35,6 +35,7 @@
 import org.apache.commons.logging.*;
 import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 
@@ -341,6 +342,7 @@
             throws IOException {
             synchronized (this) {
               writer.append(key, value);
+              reportProgress(umbilical);
             }
           }
         };
@@ -366,6 +368,7 @@
         while (values.more()) {
           combiner.reduce(values.getKey(), values, combineCollector, reporter);
           values.nextKey();
+          reportProgress(umbilical);
         }
       }
 
@@ -393,6 +396,7 @@
           value.readFields(valIn);
 
           writer.append(key, value);
+          reportProgress(umbilical);
         }
       }
 
@@ -425,62 +429,87 @@
             compressionType, codec);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
+          reportProgress(umbilical);
         }
         finalOut.close();
         finalIndexOut.close();
         return;
       }
-      
-      Path [] filename = new Path[numSpills];
-      Path [] indexFileName = new Path[numSpills];
-      FSDataInputStream in[] = new FSDataInputStream[numSpills];
-      FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
-      
-      for(int i = 0; i < numSpills; i++) {
-        filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
-        in[i] = localFs.open(filename[i]);
-        indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
-        indexIn[i] = localFs.open(indexFileName[i]);
-      }
-      
-      //create a sorter object as we need access to the SegmentDescriptor
-      //class and merge methods
-      Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
-      sorter.setFactor(numSpills);
-      
-      for (int parts = 0; parts < partitions; parts++){
-        List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+      //spawn a thread to give merge progress heartbeats
+      Thread sortProgress = new Thread() {
+        public void run() {
+          while (true) {
+            try {
+              reportProgress(umbilical);
+              Thread.sleep(PROGRESS_INTERVAL);
+            } catch (InterruptedException e) {
+              return;
+            } catch (Throwable e) {
+              LOG.info("Thread Exception in " +
+                       "reporting sort progress\n" +
+                       StringUtils.stringifyException(e));
+              continue;
+            }
+          }
+        }
+      };
+      sortProgress.setName("Sort progress reporter for task "+getTaskId());
+      sortProgress.setDaemon(true);
+      sortProgress.start();
+      try {
+        Path [] filename = new Path[numSpills];
+        Path [] indexFileName = new Path[numSpills];
+        FSDataInputStream in[] = new FSDataInputStream[numSpills];
+        FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
+
         for(int i = 0; i < numSpills; i++) {
-          long segmentOffset = indexIn[i].readLong();
-          long segmentLength = indexIn[i].readLong();
-          SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
-              segmentLength, filename[i]);
-          s.preserveInput(true);
-          s.doSync();
-          segmentList.add(i, s);
+          filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
+          in[i] = localFs.open(filename[i]);
+          indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
+          indexIn[i] = localFs.open(indexFileName[i]);
         }
-        segmentStart = finalOut.getPos();
-        SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
-            job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
-            compressionType, codec);
-        sorter.writeFile(sorter.merge(segmentList), writer);
-        //add a sync block - required esp. for block compression to ensure
-        //partition data don't span partition boundaries
-        writer.sync();
-        //when we write the offset/length to the final index file, we write
-        //longs for both. This helps us to reliably seek directly to the
-        //offset/length for a partition when we start serving the byte-ranges
-        //to the reduces. We probably waste some space in the file by doing
-        //this as opposed to writing VLong but it helps us later on.
-        finalIndexOut.writeLong(segmentStart);
-        finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
-      }
-      finalOut.close();
-      finalIndexOut.close();
-      //cleanup
-      for(int i = 0; i < numSpills; i++) {
-        in[i].close(); localFs.delete(filename[i]);
-        indexIn[i].close(); localFs.delete(indexFileName[i]);
+
+        //create a sorter object as we need access to the SegmentDescriptor
+        //class and merge methods
+        Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+        sorter.setFactor(numSpills);
+
+        for (int parts = 0; parts < partitions; parts++){
+          List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+          for(int i = 0; i < numSpills; i++) {
+            long segmentOffset = indexIn[i].readLong();
+            long segmentLength = indexIn[i].readLong();
+            SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
+                segmentLength, filename[i]);
+            s.preserveInput(true);
+            s.doSync();
+            segmentList.add(i, s);
+          }
+          segmentStart = finalOut.getPos();
+          SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
+              job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+              compressionType, codec);
+          sorter.writeFile(sorter.merge(segmentList), writer);
+          //add a sync block - required esp. for block compression to ensure
+          //partition data don't span partition boundaries
+          writer.sync();
+          //when we write the offset/length to the final index file, we write
+          //longs for both. This helps us to reliably seek directly to the
+          //offset/length for a partition when we start serving the byte-ranges
+          //to the reduces. We probably waste some space in the file by doing
+          //this as opposed to writing VLong but it helps us later on.
+          finalIndexOut.writeLong(segmentStart);
+          finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
+        }
+        finalOut.close();
+        finalIndexOut.close();
+        //cleanup
+        for(int i = 0; i < numSpills; i++) {
+          in[i].close(); localFs.delete(filename[i]);
+          indexIn[i].close(); localFs.delete(indexFileName[i]);
+        }
+      } finally {
+        sortProgress.interrupt();
       }
     }
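
Note: the patch boils down to one pattern: a daemon thread sends periodic progress
heartbeats while the long merge/sort runs, and is interrupted in a finally block once
the merge finishes (or fails), so the task is not killed for exceeding the timeout.
The standalone sketch below illustrates that pattern outside Hadoop. The class name
SortProgressHeartbeatSketch, the one-second interval, and the reportProgress() stub
are illustrative assumptions, not part of the patch or of the Hadoop API.

    public class SortProgressHeartbeatSketch {

      // Heartbeat interval in milliseconds (illustrative; the real task
      // uses its own PROGRESS_INTERVAL constant).
      private static final long PROGRESS_INTERVAL = 1000;

      // Hypothetical stand-in for Task.reportProgress(umbilical); here it only logs.
      private static void reportProgress() {
        System.out.println("progress heartbeat at " + System.currentTimeMillis());
      }

      public static void main(String[] args) throws Exception {
        // Daemon thread that keeps sending heartbeats until it is interrupted.
        Thread sortProgress = new Thread() {
          public void run() {
            while (true) {
              try {
                reportProgress();
                Thread.sleep(PROGRESS_INTERVAL);
              } catch (InterruptedException e) {
                return;                                       // normal shutdown path
              } catch (Throwable e) {
                System.err.println("heartbeat failed: " + e); // log and keep trying
              }
            }
          }
        };
        sortProgress.setName("Sort progress reporter (sketch)");
        sortProgress.setDaemon(true);         // never keeps the JVM alive on its own
        sortProgress.start();
        try {
          Thread.sleep(5000);                 // stands in for the long merge/sort
        } finally {
          sortProgress.interrupt();           // stop heartbeats once the work is done
        }
      }
    }

As in the patch, the daemon flag and the try/finally + interrupt() pairing matter:
even if the merge throws, the reporter thread is stopped, and it can never prevent
the process from exiting.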