Author: cutting Date: Wed Jun 20 14:00:25 2007 New Revision: 549233 URL: http://svn.apache.org/viewvc?view=rev&rev=549233 Log: HADOOP-1462. Improve task progress reporting. Contributed by Vivek Ratan.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 20 14:00:25 2007 @@ -204,6 +204,11 @@ to only terminate Hadoop instances, and not other instances started by the same user. (tomwhite via cutting) + 63. HADOOP-1462. Improve task progress reporting. Progress reports + are no longer blocking since i/o is performed in a separate + thread. Reporting during sorting and more is also more + consistent. 
(Vivek Ratan via cutting) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Wed Jun 20 14:00:25 2007 @@ -1908,6 +1908,8 @@ private Class valClass; private Configuration conf; + + private Progressable progressable = null; /** Sort and merge files containing the named classes. */ public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf) { @@ -1938,6 +1940,11 @@ /** Get the total amount of buffer memory, in bytes.*/ public int getMemory() { return memory; } + /** Set the progressable object in order to report progress. */ + public void setProgressable(Progressable progressable) { + this.progressable = progressable; + } + /** * Perform a file sort from a set of input files into an output file. 
* @param inFiles the files to be sorted @@ -2000,6 +2007,7 @@ private int sortPass(boolean deleteInput) throws IOException { LOG.debug("running sort pass"); SortPass sortPass = new SortPass(); // make the SortPass + sortPass.setProgressable(progressable); mergeSort = new MergeSort(sortPass.new SeqFileComparator()); try { return sortPass.run(deleteInput); // run it @@ -2028,6 +2036,8 @@ private FSDataOutputStream indexOut = null; private Path outName; + private Progressable progressable = null; + public int run(boolean deleteInput) throws IOException { int segments = 0; int currentFile = 0; @@ -2098,6 +2108,10 @@ LOG.debug("flushing segment " + segments); rawBuffer = rawKeys.getData(); sort(count); + // indicate we're making progress + if (progressable != null) { + progressable.progress(); + } flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, segments==0 && atEof); segments++; @@ -2186,6 +2200,13 @@ keyOffsets[J.get()], keyLengths[J.get()]); } } + + /** set the progressable object in order to report progress */ + public void setProgressable(Progressable progressable) + { + this.progressable = progressable; + } + } // SequenceFile.Sorter.SortPass /** The interface to iterate over raw keys/values of SequenceFiles. */ @@ -2225,7 +2246,8 @@ public RawKeyValueIterator merge(List <SegmentDescriptor> segments, Path tmpDir) throws IOException { - MergeQueue mQueue = new MergeQueue(segments, tmpDir); + // pass in object to report progress, if present + MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable); return mQueue.merge(); } @@ -2270,7 +2292,7 @@ a.add(s); } this.factor = factor; - MergeQueue mQueue = new MergeQueue(a, tmpDir); + MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable); return mQueue.merge(); } @@ -2299,7 +2321,8 @@ a.add(s); } factor = (inNames.length < factor) ? 
inNames.length : factor; - MergeQueue mQueue = new MergeQueue(a, tempDir); + // pass in object to report progress, if present + MergeQueue mQueue = new MergeQueue(a, tempDir, progressable); return mQueue.merge(); } @@ -2391,7 +2414,7 @@ //the contained segments during the merge process & hence don't need //them anymore SegmentContainer container = new SegmentContainer(inName, indexIn); - MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir); + MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable); return mQueue.merge(); } @@ -2406,6 +2429,7 @@ private float progPerByte; private Progress mergeProgress = new Progress(); private Path tmpDir; + private Progressable progress = null; //handle to the progress reporting object //a TreeMap used to store the segments sorted by size (segment offset and //segment path name is used to break ties between segments of same sizes) @@ -2427,16 +2451,22 @@ * A queue of file segments to merge * @param segments the file segments to merge * @param tmpDir a relative local directory to save intermediate files in + * @param progress the reference to the Progressable object */ public MergeQueue(List <SegmentDescriptor> segments, - Path tmpDir) { + Path tmpDir, Progressable progress) { int size = segments.size(); for (int i = 0; i < size; i++) { sortedSegmentSizes.put(segments.get(i), null); } this.tmpDir = tmpDir; + this.progress = progress; } protected boolean lessThan(Object a, Object b) { + // indicate we're making progress + if (progress != null) { + progress.progress(); + } SegmentDescriptor msa = (SegmentDescriptor)a; SegmentDescriptor msb = (SegmentDescriptor)b; return comparator.compare(msa.getKey().getData(), 0, Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?view=diff&rev=549233&r1=549232&r2=549233 
============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Wed Jun 20 14:00:25 2007 @@ -26,7 +26,7 @@ import org.apache.hadoop.util.Progress; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator; - +import org.apache.hadoop.util.Progressable; /** This class implements the sort interface using primitive int arrays as * the data structures (that is why this class is called 'BasicType'SorterBase) @@ -51,6 +51,8 @@ //4 for indices into startOffsets array in the //pointers array (ignored the partpointers list itself) static private final int BUFFERED_KEY_VAL_OVERHEAD = 16; + //Reference to the Progressable object for sending KeepAlive + private Progressable reporter; //Implementation of methods of the SorterBase interface // @@ -62,6 +64,10 @@ comparator = conf.getOutputKeyComparator(); } + public void setProgressable(Progressable reporter) { + this.reporter = reporter; + } + public void addKeyValue(int recordOffset, int keyLength, int valLength) { //Add the start offset of the key in the startOffsets array and the //length in the keyLengths array. 
@@ -92,6 +98,8 @@ //A compare method that references the keyValBuffer through the indirect //pointers protected int compare(int i, int j) { + // indicate we're making progress + reporter.progress(); return comparator.compare(keyValBuffer.getData(), startOffsets[i], keyLengths[i], keyValBuffer.getData(), startOffsets[j], Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java Wed Jun 20 14:00:25 2007 @@ -19,6 +19,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator; +import org.apache.hadoop.util.Progressable; /** This class provides a generic sort interface that should be implemented * by specific sort algorithms. The use case is the following: @@ -37,6 +38,11 @@ */ interface BufferSorter extends JobConfigurable { + /** Pass the Progressable object so that sort can call progress while it is sorting + * @param reporter the Progressable object reference + */ + public void setProgressable(Progressable reporter); + /** When a key/value is added at a particular offset in the key/value buffer, * this method is invoked by the user class so that the impl of this sort * interface can update its datastructures. 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Jun 20 14:00:25 2007 @@ -59,7 +59,7 @@ return true; } - public void progress(String taskid, float progress, String state, + public boolean progress(String taskid, float progress, String state, TaskStatus.Phase phase, Counters counters) throws IOException { @@ -74,6 +74,7 @@ LOG.info(buf.toString()); // ignore phase // ignore counters + return true; } public void reportDiagnosticInfo(String taskid, String trace) throws IOException { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Jun 20 14:00:25 2007 @@ -195,7 +195,7 @@ public Task getTask(String taskid) { return null; } - public void progress(String taskId, float progress, String state, + public boolean progress(String taskId, float progress, String state, TaskStatus.Phase phase, Counters taskCounters) { LOG.info(state); float taskIndex = mapIds.indexOf(taskId); @@ -208,6 +208,8 @@ currentCounters = Counters.sum(completedTaskCounters, taskCounters); // ignore phase + + return true; } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Wed Jun 20 14:00:25 2007 @@ -191,7 +191,8 @@ Path localFilename, LocalDirAllocator lDirAlloc, Configuration conf, int reduce, - int timeout) throws IOException, InterruptedException { + int timeout, Progressable progressable) + throws IOException, InterruptedException { boolean good = false; long totalBytes = 0; FileSystem fileSys = localFileSys; @@ -245,6 +246,8 @@ if (currentThread.isInterrupted()) { throw new InterruptedException(); } + // indicate we're making progress + progressable.progress(); len = input.read(buffer); } } finally { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jun 20 14:00:25 2007 @@ -117,6 +117,9 @@ final Reporter reporter = getReporter(umbilical); + // start thread that will handle communication with parent + startCommunicationThread(umbilical); + int numReduceTasks = conf.getNumReduceTasks(); LOG.info("numReduceTasks: " + numReduceTasks); MapOutputCollector collector = null; @@ -164,7 +167,6 @@ throws IOException { setProgress(getProgress()); - reportProgress(umbilical); long beforePos = getPos(); boolean ret = rawIn.next(key, value); if (ret) { @@ -194,32 +196,6 @@ done(umbilical); } - private Thread createProgressThread(final 
TaskUmbilicalProtocol umbilical) { - //spawn a thread to give merge progress heartbeats - Thread sortProgress = new Thread() { - public void run() { - LOG.debug("Started thread: " + getName()); - while (true) { - try { - reportProgress(umbilical); - Thread.sleep(PROGRESS_INTERVAL); - } catch (InterruptedException e) { - return; - } catch (Throwable e) { - LOG.info("Thread Exception in " + - "reporting sort progress\n" + - StringUtils.stringifyException(e)); - continue; - } - } - } - }; - sortProgress.setName("Sort progress reporter for task "+getTaskId()); - sortProgress.setDaemon(true); - sortProgress.start(); - return sortProgress; - } - interface MapOutputCollector extends OutputCollector { public void close() throws IOException; @@ -376,19 +352,10 @@ for (int i = 0; i < partitions; i++) totalMem += sortImpl[i].getMemoryUtilized(); if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) { - - // Start the progress thread - Thread progress = createProgressThread(umbilical); - - try { - sortAndSpillToDisk(); - keyValBuffer.reset(); - for (int i = 0; i < partitions; i++) { - sortImpl[i].close(); - } - } finally { - // Stop the progress thread - progress.interrupt(); + sortAndSpillToDisk(); + keyValBuffer.reset(); + for (int i = 0; i < partitions; i++) { + sortImpl[i].close(); } } } @@ -414,6 +381,7 @@ //invoke the sort for (int i = 0; i < partitions; i++) { sortImpl[i].setInputBuffer(keyValBuffer); + sortImpl[i].setProgressable(reporter); RawKeyValueIterator rIter = sortImpl[i].sort(); startPartition(i); @@ -459,6 +427,8 @@ combiner.reduce(values.getKey(), values, combineCollector, reporter); values.nextKey(); reporter.incrCounter(COMBINE_OUTPUT_RECORDS, 1); + // indicate we're making progress + reporter.progress(); } } @@ -467,6 +437,9 @@ Writable value = null; try { + // indicate progress, since constructor may take a while (because of + // user code) + reporter.progress(); key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job); value = 
(Writable)ReflectionUtils.newInstance(valClass, job); } catch (Exception e) { @@ -484,8 +457,8 @@ (resultIter.getValue()).writeUncompressedBytes(valOut); valIn.reset(valOut.getData(), valOut.getLength()); value.readFields(valIn); - writer.append(key, value); + reporter.progress(); } } @@ -546,6 +519,7 @@ //create a sorter object as we need access to the SegmentDescriptor //class and merge methods Sorter sorter = new Sorter(localFs, keyClass, valClass, job); + sorter.setProgressable(reporter); for (int parts = 0; parts < partitions; parts++){ List<SegmentDescriptor> segmentList = @@ -608,23 +582,15 @@ } } - public void flush() throws IOException { - - // Start the progress thread - Thread progress = createProgressThread(umbilical); - - try { - //check whether the length of the key/value buffer is 0. If not, then - //we need to spill that to disk. Note that we reset the key/val buffer - //upon each spill (so a length > 0 means that we have not spilled yet) - if (keyValBuffer.getLength() > 0) { - sortAndSpillToDisk(); - } - mergeParts(); - } finally { - // Stop the progress thread - progress.interrupt(); + public void flush() throws IOException + { + //check whether the length of the key/value buffer is 0. If not, then + //we need to spill that to disk. 
Note that we reset the key/val buffer + //upon each spill (so a length > 0 means that we have not spilled yet) + if (keyValBuffer.getLength() > 0) { + sortAndSpillToDisk(); } + mergeParts(); } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 20 14:00:25 2007 @@ -78,7 +78,6 @@ private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName()); private int numMaps; - AtomicBoolean sortComplete = new AtomicBoolean(false); private ReduceCopier reduceCopier; { @@ -169,12 +168,7 @@ } catch (IOException e) { throw new RuntimeException(e); } - // ignore the error, since failures in progress shouldn't kill us - try { - reporter.progress(); - } catch (IOException ie) { - LOG.debug("caught exception from progress", ie); - } + reporter.progress(); return result; // return saved value } @@ -232,11 +226,7 @@ } public void informReduceProgress() { reducePhase.set(super.in.getProgress().get()); // update progress - try { - reporter.progress(); - } catch (IOException ie) { - LOG.debug("Exception caught from progress", ie); - } + reporter.progress(); } public Object next() { reporter.incrCounter(REDUCE_INPUT_RECORDS, 1); @@ -249,8 +239,11 @@ Class valueClass = job.getMapOutputValueClass(); Reducer reducer = (Reducer)ReflectionUtils.newInstance( job.getReducerClass(), job); - FileSystem lfs = FileSystem.getLocal(job); + // start thread that will handle communication with parent + startCommunicationThread(umbilical); + + FileSystem lfs = FileSystem.getLocal(job); if (!job.get("mapred.job.tracker", "local").equals("local")) { reduceCopier = new 
ReduceCopier(umbilical, job); if (!reduceCopier.fetchOutputs()) { @@ -280,52 +273,26 @@ Path[] mapFiles = new Path[mapFilesList.size()]; mapFiles = mapFilesList.toArray(mapFiles); - // spawn a thread to give sort progress heartbeats - Thread sortProgress = new Thread() { - public void run() { - while (!sortComplete.get()) { - try { - reportProgress(umbilical); - Thread.sleep(PROGRESS_INTERVAL); - } catch (InterruptedException e) { - return; - } catch (Throwable e) { - System.out.println("Thread Exception in " + - "reporting sort progress\n" + - StringUtils.stringifyException(e)); - continue; - } - } - } - }; - sortProgress.setDaemon(true); - sortProgress.setName("Sort progress reporter for task "+getTaskId()); - Path tempDir = new Path(getTaskId()); WritableComparator comparator = job.getOutputValueGroupingComparator(); SequenceFile.Sorter.RawKeyValueIterator rIter; - try { - setPhase(TaskStatus.Phase.SORT); - sortProgress.start(); - - // sort the input file - SequenceFile.Sorter sorter = - new SequenceFile.Sorter(lfs, comparator, valueClass, job); - rIter = sorter.merge(mapFiles, tempDir, - !conf.getKeepFailedTaskFiles()); // sort + setPhase(TaskStatus.Phase.SORT); - } finally { - sortComplete.set(true); - } + final Reporter reporter = getReporter(umbilical); + + // sort the input file + SequenceFile.Sorter sorter = + new SequenceFile.Sorter(lfs, comparator, valueClass, job); + sorter.setProgressable(reporter); + rIter = sorter.merge(mapFiles, tempDir, + !conf.getKeepFailedTaskFiles()); // sort sortPhase.complete(); // sort is complete setPhase(TaskStatus.Phase.REDUCE); - final Reporter reporter = getReporter(umbilical); - // make output collector String finalName = getOutputName(getPartition()); FileSystem fs = FileSystem.get(job); @@ -338,7 +305,8 @@ throws IOException { out.write(key, value); reporter.incrCounter(REDUCE_OUTPUT_RECORDS, 1); - reportProgress(umbilical); + // indicate that progress update needs to be sent + reporter.progress(); } }; @@ -532,31 
+500,6 @@ "map_".length(), endIndex)); } - private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) { - //spawn a thread to give copy progress heartbeats - Thread copyProgress = new Thread() { - public void run() { - LOG.debug("Started thread: " + getName()); - while (true) { - try { - reportProgress(umbilical); - Thread.sleep(PROGRESS_INTERVAL); - } catch (InterruptedException e) { - return; - } catch (Throwable e) { - LOG.info("Thread Exception in " + - "reporting copy progress\n" + - StringUtils.stringifyException(e)); - continue; - } - } - } - }; - copyProgress.setName("Copy progress reporter for task "+getTaskId()); - copyProgress.setDaemon(true); - return copyProgress; - } - private int nextMapOutputCopierId = 0; /** Copies map outputs as they become available */ @@ -564,10 +507,12 @@ private MapOutputLocation currentLocation = null; private int id = nextMapOutputCopierId++; + private Reporter reporter; - public MapOutputCopier() { + public MapOutputCopier(Reporter reporter) { setName("MapOutputCopier " + reduceTask.getTaskId() + "." 
+ id); LOG.debug(getName() + " created"); + this.reporter = reporter; } /** @@ -665,7 +610,7 @@ tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics, tmpFilename, lDirAlloc, conf, reduceTask.getPartition(), - STALLED_COPY_TIMEOUT); + STALLED_COPY_TIMEOUT, reporter); if (!neededOutputs.contains(loc.getMapId())) { if (tmpFilename != null) { FileSystem fs = tmpFilename.getFileSystem(conf); @@ -788,6 +733,7 @@ sorter = new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(), conf.getMapOutputValueClass(), conf); + sorter.setProgressable(getReporter(umbilical)); // hosts -> next contact time this.penaltyBox = new Hashtable<String, Long>(); @@ -832,9 +778,10 @@ copiers = new MapOutputCopier[numCopiers]; + Reporter reporter = getReporter(umbilical); // start all the copying threads for (int i=0; i < copiers.length; i++) { - copiers[i] = new MapOutputCopier(); + copiers[i] = new MapOutputCopier(reporter); copiers[i].start(); } @@ -843,8 +790,6 @@ long currentTime = startTime; IntWritable fromEventId = new IntWritable(0); - Thread copyProgress = createProgressThread(umbilical); - copyProgress.start(); try { // loop until we get all required outputs while (!neededOutputs.isEmpty() && mergeThrowable == null) { @@ -1077,7 +1022,6 @@ return mergeThrowable == null && neededOutputs.isEmpty(); } finally { inMemFileSys.close(); - copyProgress.interrupt(); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Wed Jun 20 14:00:25 2007 @@ -31,7 +31,7 @@ public static final Reporter NULL = new Reporter() { public void setStatus(String s) { } - public void 
progress() throws IOException { + public void progress() { } public void incrCounter(Enum key, long amount) { } @@ -46,7 +46,7 @@ * @param status * a brief description of the current status */ - public abstract void setStatus(String status) throws IOException; + public abstract void setStatus(String status); /** * Increments the counter identified by the key, which can be of Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun 20 14:00:25 2007 @@ -22,6 +22,7 @@ import java.io.DataOutput; import java.io.IOException; import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; import java.text.NumberFormat; import org.apache.commons.logging.Log; @@ -36,6 +37,7 @@ import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; /** Base class for tasks. */ @@ -129,16 +131,17 @@ } /** * Return current phase of the task. + * needs to be synchronized as communication thread sends the phase every second * @return */ - public TaskStatus.Phase getPhase(){ + public synchronized TaskStatus.Phase getPhase(){ return this.phase; } /** * Set current phase of the task. 
* @param p */ - protected void setPhase(TaskStatus.Phase p){ + protected synchronized void setPhase(TaskStatus.Phase p){ this.phase = p; } @@ -217,12 +220,24 @@ public static final int PROGRESS_INTERVAL = 1000; private transient Progress taskProgress = new Progress(); - private transient long nextProgressTime = - System.currentTimeMillis() + PROGRESS_INTERVAL; // Current counters private transient Counters counters = new Counters(); + /** + * flag that indicates whether progress update needs to be sent to parent. + * If true, it has been set. If false, it has been reset. + * Using AtomicBoolean since we need an atomic read & reset method. + */ + private AtomicBoolean progressFlag = new AtomicBoolean(false); + // getters and setters for flag + private void setProgressFlag() { + progressFlag.set(true); + } + private boolean resetProgressFlag() { + return progressFlag.getAndSet(false); + } + public abstract boolean isMapTask(); public Progress getProgress() { return taskProgress; } @@ -231,18 +246,76 @@ throw new UnsupportedOperationException("Input only available on map"); } + /** + * The communication thread handles communication with the parent (Task Tracker). + * It sends progress updates if progress has been made or if the task needs to + * let the parent know that it's alive. It also pings the parent to see if it's alive. 
+ */ + protected void startCommunicationThread(final TaskUmbilicalProtocol umbilical) { + Thread thread = new Thread(new Runnable() { + public void run() { + final int MAX_RETRIES = 3; + int remainingRetries = MAX_RETRIES; + while (true) { + try { + // get current flag value and reset it as well + boolean sendProgress = resetProgressFlag(); + boolean taskFound = true; // whether TT knows about this task + + if (sendProgress) { + // we need to send progress update + taskFound = umbilical.progress(taskId, taskProgress.get(), + taskProgress.toString(), getPhase(), counters); + } + else { + // send ping + taskFound = umbilical.ping(taskId); + } + + // if Task Tracker is not aware of our task ID (probably because it died and + // came back up), kill ourselves + if (!taskFound) { + LOG.warn("Parent died. Exiting "+taskId); + System.exit(66); + } + + remainingRetries = MAX_RETRIES; + // sleep for a bit + try { + Thread.sleep(PROGRESS_INTERVAL); + } + catch (InterruptedException e) { + } + } + catch (Throwable t) { + LOG.info("Communication exception: " + StringUtils.stringifyException(t)); + remainingRetries -=1; + if (remainingRetries == 0) { + ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); + LOG.warn("Last retry, killing "+taskId); + System.exit(65); + } + } + } + } + }, "Comm thread for "+taskId); + thread.setDaemon(true); + thread.start(); + } + + protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) throws IOException { return new Reporter() { - public void setStatus(String status) throws IOException { - synchronized (this) { - taskProgress.setStatus(status); - progress(); - } + public void setStatus(String status) { + taskProgress.setStatus(status); + // indicate that progress update needs to be sent + setProgressFlag(); } - public void progress() throws IOException { - reportProgress(umbilical); + public void progress() { + // indicate that progress update needs to be sent + setProgressFlag(); } public void incrCounter(Enum key, 
long amount) { Counters counters = getCounters(); @@ -258,24 +331,8 @@ public void setProgress(float progress) { taskProgress.set(progress); - } - - public void reportProgress(TaskUmbilicalProtocol umbilical) { - long now = System.currentTimeMillis(); - synchronized (this) { - if (now > nextProgressTime) { - nextProgressTime = now + PROGRESS_INTERVAL; - float progress = taskProgress.get(); - String status = taskProgress.toString(); - try { - umbilical.progress(getTaskId(), progress, status, phase, counters); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); // interrupt ourself - } catch (IOException ie) { - LOG.warn(StringUtils.stringifyException(ie)); - } - } - } + // indicate that progress update needs to be sent + setProgressFlag(); } public void done(TaskUmbilicalProtocol umbilical) throws IOException { @@ -286,14 +343,17 @@ if (needProgress) { // send a final status report try { - umbilical.progress(getTaskId(), taskProgress.get(), - taskProgress.toString(), phase, counters); + if (!umbilical.progress(taskId, taskProgress.get(), + taskProgress.toString(), getPhase(), counters)) { + LOG.warn("Parent died. 
Exiting "+taskId); + System.exit(66); + } needProgress = false; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // interrupt ourself } } - umbilical.done(getTaskId()); + umbilical.done(taskId); return; } catch (IOException ie) { LOG.warn("Failure signalling completion: " + Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 20 14:00:25 2007 @@ -1530,7 +1530,7 @@ /** * Called periodically to report Task progress, from 0.0 to 1.0. */ - public synchronized void progress(String taskid, float progress, + public synchronized boolean progress(String taskid, float progress, String state, TaskStatus.Phase phase, Counters counters @@ -1538,8 +1538,10 @@ TaskInProgress tip = (TaskInProgress) tasks.get(taskid); if (tip != null) { tip.reportProgress(progress, state, phase, counters); + return true; } else { - LOG.warn("Progress from unknown child task: "+taskid+". Ignored."); + LOG.warn("Progress from unknown child task: "+taskid); + return false; } } @@ -1697,8 +1699,6 @@ defaultConf.addFinalResource(new Path(task.getJobFile())); - startPinging(umbilical, taskid); // start pinging parent - try { // use job-specified working directory FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory()); @@ -1720,41 +1720,6 @@ // there is no more logging done. 
LogManager.shutdown(); } - } - - /** Periodically ping parent and exit when this fails.*/ - private static void startPinging(final TaskUmbilicalProtocol umbilical, - final String taskid) { - Thread thread = new Thread(new Runnable() { - public void run() { - final int MAX_RETRIES = 3; - int remainingRetries = MAX_RETRIES; - while (true) { - try { - if (!umbilical.ping(taskid)) { - LOG.warn("Parent died. Exiting "+taskid); - System.exit(66); - } - remainingRetries = MAX_RETRIES; - } catch (Throwable t) { - String msg = StringUtils.stringifyException(t); - LOG.info("Ping exception: " + msg); - remainingRetries -=1; - if (remainingRetries == 0) { - ReflectionUtils.logThreadInfo(LOG, "ping exception", 0); - LOG.warn("Last retry, killing "+taskid); - System.exit(65); - } - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - } - } - }, "Pinger for "+taskid); - thread.setDaemon(true); - thread.start(); } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Jun 20 14:00:25 2007 @@ -29,8 +29,10 @@ * and parent is via this protocol. 
*/ interface TaskUmbilicalProtocol extends VersionedProtocol { - /** Changed the version to 2, since we have a new method getMapOutputs */ - public static final long versionID = 2L; + /** Changed the version to 2, since we have a new method getMapOutputs + * Changed version to 3 to have progress() return a boolean + * */ + public static final long versionID = 3L; /** Called when a child task process starts, to get its task.*/ Task getTask(String taskid) throws IOException; @@ -41,8 +43,9 @@ * @param state description of task's current state * @param phase current phase of the task. * @param counters the counters for this task. + * @return True if the task is known */ - void progress(String taskid, float progress, String state, + boolean progress(String taskid, float progress, String state, TaskStatus.Phase phase, Counters counters) throws IOException, InterruptedException; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/OutputHandler.java Wed Jun 20 14:00:25 2007 @@ -65,7 +65,7 @@ /** * Update the status message for the task. 
*/ - public void status(String msg) throws IOException { + public void status(String msg) { reporter.setStatus(msg); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progress.java Wed Jun 20 14:00:25 2007 @@ -44,7 +44,7 @@ } /** Adds a node to the tree. */ - public Progress addPhase() { + public synchronized Progress addPhase() { Progress phase = new Progress(); phases.add(phase); phase.parent = this; @@ -54,17 +54,17 @@ /** Called during execution to move to the next phase at this level in the * tree. */ - public void startNextPhase() { + public synchronized void startNextPhase() { currentPhase++; } /** Returns the current sub-node executing. */ - public Progress phase() { + public synchronized Progress phase() { return phases.get(currentPhase); } /** Completes this node, moving the parent node to its next child. */ - public void complete() { + public synchronized void complete() { progress = 1.0f; if (parent != null) { parent.startNextPhase(); @@ -72,12 +72,14 @@ } /** Called during execution on a leaf node to set its progress. */ - public void set(float progress) { + public synchronized void set(float progress) { this.progress = progress; } /** Returns the overall progress of the root. */ - public float get() { + // this method probably does not need to be synchronized as getInternal() is synchronized + // and the node's parent never changes. Still, it doesn't hurt. + public synchronized float get() { Progress node = this; while (node.parent != null) { // find the root node = parent; @@ -86,7 +88,7 @@ } /** Computes progress in this node. 
*/ - private float getInternal() { + private synchronized float getInternal() { int phaseCount = phases.size(); if (phaseCount != 0) { float subProgress = @@ -97,7 +99,7 @@ } } - public void setStatus(String status) { + public synchronized void setStatus(String status) { this.status = status; } @@ -107,7 +109,7 @@ return result.toString(); } - private void toString(StringBuffer buffer) { + private synchronized void toString(StringBuffer buffer) { buffer.append(status); if (phases.size() != 0 && currentPhase < phases.size()) { buffer.append(" > "); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java?view=diff&rev=549233&r1=549232&r2=549233 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/Progressable.java Wed Jun 20 14:00:25 2007 @@ -12,5 +12,5 @@ /** callback for reporting progress. Used by DFSclient to report * progress while writing a block of DFS file. */ - public void progress() throws IOException; + public void progress(); }