Modified: nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java (original) +++ nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java Thu Jan 29 05:38:59 2015 @@ -107,8 +107,9 @@ public class WebGraph extends Configured * by domain and host can be ignored. The number of Outlinks out to a given * page or domain can also be limited. */ - public static class OutlinkDb extends Configured implements - Mapper<Text, Writable, Text, NutchWritable>, Reducer<Text, NutchWritable, Text, LinkDatum> { + public static class OutlinkDb extends Configured implements + Mapper<Text, Writable, Text, NutchWritable>, + Reducer<Text, NutchWritable, Text, LinkDatum> { public static final String URL_NORMALIZING = "webgraph.url.normalizers"; public static final String URL_FILTERING = "webgraph.url.filters"; @@ -133,7 +134,8 @@ public class WebGraph extends Configured /** * Normalizes and trims extra whitespace from the given url. * - * @param url The url to normalize. + * @param url + * The url to normalize. * * @return The normalized url. */ @@ -151,8 +153,7 @@ public class WebGraph extends Configured normalized = urlNormalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); normalized = normalized.trim(); - } - catch (Exception e) { + } catch (Exception e) { LOG.warn("Skipping " + url + ":" + e); normalized = null; } @@ -162,9 +163,10 @@ public class WebGraph extends Configured /** * Filters the given url. - * - * @param url The url to filter. - * + * + * @param url + * The url to filter. + * * @return The filtered url or null. */ private String filterUrl(String url) { @@ -186,7 +188,8 @@ public class WebGraph extends Configured * Returns the fetch time from the parse data or the current system time if * the fetch time doesn't exist. * - * @param data The parse data. + * @param data + * The parse data. * * @return The fetch time as a long. */ @@ -198,8 +201,7 @@ public class WebGraph extends Configured try { // get the fetch time from the parse data fetchTime = Long.parseLong(fetchTimeStr); - } - catch (Exception e) { + } catch (Exception e) { fetchTime = System.currentTimeMillis(); } return fetchTime; @@ -244,8 +246,9 @@ public class WebGraph extends Configured * Passes through existing LinkDatum objects from an existing OutlinkDb and * maps out new LinkDatum objects from new crawls ParseData. */ - public void map(Text key, Writable value, OutputCollector<Text, NutchWritable> - output, Reporter reporter) throws IOException { + public void map(Text key, Writable value, + OutputCollector<Text, NutchWritable> output, Reporter reporter) + throws IOException { // normalize url, stop processing if null String url = normalizeUrl(key.toString()); @@ -262,20 +265,19 @@ public class WebGraph extends Configured key.set(url); if (value instanceof CrawlDatum) { - CrawlDatum datum = (CrawlDatum)value; + CrawlDatum datum = (CrawlDatum) value; - if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP || - datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM || - datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) { + if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP + || datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM + || datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) { // Tell the reducer to get rid of all instances of this key output.collect(key, new NutchWritable(new BooleanWritable(true))); } - } - else if (value instanceof ParseData) { + } else if (value instanceof ParseData) { // get the parse data and the outlinks from the parse data, along with // the fetch time for those links - ParseData data = (ParseData)value; + ParseData data = (ParseData) value; long fetchTime = getFetchTime(data); Outlink[] outlinkAr = data.getOutlinks(); Map<String, String> outlinkMap = new LinkedHashMap<String, String>(); @@ -307,9 +309,8 @@ public class WebGraph extends Configured LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime); output.collect(key, new NutchWritable(datum)); } - } - else if (value instanceof LinkDatum) { - LinkDatum datum = (LinkDatum)value; + } else if (value instanceof LinkDatum) { + LinkDatum datum = (LinkDatum) value; String linkDatumUrl = normalizeUrl(datum.getUrl()); if (filterUrl(linkDatumUrl) != null) { @@ -323,7 +324,7 @@ public class WebGraph extends Configured public void reduce(Text key, Iterator<NutchWritable> values, OutputCollector<Text, LinkDatum> output, Reporter reporter) - throws IOException { + throws IOException { // aggregate all outlinks, get the most recent timestamp for a fetch // which should be the timestamp for all of the most recent outlinks @@ -334,17 +335,17 @@ public class WebGraph extends Configured if (value instanceof LinkDatum) { // loop through, change out most recent timestamp if needed - LinkDatum next = (LinkDatum)value; + LinkDatum next = (LinkDatum) value; long timestamp = next.getTimestamp(); if (mostRecent == 0L || mostRecent < timestamp) { mostRecent = timestamp; } outlinkList.add(WritableUtils.clone(next, conf)); reporter.incrCounter("WebGraph.outlinks", "added links", 1); - } - else if (value instanceof BooleanWritable) { - BooleanWritable delete = (BooleanWritable)value; - // Actually, delete is always true, otherwise we don't emit it in the mapper in the first place + } else if (value instanceof BooleanWritable) { + BooleanWritable delete = (BooleanWritable) value; + // Actually, delete is always true, otherwise we don't emit it in the + // mapper in the first place if (delete.get() == true) { // This page is gone, do not emit it's outlinks reporter.incrCounter("WebGraph.outlinks", "removed links", 1); @@ -378,7 +379,8 @@ public class WebGraph extends Configured && (!limitPages || (limitPages && !pages.contains(toPage))) && (!limitDomains || (limitDomains && !domains.contains(toDomain))) && (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host))) - && (!ignoreDomain || (ignoreDomain && !toDomain.equalsIgnoreCase(domain)))) { + && (!ignoreDomain || (ignoreDomain && !toDomain + .equalsIgnoreCase(domain)))) { output.collect(key, datum); pages.add(toPage); domains.add(toDomain); @@ -395,7 +397,8 @@ public class WebGraph extends Configured * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is * updated. */ - private static class InlinkDb extends Configured implements Mapper<Text, LinkDatum, Text, LinkDatum> { + private static class InlinkDb extends Configured implements + Mapper<Text, LinkDatum, Text, LinkDatum> { private long timestamp; @@ -414,8 +417,9 @@ public class WebGraph extends Configured * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a * new system timestamp, type and to and from url switched. */ - public void map(Text key, LinkDatum datum, OutputCollector<Text, LinkDatum> - output, Reporter reporter) throws IOException { + public void map(Text key, LinkDatum datum, + OutputCollector<Text, LinkDatum> output, Reporter reporter) + throws IOException { // get the to and from url and the anchor String fromUrl = key.toString(); @@ -433,21 +437,25 @@ public class WebGraph extends Configured * Creates the Node database which consists of the number of in and outlinks * for each url and a score slot for analysis programs such as LinkRank. */ - private static class NodeDb extends Configured implements Reducer<Text, LinkDatum, Text, Node> { + private static class NodeDb extends Configured implements + Reducer<Text, LinkDatum, Text, Node> { /** * Configures job. */ - public void configure(JobConf conf) { } + public void configure(JobConf conf) { + } - public void close() { } + public void close() { + } /** * Counts the number of inlinks and outlinks for each url and sets a default * score of 0.0 for each url (node) in the webgraph. */ - public void reduce(Text key, Iterator<LinkDatum> values, OutputCollector<Text, Node> - output, Reporter reporter) throws IOException { + public void reduce(Text key, Iterator<LinkDatum> values, + OutputCollector<Text, Node> output, Reporter reporter) + throws IOException { Node node = new Node(); int numInlinks = 0; @@ -458,8 +466,7 @@ public class WebGraph extends Configured LinkDatum next = values.next(); if (next.getLinkType() == LinkDatum.INLINK) { numInlinks++; - } - else if (next.getLinkType() == LinkDatum.OUTLINK) { + } else if (next.getLinkType() == LinkDatum.OUTLINK) { numOutlinks++; } } @@ -477,16 +484,21 @@ public class WebGraph extends Configured * Node. If a current WebGraph exists then it is updated, if it doesn't exist * then a new WebGraph database is created. * - * @param webGraphDb The WebGraph to create or update. - * @param segments The array of segments used to update the WebGraph. Newer - * segments and fetch times will overwrite older segments. - * @param normalize whether to use URLNormalizers on URL's in the segment - * @param filter whether to use URLFilters on URL's in the segment + * @param webGraphDb + * The WebGraph to create or update. + * @param segments + * The array of segments used to update the WebGraph. Newer segments + * and fetch times will overwrite older segments. + * @param normalize + * whether to use URLNormalizers on URL's in the segment + * @param filter + * whether to use URLFilters on URL's in the segment * - * @throws IOException If an error occurs while processing the WebGraph. + * @throws IOException + * If an error occurs while processing the WebGraph. */ - public void createWebGraph(Path webGraphDb, Path[] segments, boolean normalize, boolean filter) - throws IOException { + public void createWebGraph(Path webGraphDb, Path[] segments, + boolean normalize, boolean filter) throws IOException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); @@ -563,7 +575,8 @@ public class WebGraph extends Configured outlinkJob.setOutputValueClass(LinkDatum.class); FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb); outlinkJob.setOutputFormat(MapFileOutputFormat.class); - outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", + false); // run the outlinkdb job and replace any old outlinkdb with the new one try { @@ -572,10 +585,10 @@ public class WebGraph extends Configured LOG.info("OutlinkDb: installing " + outlinkDb); FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true); FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true); - if (!preserveBackup && fs.exists(oldOutlinkDb)) fs.delete(oldOutlinkDb, true); + if (!preserveBackup && fs.exists(oldOutlinkDb)) + fs.delete(oldOutlinkDb, true); LOG.info("OutlinkDb: finished"); - } - catch (IOException e) { + } catch (IOException e) { // remove lock file and and temporary directory if an error occurs LockUtil.removeLockFile(fs, lock); @@ -603,7 +616,8 @@ public class WebGraph extends Configured inlinkJob.setOutputValueClass(LinkDatum.class); FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb); inlinkJob.setOutputFormat(MapFileOutputFormat.class); - inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", + false); try { @@ -613,14 +627,13 @@ public class WebGraph extends Configured LOG.info("InlinkDb: installing " + inlinkDb); FSUtils.replace(fs, inlinkDb, tempInlinkDb, true); LOG.info("InlinkDb: finished"); - } - catch (IOException e) { + } catch (IOException e) { // remove lock file and and temporary directory if an error occurs LockUtil.removeLockFile(fs, lock); if (fs.exists(tempInlinkDb)) { fs.delete(tempInlinkDb, true); - } + } LOG.error(StringUtils.stringifyException(e)); throw e; } @@ -644,7 +657,8 @@ public class WebGraph extends Configured nodeJob.setOutputValueClass(Node.class); FileOutputFormat.setOutputPath(nodeJob, tempNodeDb); nodeJob.setOutputFormat(MapFileOutputFormat.class); - nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", + false); try { @@ -654,14 +668,13 @@ public class WebGraph extends Configured LOG.info("NodeDb: installing " + nodeDb); FSUtils.replace(fs, nodeDb, tempNodeDb, true); LOG.info("NodeDb: finished"); - } - catch (IOException e) { + } catch (IOException e) { // remove lock file and and temporary directory if an error occurs LockUtil.removeLockFile(fs, lock); if (fs.exists(tempNodeDb)) { fs.delete(tempNodeDb, true); - } + } LOG.error(StringUtils.stringifyException(e)); throw e; } @@ -670,7 +683,8 @@ public class WebGraph extends Configured LockUtil.removeLockFile(fs, lock); long end = System.currentTimeMillis(); - LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); + LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); } public static void main(String[] args) throws Exception { @@ -683,26 +697,29 @@ public class WebGraph extends Configured */ public int run(String[] args) throws Exception { - //boolean options + // boolean options Option helpOpt = new Option("h", "help", false, "show this help message"); - Option normOpt = new Option("n", "normalize", false, "whether to use URLNormalizers on the URL's in the segment"); - Option filtOpt = new Option("f", "filter", false, "whether to use URLFilters on the URL's in the segment"); + Option normOpt = new Option("n", "normalize", false, + "whether to use URLNormalizers on the URL's in the segment"); + Option filtOpt = new Option("f", "filter", false, + "whether to use URLFilters on the URL's in the segment"); - //argument options + // argument options @SuppressWarnings("static-access") - Option graphOpt = OptionBuilder.withArgName("webgraphdb") - .hasArg().withDescription("the web graph database to create (if none exists) or use if one does") + Option graphOpt = OptionBuilder + .withArgName("webgraphdb") + .hasArg() + .withDescription( + "the web graph database to create (if none exists) or use if one does") .create("webgraphdb"); @SuppressWarnings("static-access") - Option segOpt = OptionBuilder.withArgName("segment") - .hasArgs().withDescription("the segment(s) to use") - .create("segment"); + Option segOpt = OptionBuilder.withArgName("segment").hasArgs() + .withDescription("the segment(s) to use").create("segment"); @SuppressWarnings("static-access") - Option segDirOpt = OptionBuilder.withArgName("segmentDir") - .hasArgs().withDescription("the segment directory to use") - .create("segmentDir"); - - //create the options + Option segDirOpt = OptionBuilder.withArgName("segmentDir").hasArgs() + .withDescription("the segment directory to use").create("segmentDir"); + + // create the options Options options = new Options(); options.addOption(helpOpt); options.addOption(normOpt); @@ -738,7 +755,8 @@ public class WebGraph extends Configured if (line.hasOption("segmentDir")) { Path dir = new Path(line.getOptionValue("segmentDir")); FileSystem fs = dir.getFileSystem(getConf()); - FileStatus[] fstats = fs.listStatus(dir, HadoopFSUtil.getPassDirectoriesFilter(fs)); + FileStatus[] fstats = fs.listStatus(dir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); segPaths = HadoopFSUtil.getPaths(fstats); } @@ -756,8 +774,7 @@ public class WebGraph extends Configured createWebGraph(new Path(webGraphDb), segPaths, normalize, filter); return 0; - } - catch (Exception e) { + } catch (Exception e) { LOG.error("WebGraph: " + StringUtils.stringifyException(e)); return -2; }
Modified: nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java (original) +++ nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java Thu Jan 29 05:38:59 2015 @@ -21,3 +21,4 @@ * see {@link org.apache.nutch.scoring.webgraph.WebGraph}. */ package org.apache.nutch.scoring.webgraph; + Modified: nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java (original) +++ nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java Thu Jan 29 05:38:59 2015 @@ -31,14 +31,14 @@ import org.apache.nutch.protocol.Content /** * An input format that takes Nutch Content objects and converts them to text - * while converting newline endings to spaces. This format is useful for working + * while converting newline endings to spaces. This format is useful for working * with Nutch content objects in Hadoop Streaming with other languages. */ -public class ContentAsTextInputFormat - extends SequenceFileInputFormat<Text, Text> { +public class ContentAsTextInputFormat extends + SequenceFileInputFormat<Text, Text> { - private static class ContentAsTextRecordReader - implements RecordReader<Text, Text> { + private static class ContentAsTextRecordReader implements + RecordReader<Text, Text> { private final SequenceFileRecordReader<Text, Content> sequenceFileRecordReader; @@ -46,9 +46,9 @@ public class ContentAsTextInputFormat private Content innerValue; public ContentAsTextRecordReader(Configuration conf, FileSplit split) - throws IOException { + throws IOException { sequenceFileRecordReader = new SequenceFileRecordReader<Text, Content>( - conf, split); + conf, split); innerKey = sequenceFileRecordReader.createKey(); innerValue = sequenceFileRecordReader.createValue(); } @@ -61,9 +61,8 @@ public class ContentAsTextInputFormat return new Text(); } - public synchronized boolean next(Text key, Text value) - throws IOException { - + public synchronized boolean next(Text key, Text value) throws IOException { + // convert the content object to text Text tKey = key; Text tValue = value; @@ -72,26 +71,23 @@ public class ContentAsTextInputFormat } tKey.set(innerKey.toString()); String contentAsStr = new String(innerValue.getContent()); - + // replace new line endings with spaces contentAsStr = contentAsStr.replaceAll("\n", " "); value.set(contentAsStr); - + return true; } - public float getProgress() - throws IOException { + public float getProgress() throws IOException { return sequenceFileRecordReader.getProgress(); } - public synchronized long getPos() - throws IOException { + public synchronized long getPos() throws IOException { return sequenceFileRecordReader.getPos(); } - public synchronized void close() - throws IOException { + public synchronized void close() throws IOException { sequenceFileRecordReader.close(); } } @@ -101,10 +97,9 @@ public class ContentAsTextInputFormat } public RecordReader<Text, Text> getRecordReader(InputSplit split, - JobConf job, Reporter reporter) - throws IOException { + JobConf job, Reporter reporter) throws IOException { reporter.setStatus(split.toString()); - return new ContentAsTextRecordReader(job, (FileSplit)split); + return new ContentAsTextRecordReader(job, (FileSplit) split); } } Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java (original) +++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java Thu Jan 29 05:38:59 2015 @@ -39,7 +39,8 @@ import org.apache.nutch.protocol.Content * */ public class SegmentMergeFilters { - private static final Logger LOG = LoggerFactory.getLogger(SegmentMergeFilters.class); + private static final Logger LOG = LoggerFactory + .getLogger(SegmentMergeFilters.class); private SegmentMergeFilter[] filters; public SegmentMergeFilters(Configuration conf) { Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original) +++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Thu Jan 29 05:38:59 2015 @@ -73,40 +73,47 @@ import org.apache.nutch.util.NutchJob; * <p> * Also, it's possible to slice the resulting segment into chunks of fixed size. * </p> - * <h3>Important Notes</h3> - * <h4>Which parts are merged?</h4> - * <p>It doesn't make sense to merge data from segments, which are at different stages - * of processing (e.g. one unfetched segment, one fetched but not parsed, and - * one fetched and parsed). Therefore, prior to merging, the tool will determine - * the lowest common set of input data, and only this data will be merged. - * This may have some unintended consequences: - * e.g. if majority of input segments are fetched and parsed, but one of them is unfetched, - * the tool will fall back to just merging fetchlists, and it will skip all other data - * from all segments.</p> + * <h3>Important Notes</h3> <h4>Which parts are merged?</h4> + * <p> + * It doesn't make sense to merge data from segments, which are at different + * stages of processing (e.g. one unfetched segment, one fetched but not parsed, + * and one fetched and parsed). Therefore, prior to merging, the tool will + * determine the lowest common set of input data, and only this data will be + * merged. This may have some unintended consequences: e.g. if majority of input + * segments are fetched and parsed, but one of them is unfetched, the tool will + * fall back to just merging fetchlists, and it will skip all other data from + * all segments. + * </p> * <h4>Merging fetchlists</h4> - * <p>Merging segments, which contain just fetchlists (i.e. prior to fetching) - * is not recommended, because this tool (unlike the {@link org.apache.nutch.crawl.Generator} - * doesn't ensure that fetchlist parts for each map task are disjoint.</p> + * <p> + * Merging segments, which contain just fetchlists (i.e. prior to fetching) is + * not recommended, because this tool (unlike the + * {@link org.apache.nutch.crawl.Generator} doesn't ensure that fetchlist parts + * for each map task are disjoint. + * </p> * <p> * <h4>Duplicate content</h4> - * Merging segments removes older content whenever possible (see below). However, - * this is NOT the same as de-duplication, which in addition removes identical - * content found at different URL-s. In other words, running DeleteDuplicates is - * still necessary. + * Merging segments removes older content whenever possible (see below). + * However, this is NOT the same as de-duplication, which in addition removes + * identical content found at different URL-s. In other words, running + * DeleteDuplicates is still necessary. + * </p> + * <p> + * For some types of data (especially ParseText) it's not possible to determine + * which version is really older. Therefore the tool always uses segment names + * as timestamps, for all types of input data. Segment names are compared in + * forward lexicographic order (0-9a-zA-Z), and data from segments with "higher" + * names will prevail. It follows then that it is extremely important that + * segments be named in an increasing lexicographic order as their creation time + * increases. * </p> - * <p>For some types of data (especially ParseText) it's not possible to determine - * which version is really older. Therefore the tool always uses segment names as - * timestamps, for all types of input data. Segment names are compared in forward lexicographic - * order (0-9a-zA-Z), and data from segments with "higher" names will prevail. - * It follows then that it is extremely important that segments be named in an - * increasing lexicographic order as their creation time increases.</p> * <p> * <h4>Merging and indexes</h4> * Merged segment gets a different name. Since Indexer embeds segment names in - * indexes, any indexes originally created for the input segments will NOT work with the - * merged segment. Newly created merged segment(s) need to be indexed afresh. - * This tool doesn't use existing indexes in any way, so if - * you plan to merge segments you don't have to index them prior to merging. + * indexes, any indexes originally created for the input segments will NOT work + * with the merged segment. Newly created merged segment(s) need to be indexed + * afresh. This tool doesn't use existing indexes in any way, so if you plan to + * merge segments you don't have to index them prior to merging. * * * @author Andrzej Bialecki @@ -114,7 +121,8 @@ import org.apache.nutch.util.NutchJob; public class SegmentMerger extends Configured implements Mapper<Text, MetaWrapper, Text, MetaWrapper>, Reducer<Text, MetaWrapper, Text, MetaWrapper> { - private static final Logger LOG = LoggerFactory.getLogger(SegmentMerger.class); + private static final Logger LOG = LoggerFactory + .getLogger(SegmentMerger.class); private static final String SEGMENT_PART_KEY = "part"; private static final String SEGMENT_SLICE_KEY = "slice"; @@ -124,20 +132,21 @@ public class SegmentMerger extends Confi private SegmentMergeFilters mergeFilters = null; private long sliceSize = -1; private long curCount = 0; - + /** - * Wraps inputs in an {@link MetaWrapper}, to permit merging different - * types in reduce and use additional metadata. + * Wraps inputs in an {@link MetaWrapper}, to permit merging different types + * in reduce and use additional metadata. */ public static class ObjectInputFormat extends - SequenceFileInputFormat<Text, MetaWrapper> { - + SequenceFileInputFormat<Text, MetaWrapper> { + @Override - public RecordReader<Text, MetaWrapper> getRecordReader(final InputSplit split, - final JobConf job, Reporter reporter) throws IOException { + public RecordReader<Text, MetaWrapper> getRecordReader( + final InputSplit split, final JobConf job, Reporter reporter) + throws IOException { reporter.setStatus(split.toString()); - + // find part name SegmentPart segmentPart; final String spString; @@ -148,10 +157,10 @@ public class SegmentMerger extends Confi } catch (IOException e) { throw new RuntimeException("Cannot identify segment:", e); } - - SequenceFile.Reader reader = - new SequenceFile.Reader(FileSystem.get(job), fSplit.getPath(), job); - + + SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(job), + fSplit.getPath(), job); + final Writable w; try { w = (Writable) reader.getValueClass().newInstance(); @@ -164,13 +173,14 @@ public class SegmentMerger extends Confi // ignore } } - final SequenceFileRecordReader<Text,Writable> splitReader = - new SequenceFileRecordReader<Text,Writable>(job, (FileSplit)split); + final SequenceFileRecordReader<Text, Writable> splitReader = new SequenceFileRecordReader<Text, Writable>( + job, (FileSplit) split); try { return new SequenceFileRecordReader<Text, MetaWrapper>(job, fSplit) { - - public synchronized boolean next(Text key, MetaWrapper wrapper) throws IOException { + + public synchronized boolean next(Text key, MetaWrapper wrapper) + throws IOException { LOG.debug("Running OIF.next()"); boolean res = splitReader.next(key, w); @@ -178,17 +188,17 @@ public class SegmentMerger extends Confi wrapper.setMeta(SEGMENT_PART_KEY, spString); return res; } - + @Override public synchronized void close() throws IOException { splitReader.close(); } - + @Override public MetaWrapper createValue() { return new MetaWrapper(); } - + }; } catch (IOException e) { throw new RuntimeException("Cannot create RecordReader: ", e); @@ -196,11 +206,14 @@ public class SegmentMerger extends Confi } } - public static class SegmentOutputFormat extends FileOutputFormat<Text, MetaWrapper> { + public static class SegmentOutputFormat extends + FileOutputFormat<Text, MetaWrapper> { private static final String DEFAULT_SLICE = "default"; - + @Override - public RecordWriter<Text, MetaWrapper> getRecordWriter(final FileSystem fs, final JobConf job, final String name, final Progressable progress) throws IOException { + public RecordWriter<Text, MetaWrapper> getRecordWriter(final FileSystem fs, + final JobConf job, final String name, final Progressable progress) + throws IOException { return new RecordWriter<Text, MetaWrapper>() { MapFile.Writer c_out = null; MapFile.Writer f_out = null; @@ -210,7 +223,7 @@ public class SegmentMerger extends Confi SequenceFile.Writer p_out = null; HashMap<String, Closeable> sliceWriters = new HashMap<String, Closeable>(); String segmentName = job.get("segment.merger.segmentName"); - + public void write(Text key, MetaWrapper wrapper) throws IOException { // unwrap SegmentPart sp = SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY)); @@ -221,13 +234,15 @@ public class SegmentMerger extends Confi g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME); g_out.append(key, o); } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) { - f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, CrawlDatum.class); + f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, + CrawlDatum.class); f_out.append(key, o); } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) { p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME); p_out.append(key, o); } else { - throw new IOException("Cannot determine segment part: " + sp.partName); + throw new IOException("Cannot determine segment part: " + + sp.partName); } } else if (o instanceof Content) { c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class); @@ -235,9 +250,11 @@ public class SegmentMerger extends Confi } else if (o instanceof ParseData) { // update the segment name inside contentMeta - required by Indexer if (slice == null) { - ((ParseData)o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName); + ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, + segmentName); } else { - ((ParseData)o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName + "-" + slice); + ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, + segmentName + "-" + slice); } pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class); pd_out.append(key, o); @@ -246,20 +263,26 @@ public class SegmentMerger extends Confi pt_out.append(key, o); } } - + // lazily create SequenceFile-s. - private SequenceFile.Writer ensureSequenceFile(String slice, String dirName) throws IOException { - if (slice == null) slice = DEFAULT_SLICE; - SequenceFile.Writer res = (SequenceFile.Writer)sliceWriters.get(slice + dirName); - if (res != null) return res; + private SequenceFile.Writer ensureSequenceFile(String slice, + String dirName) throws IOException { + if (slice == null) + slice = DEFAULT_SLICE; + SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters + .get(slice + dirName); + if (res != null) + return res; Path wname; Path out = FileOutputFormat.getOutputPath(job); if (slice == DEFAULT_SLICE) { - wname = new Path(new Path(new Path(out, segmentName), dirName), name); + wname = new Path(new Path(new Path(out, segmentName), dirName), + name); } else { - wname = new Path(new Path(new Path(out, segmentName + "-" + slice), dirName), name); + wname = new Path(new Path(new Path(out, segmentName + "-" + slice), + dirName), name); } - res = SequenceFile.createWriter(fs, job, wname, Text.class, + res = SequenceFile.createWriter(fs, job, wname, Text.class, CrawlDatum.class, SequenceFileOutputFormat.getOutputCompressionType(job), progress); sliceWriters.put(slice + dirName, res); @@ -267,23 +290,30 @@ public class SegmentMerger extends Confi } // lazily create MapFile-s. - private MapFile.Writer ensureMapFile(String slice, String dirName, Class<? extends Writable> clazz) throws IOException { - if (slice == null) slice = DEFAULT_SLICE; - MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + dirName); - if (res != null) return res; + private MapFile.Writer ensureMapFile(String slice, String dirName, + Class<? extends Writable> clazz) throws IOException { + if (slice == null) + slice = DEFAULT_SLICE; + MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice + + dirName); + if (res != null) + return res; Path wname; Path out = FileOutputFormat.getOutputPath(job); if (slice == DEFAULT_SLICE) { - wname = new Path(new Path(new Path(out, segmentName), dirName), name); + wname = new Path(new Path(new Path(out, segmentName), dirName), + name); } else { - wname = new Path(new Path(new Path(out, segmentName + "-" + slice), dirName), name); + wname = new Path(new Path(new Path(out, segmentName + "-" + slice), + dirName), name); } - CompressionType compType = - SequenceFileOutputFormat.getOutputCompressionType(job); + CompressionType compType = SequenceFileOutputFormat + .getOutputCompressionType(job); if (clazz.isAssignableFrom(ParseText.class)) { compType = CompressionType.RECORD; } - res = new MapFile.Writer(job, fs, wname.toString(), Text.class, clazz, compType, progress); + res = new MapFile.Writer(job, fs, wname.toString(), Text.class, + clazz, compType, progress); sliceWriters.put(slice + dirName, res); return res; } @@ -293,9 +323,9 @@ public class SegmentMerger extends Confi while (it.hasNext()) { Object o = it.next(); if (o instanceof SequenceFile.Writer) { - ((SequenceFile.Writer)o).close(); + ((SequenceFile.Writer) o).close(); } else { - ((MapFile.Writer)o).close(); + ((MapFile.Writer) o).close(); } } } @@ -306,14 +336,15 @@ public class SegmentMerger extends Confi public SegmentMerger() { super(null); } - + public SegmentMerger(Configuration conf) { super(conf); } - + public void setConf(Configuration conf) { super.setConf(conf); - if (conf == null) return; + if (conf == null) + return; if (conf.getBoolean("segment.merger.filter", false)) { filters = new URLFilters(conf); mergeFilters = new SegmentMergeFilters(conf); @@ -335,15 +366,18 @@ public class SegmentMerger extends Confi sliceSize = sliceSize / conf.getNumReduceTasks(); } } - + private Text newKey = new Text(); - + public void map(Text key, MetaWrapper value, - OutputCollector<Text, MetaWrapper> output, Reporter reporter) throws IOException { + OutputCollector<Text, MetaWrapper> output, Reporter reporter) + throws IOException { String url = key.toString(); if (normalizers != null) { try { - url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // normalize the url + url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // normalize + // the + // url } catch (Exception e) { LOG.warn("Skipping " + url + ":" + e.getMessage()); url = null; @@ -357,7 +391,7 @@ public class SegmentMerger extends Confi url = null; } } - if(url != null) { + if (url != null) { newKey.set(url); output.collect(newKey, value); } @@ -365,12 +399,13 @@ public class SegmentMerger extends Confi /** * NOTE: in selecting the latest version we rely exclusively on the segment - * name (not all segment data contain time information). Therefore it is extremely - * important that segments be named in an increasing lexicographic order as - * their creation time increases. + * name (not all segment data contain time information). Therefore it is + * extremely important that segments be named in an increasing lexicographic + * order as their creation time increases. */ public void reduce(Text key, Iterator<MetaWrapper> values, - OutputCollector<Text, MetaWrapper> output, Reporter reporter) throws IOException { + OutputCollector<Text, MetaWrapper> output, Reporter reporter) + throws IOException { CrawlDatum lastG = null; CrawlDatum lastF = null; CrawlDatum lastSig = null; @@ -383,18 +418,17 @@ public class SegmentMerger extends Confi String lastCname = null; String lastPDname = null; String lastPTname = null; - TreeMap<String, ArrayList<CrawlDatum>> linked = - new TreeMap<String, ArrayList<CrawlDatum>>(); + TreeMap<String, ArrayList<CrawlDatum>> linked = new TreeMap<String, ArrayList<CrawlDatum>>(); while (values.hasNext()) { MetaWrapper wrapper = values.next(); Object o = wrapper.get(); String spString = wrapper.getMeta(SEGMENT_PART_KEY); if (spString == null) { - throw new IOException("Null segment part, key=" + key); + throw new IOException("Null segment part, key=" + key); } SegmentPart sp = SegmentPart.parse(spString); if (o instanceof CrawlDatum) { - CrawlDatum val = (CrawlDatum)o; + CrawlDatum val = (CrawlDatum) o; // check which output dir it belongs to if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) { if (lastG == null) { @@ -411,9 +445,9 @@ public class SegmentMerger extends Confi // only consider fetch status and ignore fetch retry status // https://issues.apache.org/jira/browse/NUTCH-1520 // https://issues.apache.org/jira/browse/NUTCH-1113 - if (CrawlDatum.hasFetchStatus(val) && - val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY && - val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) { + if (CrawlDatum.hasFetchStatus(val) + && val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY + && val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) { if (lastF == null) { lastF = val; lastFname = sp.segmentName; @@ -450,40 +484,40 @@ public class SegmentMerger extends Confi } } else if (o instanceof Content) { if (lastC == null) { - lastC = (Content)o; + lastC = (Content) o; lastCname = sp.segmentName; } else { if (lastCname.compareTo(sp.segmentName) < 0) { - lastC = (Content)o; + lastC = (Content) o; lastCname = sp.segmentName; } } } else if (o instanceof ParseData) { if (lastPD == null) { - lastPD = (ParseData)o; + lastPD = (ParseData) o; lastPDname = sp.segmentName; } else { if (lastPDname.compareTo(sp.segmentName) < 0) { - lastPD = (ParseData)o; + lastPD = (ParseData) o; lastPDname = sp.segmentName; } } } else if (o instanceof ParseText) { if (lastPT == null) { - lastPT = (ParseText)o; + lastPT = (ParseText) o; lastPTname = sp.segmentName; } else { if (lastPTname.compareTo(sp.segmentName) < 0) { - lastPT = (ParseText)o; + lastPT = (ParseText) o; lastPTname = sp.segmentName; } } } } - // perform filtering based on full merge record - if (mergeFilters != null && - !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD, lastPT, - linked.isEmpty() ? null : linked.lastEntry().getValue())){ + // perform filtering based on full merge record + if (mergeFilters != null + && !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD, + lastPT, linked.isEmpty() ? null : linked.lastEntry().getValue())) { return; } @@ -552,10 +586,12 @@ public class SegmentMerger extends Confi } } - public void merge(Path out, Path[] segs, boolean filter, boolean normalize, long slice) throws Exception { + public void merge(Path out, Path[] segs, boolean filter, boolean normalize, + long slice) throws Exception { String segmentName = Generator.generateSegmentName(); if (LOG.isInfoEnabled()) { - LOG.info("Merging " + segs.length + " segments to " + out + "/" + segmentName); + LOG.info("Merging " + segs.length + " segments to " + out + "/" + + segmentName); } JobConf job = new NutchJob(getConf()); job.setJobName("mergesegs " + out + "/" + segmentName); @@ -596,17 +632,24 @@ public class SegmentMerger extends Confi pt = pt && fs.exists(ptDir); } StringBuffer sb = new StringBuffer(); - if (c) sb.append(" " + Content.DIR_NAME); - if (g) sb.append(" " + CrawlDatum.GENERATE_DIR_NAME); - if (f) sb.append(" " + CrawlDatum.FETCH_DIR_NAME); - if (p) sb.append(" " + CrawlDatum.PARSE_DIR_NAME); - if (pd) sb.append(" " + ParseData.DIR_NAME); - if (pt) sb.append(" " + ParseText.DIR_NAME); + if (c) + sb.append(" " + Content.DIR_NAME); + if (g) + sb.append(" " + CrawlDatum.GENERATE_DIR_NAME); + if (f) + sb.append(" " + CrawlDatum.FETCH_DIR_NAME); + if (p) + sb.append(" " + CrawlDatum.PARSE_DIR_NAME); + if (pd) + sb.append(" " + ParseData.DIR_NAME); + if (pt) + sb.append(" " + ParseText.DIR_NAME); if (LOG.isInfoEnabled()) { LOG.info("SegmentMerger: using segment data from:" + sb.toString()); } for (int i = 0; i < segs.length; i++) { - if (segs[i] == null) continue; + if (segs[i] == null) + continue; if (g) { Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME); FileInputFormat.addInputPath(job, gDir); @@ -639,9 +682,9 @@ public class SegmentMerger extends Confi job.setOutputKeyClass(Text.class); job.setOutputValueClass(MetaWrapper.class); job.setOutputFormat(SegmentOutputFormat.class); - + setConf(job); - + JobClient.runJob(job); } @@ -650,13 +693,19 @@ public class SegmentMerger extends Confi */ public static void main(String[] args) throws Exception { if (args.length < 2) { - System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-slice NNNN]"); - System.err.println("\toutput_dir\tname of the parent dir for output segment slice(s)"); - System.err.println("\t-dir segments\tparent dir containing several segments"); + System.err + .println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-slice NNNN]"); + System.err + .println("\toutput_dir\tname of the parent dir for output segment slice(s)"); + System.err + .println("\t-dir segments\tparent dir containing several segments"); System.err.println("\tseg1 seg2 ...\tlist of segment dirs"); - System.err.println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters"); - System.err.println("\t-normalize\t\tnormalize URL via current URLNormalizers"); - System.err.println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs"); + System.err + .println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters"); + System.err + .println("\t-normalize\t\tnormalize URL via current URLNormalizers"); + System.err + .println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs"); return; } Configuration conf = NutchConfiguration.create(); @@ -688,7 +737,8 @@ public class SegmentMerger extends Confi return; } SegmentMerger merger = new SegmentMerger(conf); - merger.merge(out, segs.toArray(new Path[segs.size()]), filter, normalize, sliceSize); + merger.merge(out, segs.toArray(new Path[segs.size()]), filter, normalize, + sliceSize); } } Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java (original) +++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java Thu Jan 29 05:38:59 2015 @@ -30,16 +30,16 @@ public class SegmentPart { public String segmentName; /** Name of the segment part (ie. one of subdirectories inside a segment). */ public String partName; - + public SegmentPart() { - + } - + public SegmentPart(String segmentName, String partName) { this.segmentName = segmentName; this.partName = partName; } - + /** * Return a String representation of this class, in the form * "segmentName/partName". @@ -47,23 +47,27 @@ public class SegmentPart { public String toString() { return segmentName + "/" + partName; } - + /** * Create SegmentPart from a FileSplit. + * * @param split - * @return A {@link SegmentPart} resultant from a - * {@link FileSplit}. + * @return A {@link SegmentPart} resultant from a {@link FileSplit}. * @throws Exception */ public static SegmentPart get(FileSplit split) throws IOException { return get(split.getPath().toString()); } - + /** * Create SegmentPart from a full path of a location inside any segment part. - * @param path full path into a segment part (may include "part-xxxxx" components) + * + * @param path + * full path into a segment part (may include "part-xxxxx" + * components) * @return SegmentPart instance describing this part. - * @throws IOException if any required path components are missing. + * @throws IOException + * if any required path components are missing. */ public static SegmentPart get(String path) throws IOException { // find part name @@ -87,12 +91,15 @@ public class SegmentPart { String segment = dir.substring(idx + 1); return new SegmentPart(segment, part); } - + /** * Create SegmentPart from a String in format "segmentName/partName". - * @param string input String + * + * @param string + * input String * @return parsed instance of SegmentPart - * @throws IOException if "/" is missing. + * @throws IOException + * if "/" is missing. */ public static SegmentPart parse(String string) throws IOException { int idx = string.indexOf('/'); Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original) +++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Thu Jan 29 05:38:59 2015 @@ -75,7 +75,7 @@ public class SegmentReader extends Confi public static final Logger LOG = LoggerFactory.getLogger(SegmentReader.class); long recNo = 0L; - + private boolean co, fe, ge, pa, pd, pt; private FileSystem fs; @@ -84,33 +84,38 @@ public class SegmentReader extends Confi private Text newKey = new Text(); public void map(WritableComparable<?> key, Writable value, - OutputCollector<Text, NutchWritable> collector, Reporter reporter) throws IOException { + OutputCollector<Text, NutchWritable> collector, Reporter reporter) + throws IOException { // convert on the fly from old formats with UTF8 keys. // UTF8 deprecated and replaced by Text. if (key instanceof Text) { newKey.set(key.toString()); key = newKey; } - collector.collect((Text)key, new NutchWritable(value)); + collector.collect((Text) key, new NutchWritable(value)); } - + } /** Implements a text output format */ public static class TextOutputFormat extends FileOutputFormat<WritableComparable<?>, Writable> { public RecordWriter<WritableComparable<?>, Writable> getRecordWriter( - final FileSystem fs, JobConf job, - String name, final Progressable progress) throws IOException { + final FileSystem fs, JobConf job, String name, + final Progressable progress) throws IOException { - final Path segmentDumpFile = new Path(FileOutputFormat.getOutputPath(job), name); + final Path segmentDumpFile = new Path( + FileOutputFormat.getOutputPath(job), name); // Get the old copy out of the way - if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile, true); + if (fs.exists(segmentDumpFile)) + fs.delete(segmentDumpFile, true); - final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile)); + final PrintStream printStream = new PrintStream( + fs.create(segmentDumpFile)); return new RecordWriter<WritableComparable<?>, Writable>() { - public synchronized void write(WritableComparable<?> key, Writable value) throws IOException { + public synchronized void write(WritableComparable<?> key, Writable value) + throws IOException { printStream.println(value); } @@ -124,9 +129,9 @@ public class SegmentReader extends Confi public SegmentReader() { super(null); } - - public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, boolean pa, - boolean pd, boolean pt) { + + public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, + boolean pa, boolean pd, boolean pt) { super(conf); this.co = co; this.fe = fe; @@ -166,12 +171,12 @@ public class SegmentReader extends Confi job.setBoolean("segment.reader.pt", this.pt); return job; } - - public void close() {} + + public void close() { + } public void reduce(Text key, Iterator<NutchWritable> values, - OutputCollector<Text, Text> output, Reporter reporter) - throws IOException { + OutputCollector<Text, Text> output, Reporter reporter) throws IOException { StringBuffer dump = new StringBuffer(); dump.append("\nRecno:: ").append(recNo++).append("\n"); @@ -194,7 +199,7 @@ public class SegmentReader extends Confi } public void dump(Path segment, Path output) throws IOException { - + if (LOG.isInfoEnabled()) { LOG.info("SegmentReader: dump segment: " + segment); } @@ -202,20 +207,30 @@ public class SegmentReader extends Confi JobConf job = createJobConf(); job.setJobName("read " + segment); - if (ge) FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); - if (fe) FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME)); - if (pa) FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME)); - if (co) FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); - if (pd) FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME)); - if (pt) FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME)); + if (ge) + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.GENERATE_DIR_NAME)); + if (fe) + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.FETCH_DIR_NAME)); + if (pa) + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.PARSE_DIR_NAME)); + if (co) + FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); + if (pd) + FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME)); + if (pt) + FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME)); job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(InputCompatMapper.class); job.setReducerClass(SegmentReader.class); - Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-" + new java.util.Random().nextInt()); + Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-" + + new java.util.Random().nextInt()); fs.delete(tempDir, true); - + FileOutputFormat.setOutputPath(job, tempDir); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); @@ -228,22 +243,25 @@ public class SegmentReader extends Confi // remove the old file fs.delete(dumpFile, true); - FileStatus[] fstats = fs.listStatus(tempDir, HadoopFSUtil.getPassAllFilter()); + FileStatus[] fstats = fs.listStatus(tempDir, + HadoopFSUtil.getPassAllFilter()); Path[] files = HadoopFSUtil.getPaths(fstats); PrintWriter writer = null; int currentRecordNumber = 0; if (files.length > 0) { - writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile)))); + writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter( + fs.create(dumpFile)))); try { for (int i = 0; i < files.length; i++) { Path partFile = files[i]; try { - currentRecordNumber = append(fs, job, partFile, writer, currentRecordNumber); + currentRecordNumber = append(fs, job, partFile, writer, + currentRecordNumber); } catch (IOException exception) { if (LOG.isWarnEnabled()) { - LOG.warn("Couldn't copy the content of " + partFile.toString() + - " into " + dumpFile.toString()); + LOG.warn("Couldn't copy the content of " + partFile.toString() + + " into " + dumpFile.toString()); LOG.warn(exception.getMessage()); } } @@ -253,13 +271,16 @@ public class SegmentReader extends Confi } } fs.delete(tempDir, true); - if (LOG.isInfoEnabled()) { LOG.info("SegmentReader: done"); } + if (LOG.isInfoEnabled()) { + LOG.info("SegmentReader: done"); + } } /** Appends two files and updates the Recno counter */ - private int append(FileSystem fs, Configuration conf, Path src, PrintWriter writer, int currentRecordNumber) - throws IOException { - BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(src))); + private int append(FileSystem fs, Configuration conf, Path src, + PrintWriter writer, int currentRecordNumber) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader( + fs.open(src))); try { String line = reader.readLine(); while (line != null) { @@ -276,89 +297,101 @@ public class SegmentReader extends Confi } private static final String[][] keys = new String[][] { - {"co", "Content::\n"}, - {"ge", "Crawl Generate::\n"}, - {"fe", "Crawl Fetch::\n"}, - {"pa", "Crawl Parse::\n"}, - {"pd", "ParseData::\n"}, - {"pt", "ParseText::\n"} - }; + { "co", "Content::\n" }, { "ge", "Crawl Generate::\n" }, + { "fe", "Crawl Fetch::\n" }, { "pa", "Crawl Parse::\n" }, + { "pd", "ParseData::\n" }, { "pt", "ParseText::\n" } }; public void get(final Path segment, final Text key, Writer writer, - final Map<String, List<Writable>> results) throws Exception { + final Map<String, List<Writable>> results) throws Exception { LOG.info("SegmentReader: get '" + key + "'"); ArrayList<Thread> threads = new ArrayList<Thread>(); - if (co) threads.add(new Thread() { - public void run() { - try { - List<Writable> res = getMapRecords(new Path(segment, Content.DIR_NAME), key); - results.put("co", res); - } catch (Exception e) { - LOG.error("Exception:", e); - } - } - }); - if (fe) threads.add(new Thread() { - public void run() { - try { - List<Writable> res = getMapRecords(new Path(segment, CrawlDatum.FETCH_DIR_NAME), key); - results.put("fe", res); - } catch (Exception e) { - LOG.error("Exception:", e); - } - } - }); - if (ge) threads.add(new Thread() { - public void run() { - try { - List<Writable> res = getSeqRecords(new Path(segment, CrawlDatum.GENERATE_DIR_NAME), key); - results.put("ge", res); - } catch (Exception e) { - LOG.error("Exception:", e); - } - } - }); - if (pa) threads.add(new Thread() { - public void run() { - try { - List<Writable> res = getSeqRecords(new Path(segment, CrawlDatum.PARSE_DIR_NAME), key); - results.put("pa", res); - } catch (Exception e) { - LOG.error("Exception:", e); - } - } - }); - if (pd) threads.add(new Thread() { - public void run() { - try { - List<Writable> res = getMapRecords(new Path(segment, ParseData.DIR_NAME), key); - results.put("pd", res); - } catch (Exception e) { - LOG.error("Exception:", e); - } - } - }); - if (pt) threads.add(new Thread() { - public void run() { - try { - List<Writable> res = getMapRecords(new Path(segment, ParseText.DIR_NAME), key); - results.put("pt", res); - } catch (Exception e) { - LOG.error("Exception:", e); + if (co) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + Content.DIR_NAME), key); + results.put("co", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } } - } - }); + }); + if (fe) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + CrawlDatum.FETCH_DIR_NAME), key); + results.put("fe", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (ge) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getSeqRecords(new Path(segment, + CrawlDatum.GENERATE_DIR_NAME), key); + results.put("ge", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (pa) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getSeqRecords(new Path(segment, + CrawlDatum.PARSE_DIR_NAME), key); + results.put("pa", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (pd) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + ParseData.DIR_NAME), key); + results.put("pd", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); + if (pt) + threads.add(new Thread() { + public void run() { + try { + List<Writable> res = getMapRecords(new Path(segment, + ParseText.DIR_NAME), key); + results.put("pt", res); + } catch (Exception e) { + LOG.error("Exception:", e); + } + } + }); Iterator<Thread> it = threads.iterator(); - while (it.hasNext()) it.next().start(); + while (it.hasNext()) + it.next().start(); int cnt; do { cnt = 0; try { Thread.sleep(5000); - } catch (Exception e) {}; + } catch (Exception e) { + } + ; it = threads.iterator(); while (it.hasNext()) { - if (it.next().isAlive()) cnt++; + if (it.next().isAlive()) + cnt++; } if ((cnt > 0) && (LOG.isDebugEnabled())) { LOG.debug("(" + cnt + " to retrieve)"); @@ -375,24 +408,25 @@ public class SegmentReader extends Confi writer.flush(); } } - + private List<Writable> getMapRecords(Path dir, Text key) throws Exception { - MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf()); + MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, + getConf()); ArrayList<Writable> res = new ArrayList<Writable>(); Class<?> keyClass = readers[0].getKeyClass(); Class<?> valueClass = readers[0].getValueClass(); if (!keyClass.getName().equals("org.apache.hadoop.io.Text")) throw new IOException("Incompatible key (" + keyClass.getName() + ")"); - Writable value = (Writable)valueClass.newInstance(); + Writable value = (Writable) valueClass.newInstance(); // we don't know the partitioning schema for (int i = 0; i < readers.length; i++) { if (readers[i].get(key, value) != null) { res.add(value); - value = (Writable)valueClass.newInstance(); + value = (Writable) valueClass.newInstance(); Text aKey = (Text) keyClass.newInstance(); while (readers[i].next(aKey, value) && aKey.equals(key)) { res.add(value); - value = (Writable)valueClass.newInstance(); + value = (Writable) valueClass.newInstance(); } } readers[i].close(); @@ -401,19 +435,20 @@ public class SegmentReader extends Confi } private List<Writable> getSeqRecords(Path dir, Text key) throws Exception { - SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), dir); + SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders( + getConf(), dir); ArrayList<Writable> res = new ArrayList<Writable>(); Class<?> keyClass = readers[0].getKeyClass(); Class<?> valueClass = readers[0].getValueClass(); if (!keyClass.getName().equals("org.apache.hadoop.io.Text")) throw new IOException("Incompatible key (" + keyClass.getName() + ")"); - Writable aKey = (Writable)keyClass.newInstance(); - Writable value = (Writable)valueClass.newInstance(); + Writable aKey = (Writable) keyClass.newInstance(); + Writable value = (Writable) valueClass.newInstance(); for (int i = 0; i < readers.length; i++) { while (readers[i].next(aKey, value)) { if (aKey.equals(key)) { res.add(value); - value = (Writable)valueClass.newInstance(); + value = (Writable) valueClass.newInstance(); } } readers[i].close(); @@ -430,41 +465,55 @@ public class SegmentReader extends Confi public long parsed = -1L; public long parseErrors = -1L; } - + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); - + public void list(List<Path> dirs, Writer writer) throws Exception { - writer.write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n"); + writer + .write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n"); for (int i = 0; i < dirs.size(); i++) { Path dir = dirs.get(i); SegmentReaderStats stats = new SegmentReaderStats(); getStats(dir, stats); writer.write(dir.getName() + "\t"); - if (stats.generated == -1) writer.write("?"); - else writer.write(stats.generated + ""); + if (stats.generated == -1) + writer.write("?"); + else + writer.write(stats.generated + ""); writer.write("\t\t"); - if (stats.start == -1) writer.write("?\t"); - else writer.write(sdf.format(new Date(stats.start))); + if (stats.start == -1) + writer.write("?\t"); + else + writer.write(sdf.format(new Date(stats.start))); writer.write("\t"); - if (stats.end == -1) writer.write("?"); - else writer.write(sdf.format(new Date(stats.end))); + if (stats.end == -1) + writer.write("?"); + else + writer.write(sdf.format(new Date(stats.end))); writer.write("\t"); - if (stats.fetched == -1) writer.write("?"); - else writer.write(stats.fetched + ""); + if (stats.fetched == -1) + writer.write("?"); + else + writer.write(stats.fetched + ""); writer.write("\t"); - if (stats.parsed == -1) writer.write("?"); - else writer.write(stats.parsed + ""); + if (stats.parsed == -1) + writer.write("?"); + else + writer.write(stats.parsed + ""); writer.write("\n"); writer.flush(); } } - - public void getStats(Path segment, final SegmentReaderStats stats) throws Exception { - SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); + + public void getStats(Path segment, final SegmentReaderStats stats) + throws Exception { + SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders( + getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); long cnt = 0L; Text key = new Text(); for (int i = 0; i < readers.length; i++) { - while (readers[i].next(key)) cnt++; + while (readers[i].next(key)) + cnt++; readers[i].close(); } stats.generated = cnt; @@ -474,12 +523,15 @@ public class SegmentReader extends Confi long start = Long.MAX_VALUE; long end = Long.MIN_VALUE; CrawlDatum value = new CrawlDatum(); - MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, getConf()); + MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, + getConf()); for (int i = 0; i < mreaders.length; i++) { while (mreaders[i].next(key, value)) { cnt++; - if (value.getFetchTime() < start) start = value.getFetchTime(); - if (value.getFetchTime() > end) end = value.getFetchTime(); + if (value.getFetchTime() < start) + start = value.getFetchTime(); + if (value.getFetchTime() > end) + end = value.getFetchTime(); } mreaders[i].close(); } @@ -492,11 +544,13 @@ public class SegmentReader extends Confi cnt = 0L; long errors = 0L; ParseData value = new ParseData(); - MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, getConf()); + MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, + getConf()); for (int i = 0; i < mreaders.length; i++) { while (mreaders[i].next(key, value)) { cnt++; - if (!value.getStatus().isSuccess()) errors++; + if (!value.getStatus().isSuccess()) + errors++; } mreaders[i].close(); } @@ -504,7 +558,7 @@ public class SegmentReader extends Confi stats.parseErrors = errors; } } - + private static final int MODE_DUMP = 0; private static final int MODE_LIST = 1; @@ -521,7 +575,8 @@ public class SegmentReader extends Confi mode = MODE_DUMP; else if (args[0].equals("-list")) mode = MODE_LIST; - else if (args[0].equals("-get")) mode = MODE_GET; + else if (args[0].equals("-get")) + mode = MODE_GET; boolean co = true; boolean fe = true; @@ -553,63 +608,69 @@ public class SegmentReader extends Confi } Configuration conf = NutchConfiguration.create(); final FileSystem fs = FileSystem.get(conf); - SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, pt); + SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, + pt); // collect required args switch (mode) { - case MODE_DUMP: - String input = args[1]; - if (input == null) { - System.err.println("Missing required argument: <segment_dir>"); - usage(); - return; - } - String output = args.length > 2 ? args[2] : null; - if (output == null) { - System.err.println("Missing required argument: <output>"); - usage(); - return; - } - segmentReader.dump(new Path(input), new Path(output)); + case MODE_DUMP: + String input = args[1]; + if (input == null) { + System.err.println("Missing required argument: <segment_dir>"); + usage(); return; - case MODE_LIST: - ArrayList<Path> dirs = new ArrayList<Path>(); - for (int i = 1; i < args.length; i++) { - if (args[i] == null) continue; - if (args[i].equals("-dir")) { - Path dir = new Path(args[++i]); - FileStatus[] fstats = fs.listStatus(dir, HadoopFSUtil.getPassDirectoriesFilter(fs)); - Path[] files = HadoopFSUtil.getPaths(fstats); - if (files != null && files.length > 0) { - dirs.addAll(Arrays.asList(files)); - } - } else dirs.add(new Path(args[i])); - } - segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8")); + } + String output = args.length > 2 ? args[2] : null; + if (output == null) { + System.err.println("Missing required argument: <output>"); + usage(); return; - case MODE_GET: - input = args[1]; - if (input == null) { - System.err.println("Missing required argument: <segment_dir>"); - usage(); - return; - } - String key = args.length > 2 ? args[2] : null; - if (key == null) { - System.err.println("Missing required argument: <keyValue>"); - usage(); - return; - } - segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter(System.out, "UTF-8"), new HashMap<String, List<Writable>>()); + } + segmentReader.dump(new Path(input), new Path(output)); + return; + case MODE_LIST: + ArrayList<Path> dirs = new ArrayList<Path>(); + for (int i = 1; i < args.length; i++) { + if (args[i] == null) + continue; + if (args[i].equals("-dir")) { + Path dir = new Path(args[++i]); + FileStatus[] fstats = fs.listStatus(dir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path[] files = HadoopFSUtil.getPaths(fstats); + if (files != null && files.length > 0) { + dirs.addAll(Arrays.asList(files)); + } + } else + dirs.add(new Path(args[i])); + } + segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8")); + return; + case MODE_GET: + input = args[1]; + if (input == null) { + System.err.println("Missing required argument: <segment_dir>"); + usage(); return; - default: - System.err.println("Invalid operation: " + args[0]); + } + String key = args.length > 2 ? args[2] : null; + if (key == null) { + System.err.println("Missing required argument: <keyValue>"); usage(); return; + } + segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter( + System.out, "UTF-8"), new HashMap<String, List<Writable>>()); + return; + default: + System.err.println("Invalid operation: " + args[0]); + usage(); + return; } } private static void usage() { - System.err.println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n"); + System.err + .println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n"); System.err.println("* General options:"); System.err.println("\t-nocontent\tignore content directory"); System.err.println("\t-nofetch\tignore crawl_fetch directory"); @@ -618,21 +679,32 @@ public class SegmentReader extends Confi System.err.println("\t-noparsedata\tignore parse_data directory"); System.err.println("\t-noparsetext\tignore parse_text directory"); System.err.println(); - System.err.println("* SegmentReader -dump <segment_dir> <output> [general options]"); - System.err.println(" Dumps content of a <segment_dir> as a text file to <output>.\n"); + System.err + .println("* SegmentReader -dump <segment_dir> <output> [general options]"); + System.err + .println(" Dumps content of a <segment_dir> as a text file to <output>.\n"); System.err.println("\t<segment_dir>\tname of the segment directory."); - System.err.println("\t<output>\tname of the (non-existent) output directory."); + System.err + .println("\t<output>\tname of the (non-existent) output directory."); System.err.println(); - System.err.println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]"); - System.err.println(" List a synopsis of segments in specified directories, or all segments in"); - System.err.println(" a directory <segments>, and print it on System.out\n"); - System.err.println("\t<segment_dir1> ...\tlist of segment directories to process"); - System.err.println("\t-dir <segments>\t\tdirectory that contains multiple segments"); + System.err + .println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]"); + System.err + .println(" List a synopsis of segments in specified directories, or all segments in"); + System.err + .println(" a directory <segments>, and print it on System.out\n"); + System.err + .println("\t<segment_dir1> ...\tlist of segment directories to process"); + System.err + .println("\t-dir <segments>\t\tdirectory that contains multiple segments"); System.err.println(); - System.err.println("* SegmentReader -get <segment_dir> <keyValue> [general options]"); - System.err.println(" Get a specified record from a segment, and print it on System.out.\n"); + System.err + .println("* SegmentReader -get <segment_dir> <keyValue> [general options]"); + System.err + .println(" Get a specified record from a segment, and print it on System.out.\n"); System.err.println("\t<segment_dir>\tname of the segment directory."); System.err.println("\t<keyValue>\tvalue of the key (url)."); - System.err.println("\t\tNote: put double-quotes around strings with spaces."); + System.err + .println("\t\tNote: put double-quotes around strings with spaces."); } } Modified: nutch/trunk/src/java/org/apache/nutch/segment/package-info.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/package-info.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/segment/package-info.java (original) +++ nutch/trunk/src/java/org/apache/nutch/segment/package-info.java Thu Jan 29 05:38:59 2015 @@ -20,3 +20,4 @@ * fetch list, protocol status, raw content, parsed content, and extracted outgoing links. */ package org.apache.nutch.segment; + Modified: nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java?rev=1655526&r1=1655525&r2=1655526&view=diff ============================================================================== --- nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java (original) +++ nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java Thu Jan 29 05:38:59 2015 @@ -52,13 +52,14 @@ public class Benchmark extends Configure int res = ToolRunner.run(conf, new Benchmark(), args); System.exit(res); } - + private static String getDate() { - return new SimpleDateFormat("yyyyMMddHHmmss").format - (new Date(System.currentTimeMillis())); + return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System + .currentTimeMillis())); } - - private void createSeeds(FileSystem fs, Path seedsDir, int count) throws Exception { + + private void createSeeds(FileSystem fs, Path seedsDir, int count) + throws Exception { OutputStream os = fs.create(new Path(seedsDir, "seeds")); for (int i = 0; i < count; i++) { String url = "http://www.test-" + i + ".com/\r\n"; @@ -67,9 +68,9 @@ public class Benchmark extends Configure os.flush(); os.close(); } - + public static final class BenchmarkResults { - Map<String,Map<String,Long>> timings = new HashMap<String,Map<String,Long>>(); + Map<String, Map<String, Long>> timings = new HashMap<String, Map<String, Long>>(); List<String> runs = new ArrayList<String>(); List<String> stages = new ArrayList<String>(); int seeds, depth, threads; @@ -77,7 +78,7 @@ public class Benchmark extends Configure long topN; long elapsed; String plugins; - + public void addTiming(String stage, String run, long timing) { if (!runs.contains(run)) { runs.add(run); @@ -85,14 +86,14 @@ public class Benchmark extends Configure if (!stages.contains(stage)) { stages.add(stage); } - Map<String,Long> t = timings.get(stage); + Map<String, Long> t = timings.get(stage); if (t == null) { - t = new HashMap<String,Long>(); + t = new HashMap<String, Long>(); timings.put(stage, t); } t.put(run, timing); } - + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("* Plugins:\t" + plugins + "\n"); @@ -103,8 +104,9 @@ public class Benchmark extends Configure sb.append("* Delete:\t" + delete + "\n"); sb.append("* TOTAL ELAPSED:\t" + elapsed + "\n"); for (String stage : stages) { - Map<String,Long> timing = timings.get(stage); - if (timing == null) continue; + Map<String, Long> timing = timings.get(stage); + if (timing == null) + continue; sb.append("- stage: " + stage + "\n"); for (String r : runs) { Long Time = timing.get(r); @@ -116,15 +118,16 @@ public class Benchmark extends Configure } return sb.toString(); } - + public List<String> getStages() { return stages; } + public List<String> getRuns() { return runs; } } - + public int run(String[] args) throws Exception { String plugins = "protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass"; int seeds = 1; @@ -132,17 +135,24 @@ public class Benchmark extends Configure int threads = 10; boolean delete = true; long topN = Long.MAX_VALUE; - + if (args.length == 0) { - System.err.println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] [-keep] [-maxPerHost NN] [-plugins <regex>]"); - System.err.println("\t-seeds NN\tcreate NN unique hosts in a seed list (default: 1)"); + System.err + .println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] [-keep] [-maxPerHost NN] [-plugins <regex>]"); + System.err + .println("\t-seeds NN\tcreate NN unique hosts in a seed list (default: 1)"); System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)"); - System.err.println("\t-threads NN\tuse NN threads per Fetcher task (default: 10)"); - System.err.println("\t-keep\tkeep segment data (default: delete after updatedb)"); + System.err + .println("\t-threads NN\tuse NN threads per Fetcher task (default: 10)"); + System.err + .println("\t-keep\tkeep segment data (default: delete after updatedb)"); System.err.println("\t-plugins <regex>\toverride 'plugin.includes'."); - System.err.println("\tNOTE: if not specified, this is reset to: " + plugins); - System.err.println("\tNOTE: if 'default' is specified then a value set in nutch-default/nutch-site is used."); - System.err.println("\t-maxPerHost NN\tmax. # of URLs per host in a fetchlist"); + System.err.println("\tNOTE: if not specified, this is reset to: " + + plugins); + System.err + .println("\tNOTE: if 'default' is specified then a value set in nutch-default/nutch-site is used."); + System.err + .println("\t-maxPerHost NN\tmax. # of URLs per host in a fetchlist"); return -1; } int maxPerHost = Integer.MAX_VALUE; @@ -164,13 +174,15 @@ public class Benchmark extends Configure return -1; } } - BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN, delete, plugins); + BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN, + delete, plugins); System.out.println(res); return 0; } - - public BenchmarkResults benchmark(int seeds, int depth, int threads, int maxPerHost, - long topN, boolean delete, String plugins) throws Exception { + + public BenchmarkResults benchmark(int seeds, int depth, int threads, + int maxPerHost, long topN, boolean delete, String plugins) + throws Exception { Configuration conf = getConf(); conf.set("http.proxy.host", "localhost"); conf.setInt("http.proxy.port", 8181); @@ -180,11 +192,12 @@ public class Benchmark extends Configure conf.set("plugin.includes", plugins); } conf.setInt(Generator.GENERATOR_MAX_COUNT, maxPerHost); - conf.set(Generator.GENERATOR_COUNT_MODE, Generator.GENERATOR_COUNT_VALUE_HOST); - JobConf job = new NutchJob(getConf()); + conf.set(Generator.GENERATOR_COUNT_MODE, + Generator.GENERATOR_COUNT_VALUE_HOST); + JobConf job = new NutchJob(getConf()); FileSystem fs = FileSystem.get(job); - Path dir = new Path(getConf().get("hadoop.tmp.dir"), - "bench-" + System.currentTimeMillis()); + Path dir = new Path(getConf().get("hadoop.tmp.dir"), "bench-" + + System.currentTimeMillis()); fs.mkdirs(dir); Path rootUrlDir = new Path(dir, "seed"); fs.mkdirs(rootUrlDir); @@ -194,7 +207,7 @@ public class Benchmark extends Configure LOG.info("crawl started in: " + dir); LOG.info("rootUrlDir = " + rootUrlDir); LOG.info("threads = " + threads); - LOG.info("depth = " + depth); + LOG.info("depth = " + depth); } BenchmarkResults res = new BenchmarkResults(); res.delete = delete; @@ -213,17 +226,17 @@ public class Benchmark extends Configure ParseSegment parseSegment = new ParseSegment(getConf()); CrawlDb crawlDbTool = new CrawlDb(getConf()); LinkDb linkDbTool = new LinkDb(getConf()); - + // initialize crawlDb long start = System.currentTimeMillis(); injector.inject(crawlDb, rootUrlDir); long delta = System.currentTimeMillis() - start; res.addTiming("inject", "0", delta); int i; - for (i = 0; i < depth; i++) { // generate new segment + for (i = 0; i < depth; i++) { // generate new segment start = System.currentTimeMillis(); - Path[] segs = generator.generate(crawlDb, segments, -1, topN, System - .currentTimeMillis()); + Path[] segs = generator.generate(crawlDb, segments, -1, topN, + System.currentTimeMillis()); delta = System.currentTimeMillis() - start; res.addTiming("generate", i + "", delta); if (segs == null) { @@ -231,12 +244,12 @@ public class Benchmark extends Configure break; } start = System.currentTimeMillis(); - fetcher.fetch(segs[0], threads); // fetch it + fetcher.fetch(segs[0], threads); // fetch it delta = System.currentTimeMillis() - start; res.addTiming("fetch", i + "", delta); if (!Fetcher.isParsing(job)) { start = System.currentTimeMillis(); - parseSegment.parse(segs[0]); // parse it, if needed + parseSegment.parse(segs[0]); // parse it, if needed delta = System.currentTimeMillis() - start; res.addTiming("parse", i + "", delta); } @@ -258,7 +271,9 @@ public class Benchmark extends Configure if (i == 0) { LOG.warn("No URLs to fetch - check your seed list and URL filters."); } - if (LOG.isInfoEnabled()) { LOG.info("crawl finished: " + dir); } + if (LOG.isInfoEnabled()) { + LOG.info("crawl finished: " + dir); + } res.elapsed = System.currentTimeMillis() - res.elapsed; CrawlDbReader dbreader = new CrawlDbReader(); dbreader.processStatJob(crawlDb.toString(), conf, false);
