Author: tomwhite Date: Tue Jul 10 12:53:22 2007 New Revision: 555060 URL: http://svn.apache.org/viewvc?view=rev&rev=555060 Log: HADOOP-1556. Make LocalJobRunner delete working files at end of job run. Contributed by Devaraj Das.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555060&r1=555059&r2=555060 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 10 12:53:22 2007 @@ -277,6 +277,9 @@ 85. HADOOP-1546. Remove spurious column from HDFS web UI. (Dhruba Borthakur via cutting) + 86. HADOOP-1556. Make LocalJobRunner delete working files at end of + job run. (Devaraj Das via tomwhite) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=555060&r1=555059&r2=555060 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jul 10 12:53:22 2007 @@ -134,38 +134,46 @@ map_tasks -= 1; updateCounters(map); } - if (numReduceTasks > 0) { - // move map output to reduce input - String reduceId = "reduce_" + newId(); + String reduceId = "reduce_" + newId(); + try { + if (numReduceTasks > 0) { + // move map output to reduce input + for (int i = 0; i < mapIds.size(); i++) { + String mapId = mapIds.get(i); + Path mapOut = this.mapoutputFile.getOutputFile(mapId); + Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId, + localFs.getLength(mapOut)); + if (!localFs.mkdirs(reduceIn.getParent())) { + throw new IOException("Mkdirs failed to create " + + reduceIn.getParent().toString()); + } + if (!localFs.rename(mapOut, reduceIn)) + throw new IOException("Couldn't rename " + mapOut); + } + + { + ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001", + reduceId, 0, mapIds.size()); + JobConf localConf = new JobConf(job); + reduce.localizeConfiguration(localConf); + reduce.setConf(localConf); + reduce_tasks += 1; + myMetrics.launchReduce(); + reduce.run(localConf, this); + reduce.saveTaskOutput(); + myMetrics.completeReduce(); + reduce_tasks -= 1; + updateCounters(reduce); + } + } + } finally { for (int i = 0; i < mapIds.size(); i++) { String mapId = mapIds.get(i); - Path mapOut = this.mapoutputFile.getOutputFile(mapId); - Path reduceIn = this.mapoutputFile.getInputFileForWrite(i,reduceId, - localFs.getLength(mapOut)); - if (!localFs.mkdirs(reduceIn.getParent())) { - throw new IOException("Mkdirs failed to create " - + reduceIn.getParent().toString()); - } - if (!localFs.rename(mapOut, reduceIn)) - throw new IOException("Couldn't rename " + mapOut); this.mapoutputFile.removeAll(mapId); } - - { - ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001", - reduceId, 0, mapIds.size()); - JobConf localConf = new JobConf(job); - reduce.localizeConfiguration(localConf); - reduce.setConf(localConf); - reduce_tasks += 1; - myMetrics.launchReduce(); - reduce.run(localConf, this); - reduce.saveTaskOutput(); - myMetrics.completeReduce(); - reduce_tasks -= 1; - updateCounters(reduce); + if (numReduceTasks == 1) { + this.mapoutputFile.removeAll(reduceId); } - this.mapoutputFile.removeAll(reduceId); } this.status.setRunState(JobStatus.SUCCEEDED);