    [ http://issues.apache.org/jira/browse/HADOOP-20?page=all ]

Doug Cutting closed HADOOP-20:
------------------------------
> Mapper, Reducer need an occasion to cleanup after the last record is
> processed.
> -------------------------------------------------------------------------------
>
>          Key: HADOOP-20
>          URL: http://issues.apache.org/jira/browse/HADOOP-20
>      Project: Hadoop
>         Type: Improvement
>   Components: mapred
>  Environment: Linux
>     Reporter: Michel Tourn
>      Fix For: 0.1.0
>  Attachments: mapredfinished.log, mrclose.patch
>
> Mapper, Reducer need an occasion to do some cleanup after the last record is
> processed.
>
> Proposal (patch attached):
> in interface Mapper:
>   add method void finished();
> in interface Reducer:
>   add method void finished();
> finished() methods are called from MapTask, CombiningCollector, ReduceTask.
> ------------
> Known limitation: Fetcher (a multithreaded MapRunnable) does not call
> finished().
> This is not currently a problem because the fetcher Map/Reduce modules do not
> do anything in finished().
> The right way to add finished() support to Fetcher would be to wait for all
> threads to finish, then do:
>
>   if (collector instanceof CombiningCollector)
>     ((CombiningCollector)collector).finished();
>
> ------------
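As a rough illustration of the intended use (not part of the attached patch), a
Mapper implementation could use finished() to release a resource it opened in
configure(). The SideFileMapper class and its side.log path below are made up
for this sketch; only the Mapper interface and the proposed finished() hook
follow the patch, and the org.apache.nutch.* imports are assumed from the files
touched by it:

  import java.io.FileWriter;
  import java.io.IOException;
  import java.io.PrintWriter;

  import org.apache.nutch.io.Writable;
  import org.apache.nutch.io.WritableComparable;
  import org.apache.nutch.mapred.JobConf;
  import org.apache.nutch.mapred.Mapper;
  import org.apache.nutch.mapred.OutputCollector;
  import org.apache.nutch.mapred.Reporter;

  // Hypothetical example: a pass-through Mapper that writes a side log and
  // closes it once the last record has been mapped.
  public class SideFileMapper implements Mapper {
    private PrintWriter sideLog;                         // hypothetical side resource

    public void configure(JobConf job) {
      try {
        sideLog = new PrintWriter(new FileWriter("side.log"));  // assumed local file
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    public void map(WritableComparable key, Writable value,
                    OutputCollector output, Reporter reporter) throws IOException {
      sideLog.println(key.toString());                   // record every key seen
      output.collect(key, value);                        // pass the pair through unchanged
    }

    // Called once after the last map() call: the cleanup hook proposed here.
    public void finished() {
      sideLog.close();
    }
  }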
> patch begins: (svn trunk)
>
> Index: src/test/org/apache/nutch/mapred/MapredLoadTest.java
> ===================================================================
> --- src/test/org/apache/nutch/mapred/MapredLoadTest.java	(revision 374781)
> +++ src/test/org/apache/nutch/mapred/MapredLoadTest.java	(working copy)
> @@ -69,6 +69,8 @@
>          out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
>        }
>      }
> +    public void finished() {
> +    }
>    }
>    static class RandomGenReducer implements Reducer {
>      public void configure(JobConf job) {
> @@ -81,6 +83,8 @@
>          out.collect(new UTF8("" + val), new UTF8(""));
>        }
>      }
> +    public void finished() {
> +    }
>    }
>    static class RandomCheckMapper implements Mapper {
>      public void configure(JobConf job) {
> @@ -92,6 +96,8 @@
>
>        out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
>      }
> +    public void finished() {
> +    }
>    }
>    static class RandomCheckReducer implements Reducer {
>      public void configure(JobConf job) {
> @@ -106,6 +112,8 @@
>        }
>        out.collect(new IntWritable(keyint), new IntWritable(count));
>      }
> +    public void finished() {
> +    }
>    }
>
>    int range;
> Index: src/test/org/apache/nutch/fs/TestNutchFileSystem.java
> ===================================================================
> --- src/test/org/apache/nutch/fs/TestNutchFileSystem.java	(revision 374783)
> +++ src/test/org/apache/nutch/fs/TestNutchFileSystem.java	(working copy)
> @@ -155,6 +155,8 @@
>
>        reporter.setStatus("wrote " + name);
>      }
> +
> +    public void finished() {}
>    }
>
>    public static void writeTest(NutchFileSystem fs, boolean fastCheck)
> @@ -247,6 +249,9 @@
>
>        reporter.setStatus("read " + name);
>      }
> +
> +    public void finished() {}
> +
>    }
>
>    public static void readTest(NutchFileSystem fs, boolean fastCheck)
> @@ -339,6 +344,9 @@
>          in.close();
>        }
>      }
> +
> +    public void finished() {}
> +
>    }
>
>    public static void seekTest(NutchFileSystem fs, boolean fastCheck)
> Index: src/java/org/apache/nutch/indexer/DeleteDuplicates.java
> ===================================================================
> --- src/java/org/apache/nutch/indexer/DeleteDuplicates.java	(revision 374776)
> +++ src/java/org/apache/nutch/indexer/DeleteDuplicates.java	(working copy)
> @@ -225,6 +225,7 @@
>          }
>        }
>      }
> +    public void finished() {}
>    }
>
>    private NutchFileSystem fs;
> @@ -265,6 +266,8 @@
>          reader.close();
>        }
>      }
> +
> +    public void finished() {}
>
>    /** Write nothing. */
>    public RecordWriter getRecordWriter(final NutchFileSystem fs,
> Index: src/java/org/apache/nutch/indexer/Indexer.java
> ===================================================================
> --- src/java/org/apache/nutch/indexer/Indexer.java	(revision 374778)
> +++ src/java/org/apache/nutch/indexer/Indexer.java	(working copy)
> @@ -227,6 +227,8 @@
>
>      output.collect(key, new ObjectWritable(doc));
>    }
> +
> +  public void finished() {}
>
>    public void index(File indexDir, File crawlDb, File linkDb, File[] segments)
>      throws IOException {
> Index: src/java/org/apache/nutch/segment/SegmentReader.java
> ===================================================================
> --- src/java/org/apache/nutch/segment/SegmentReader.java	(revision 374778)
> +++ src/java/org/apache/nutch/segment/SegmentReader.java	(working copy)
> @@ -143,7 +143,9 @@
>      }
>      output.collect(key, new ObjectWritable(dump.toString()));
>    }
> -
> +
> +  public void finished() {}
> +
>    public void reader(File segment) throws IOException {
>      LOG.info("Reader: segment: " + segment);
>
> Index: src/java/org/apache/nutch/mapred/Mapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/Mapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/Mapper.java	(working copy)
> @@ -39,4 +39,9 @@
>    void map(WritableComparable key, Writable value,
>             OutputCollector output, Reporter reporter)
>      throws IOException;
> +
> +  /** Called after the last {@link #map} call on this Mapper object.
> +      Typical implementations do nothing.
> +  */
> +  void finished();
>  }
> Index: src/java/org/apache/nutch/mapred/lib/RegexMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/RegexMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/RegexMapper.java	(working copy)
> @@ -53,4 +53,5 @@
>        output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
>      }
>    }
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/InverseMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/InverseMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/InverseMapper.java	(working copy)
> @@ -38,4 +38,6 @@
>      throws IOException {
>      output.collect((WritableComparable)value, key);
>    }
> +
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/IdentityReducer.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/IdentityReducer.java	(working copy)
> @@ -42,4 +42,5 @@
>      }
>    }
>
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/IdentityMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/IdentityMapper.java	(working copy)
> @@ -39,4 +39,5 @@
>      output.collect(key, val);
>    }
>
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/LongSumReducer.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/LongSumReducer.java	(working copy)
> @@ -47,4 +47,6 @@
>      // output sum
>      output.collect(key, new LongWritable(sum));
>    }
> +
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java	(working copy)
> @@ -50,4 +50,6 @@
>        output.collect(new UTF8(st.nextToken()), new LongWritable(1));
>      }
>    }
> +
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/ReduceTask.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/ReduceTask.java	(revision 374781)
> +++ src/java/org/apache/nutch/mapred/ReduceTask.java	(working copy)
> @@ -275,6 +275,7 @@
>        }
>
>      } finally {
> +      reducer.finished();
>        in.close();
>        lfs.delete(new File(sortedFile));               // remove sorted
>        out.close(reporter);
> Index: src/java/org/apache/nutch/mapred/MapTask.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/MapTask.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/MapTask.java	(working copy)
> @@ -50,7 +50,7 @@
>    public void write(DataOutput out) throws IOException {
>      super.write(out);
>      split.write(out);
> -
> +
>    }
>    public void readFields(DataInput in) throws IOException {
>      super.readFields(in);
> @@ -126,6 +126,10 @@
>        }
>
>      } finally {
> +      if (combining) {
> +        ((CombiningCollector)collector).finished();
> +      }
> +
>        in.close();                                     // close input
>      }
>    } finally {
> @@ -147,5 +151,5 @@
>    public NutchConf getConf() {
>      return this.nutchConf;
>    }
> -
> +
>  }
> Index: src/java/org/apache/nutch/mapred/MapRunner.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/MapRunner.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/MapRunner.java	(working copy)
> @@ -38,18 +38,22 @@
>    public void run(RecordReader input, OutputCollector output,
>                    Reporter reporter)
>      throws IOException {
> -    while (true) {
> -      // allocate new key & value instances
> -      WritableComparable key =
> -        (WritableComparable)job.newInstance(inputKeyClass);
> -      Writable value = (Writable)job.newInstance(inputValueClass);
> +    try {
> +      while (true) {
> +        // allocate new key & value instances
> +        WritableComparable key =
> +          (WritableComparable)job.newInstance(inputKeyClass);
> +        Writable value = (Writable)job.newInstance(inputValueClass);
>
> -      // read next key & value
> -      if (!input.next(key, value))
> -        return;
> +        // read next key & value
> +        if (!input.next(key, value))
> +          return;
>
> -      // map pair to output
> -      mapper.map(key, value, output, reporter);
> +        // map pair to output
> +        mapper.map(key, value, output, reporter);
> +      }
> +    } finally {
> +      mapper.finished();
>      }
>    }
>
> Index: src/java/org/apache/nutch/mapred/CombiningCollector.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/CombiningCollector.java	(revision 374780)
> +++ src/java/org/apache/nutch/mapred/CombiningCollector.java	(working copy)
> @@ -78,4 +78,9 @@
>      count = 0;
>    }
>
> +  public synchronized void finished()
> +  {
> +    combiner.finished();
> +  }
> +
>  }
> Index: src/java/org/apache/nutch/mapred/Reducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/Reducer.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/Reducer.java	(working copy)
> @@ -38,4 +38,10 @@
>    void reduce(WritableComparable key, Iterator values,
>                OutputCollector output, Reporter reporter)
>      throws IOException;
> +
> +  /** Called after the last {@link #reduce} call on this Reducer object.
> +      Typical implementations do nothing.
> +  */
> +  void finished();
> +
>  }
> Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/CrawlDbReader.java	(revision 374737)
> +++ src/java/org/apache/nutch/crawl/CrawlDbReader.java	(working copy)
> @@ -50,9 +50,9 @@
>
>  /**
>   * Read utility for the CrawlDB.
> - *
> + *
>   * @author Andrzej Bialecki
> - *
> + *
>   */
>  public class CrawlDbReader {
>
> @@ -68,6 +68,7 @@
>        output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch()));
>        output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0)));
>      }
> +    public void finished() {}
>    }
>
>    public static class CrawlDbStatReducer implements Reducer {
> @@ -121,6 +122,7 @@
>        output.collect(new UTF8("avg score"), new LongWritable(total / cnt));
>      }
>    }
> +    public void finished() {}
>    }
>
>    public static class CrawlDbDumpReducer implements Reducer {
> @@ -133,8 +135,11 @@
>
>      public void configure(JobConf job) {
>      }
> +
> +    public void finished() {
> +    }
>    }
> -
> +
>    public void processStatJob(String crawlDb, NutchConf config) throws IOException {
>      LOG.info("CrawlDb statistics start: " + crawlDb);
>      File tmpFolder = new File(crawlDb, "stat_tmp" + System.currentTimeMillis());
> @@ -219,7 +224,7 @@
>        System.out.println("not found");
>      }
>    }
> -
> +
>    public void processDumpJob(String crawlDb, String output, NutchConf config) throws IOException {
>
>      LOG.info("CrawlDb dump: starting");
> @@ -270,4 +275,5 @@
>      }
>      return;
>    }
> +
>  }
> Index: src/java/org/apache/nutch/crawl/LinkDb.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/LinkDb.java	(revision 374779)
> +++ src/java/org/apache/nutch/crawl/LinkDb.java	(working copy)
> @@ -118,7 +118,8 @@
>      output.collect(key, result);
>    }
>
> -
> +  public void finished() {}
> +
>    public void invert(File linkDb, File segmentsDir) throws IOException {
>      LOG.info("LinkDb: starting");
>      LOG.info("LinkDb: linkdb: " + linkDb);
> Index: src/java/org/apache/nutch/crawl/Injector.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/Injector.java	(revision 374779)
> +++ src/java/org/apache/nutch/crawl/Injector.java	(working copy)
> @@ -65,6 +65,8 @@
>                                            interval));
>      }
>    }
> +
> +  public void finished() {}
>  }
>
>  /** Combine multiple new entries for a url. */
> @@ -76,6 +78,7 @@
>      throws IOException {
>      output.collect(key, (Writable)values.next()); // just collect first value
>    }
> +  public void finished() {}
>  }
>
>  /** Construct an Injector. */
> Index: src/java/org/apache/nutch/crawl/Generator.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/Generator.java	(revision 374779)
> +++ src/java/org/apache/nutch/crawl/Generator.java	(working copy)
> @@ -63,6 +63,8 @@
>      output.collect(crawlDatum, key);          // invert for sort by score
>    }
>
> +  public void finished() {}
> +
>    /** Partition by host (value). */
>    public int getPartition(WritableComparable key, Writable value,
>                            int numReduceTasks) {
> Index: src/java/org/apache/nutch/crawl/CrawlDbReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(revision 374781)
> +++ src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(working copy)
> @@ -115,4 +115,5 @@
>      }
>    }
>
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/parse/ParseSegment.java
> ===================================================================
> --- src/java/org/apache/nutch/parse/ParseSegment.java	(revision 374776)
> +++ src/java/org/apache/nutch/parse/ParseSegment.java	(working copy)
> @@ -78,6 +78,8 @@
>      throws IOException {
>      output.collect(key, (Writable)values.next()); // collect first value
>    }
> +
> +  public void finished() {}
>
>    public void parse(File segment) throws IOException {
>      LOG.info("Parse: starting");

--
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira
