http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java new file mode 100644 index 0000000..a35e842 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.scoring.webgraph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.nutch.metadata.Metadata; + +/** + * A class which holds the number of inlinks and outlinks for a given url along + * with an inlink score from a link analysis program and any metadata. + * + * The Node is the core unit of the NodeDb in the WebGraph. + */ +public class Node implements Writable { + + private int numInlinks = 0; + private int numOutlinks = 0; + private float inlinkScore = 1.0f; + private Metadata metadata = new Metadata(); + + public Node() { + + } + + public int getNumInlinks() { + return numInlinks; + } + + public void setNumInlinks(int numInlinks) { + this.numInlinks = numInlinks; + } + + public int getNumOutlinks() { + return numOutlinks; + } + + public void setNumOutlinks(int numOutlinks) { + this.numOutlinks = numOutlinks; + } + + public float getInlinkScore() { + return inlinkScore; + } + + public void setInlinkScore(float inlinkScore) { + this.inlinkScore = inlinkScore; + } + + public float getOutlinkScore() { + return (numOutlinks > 0) ? inlinkScore / numOutlinks : inlinkScore; + } + + public Metadata getMetadata() { + return metadata; + } + + public void setMetadata(Metadata metadata) { + this.metadata = metadata; + } + + public void readFields(DataInput in) throws IOException { + + numInlinks = in.readInt(); + numOutlinks = in.readInt(); + inlinkScore = in.readFloat(); + metadata.clear(); + metadata.readFields(in); + } + + public void write(DataOutput out) throws IOException { + + out.writeInt(numInlinks); + out.writeInt(numOutlinks); + out.writeFloat(inlinkScore); + metadata.write(out); + } + + public String toString() { + return "num inlinks: " + numInlinks + ", num outlinks: " + numOutlinks + + ", inlink score: " + inlinkScore + ", outlink score: " + + getOutlinkScore() + ", metadata: " + metadata.toString(); + } + +}
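Node's write() and readFields() must mirror each other field for field; the metadata.clear() call in readFields() keeps a reused Node instance from accumulating stale metadata across records. A minimal round-trip sketch (the harness below is illustrative, not part of the patch):

    // Hedged sketch: round-trip a Node through its Writable methods.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.nutch.scoring.webgraph.Node;

    public class NodeRoundTrip {
      public static void main(String[] args) throws Exception {
        Node node = new Node();
        node.setNumInlinks(5);
        node.setNumOutlinks(2);
        node.setInlinkScore(0.8f);

        // serialize, then deserialize into a fresh instance
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        node.write(new DataOutputStream(bytes));
        Node copy = new Node();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        // getOutlinkScore() is derived: inlinkScore / numOutlinks = 0.4f
        System.out.println(copy);
      }
    }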
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
new file mode 100644
index 0000000..4a57c29
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * A tool that dumps the top urls by number of inlinks, number of outlinks,
+ * or by score, to a text file. One of the major uses of this tool is to check
+ * the top scoring urls of a link analysis program such as LinkRank.
+ *
+ * For number of inlinks or number of outlinks the WebGraph program must have
+ * been run first. For link analysis score, a program such as LinkRank must
+ * have been run first to update the NodeDb of the WebGraph.
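+ *
+ * A hedged usage sketch (paths are hypothetical):
+ *
+ * <pre>
+ * ToolRunner.run(NutchConfiguration.create(), new NodeDumper(),
+ *     new String[] { "-webgraphdb", "crawl/webgraphdb", "-scores",
+ *         "-topn", "25", "-output", "crawl/dump/scores" });
+ * </pre>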
+ */
+public class NodeDumper extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(NodeDumper.class);
+
+  private static enum DumpType {
+    INLINKS, OUTLINKS, SCORES
+  }
+
+  private static enum AggrType {
+    SUM, MAX
+  }
+
+  private static enum NameType {
+    HOST, DOMAIN
+  }
+
+  /**
+   * Outputs the top urls sorted in descending order. Depending on the flag set
+   * on the command line, the top urls could be for number of inlinks, for
+   * number of outlinks, or for link analysis score.
+   */
+  public static class Sorter extends Configured implements
+      Mapper<Text, Node, FloatWritable, Text>,
+      Reducer<FloatWritable, Text, Text, FloatWritable> {
+
+    private JobConf conf;
+    private boolean inlinks = false;
+    private boolean outlinks = false;
+    private boolean scores = false;
+    private long topn = Long.MAX_VALUE;
+
+    /**
+     * Configures the job; sets the flag for the type of content and the topN
+     * number, if any.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      this.inlinks = conf.getBoolean("inlinks", false);
+      this.outlinks = conf.getBoolean("outlinks", false);
+      this.scores = conf.getBoolean("scores", true);
+      this.topn = conf.getLong("topn", Long.MAX_VALUE);
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Outputs the url with the appropriate number of inlinks, number of
+     * outlinks, or score.
+     */
+    public void map(Text key, Node node,
+        OutputCollector<FloatWritable, Text> output, Reporter reporter)
+        throws IOException {
+
+      float number = 0;
+      if (inlinks) {
+        number = node.getNumInlinks();
+      } else if (outlinks) {
+        number = node.getNumOutlinks();
+      } else {
+        number = node.getInlinkScore();
+      }
+
+      // the number is collected negated so that the sort order is descending
+      output.collect(new FloatWritable(-number), key);
+    }
+
+    /**
+     * Flips and collects the url and numeric sort value.
+     */
+    public void reduce(FloatWritable key, Iterator<Text> values,
+        OutputCollector<Text, FloatWritable> output, Reporter reporter)
+        throws IOException {
+
+      // negate the negated key to recover the original value; special-case 0
+      // to avoid emitting -0.0
+      float val = key.get();
+      FloatWritable number = new FloatWritable(val == 0 ? 0 : -val);
+      long numCollected = 0;
+
+      // collect all values, this time with the url as key
+      while (values.hasNext() && (numCollected < topn)) {
+        Text url = WritableUtils.clone(values.next(), conf);
+        output.collect(url, number);
+        numCollected++;
+      }
+    }
+  }
+
+  /**
+   * Outputs the hosts or domains with an associated value. This value consists
+   * of either the number of inlinks, the number of outlinks or the score. The
+   * computed value is then either the sum of all parts or the top value.
+   */
+  public static class Dumper extends Configured implements
+      Mapper<Text, Node, Text, FloatWritable>,
+      Reducer<Text, FloatWritable, Text, FloatWritable> {
+
+    private JobConf conf;
+    private boolean inlinks = false;
+    private boolean outlinks = false;
+    private boolean scores = false;
+    private long topn = Long.MAX_VALUE;
+    private boolean host = false;
+    private boolean domain = false;
+    private boolean sum = false;
+    private boolean max = false;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      this.inlinks = conf.getBoolean("inlinks", false);
+      this.outlinks = conf.getBoolean("outlinks", false);
+      this.scores = conf.getBoolean("scores", true);
+      this.topn = conf.getLong("topn", Long.MAX_VALUE);
+      this.host = conf.getBoolean("host", false);
+      this.domain = conf.getBoolean("domain", false);
+      this.sum = conf.getBoolean("sum", false);
+      this.max = conf.getBoolean("max", false);
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Outputs the host or domain as key for this record and numInlinks,
+     * numOutlinks or score as the value.
+     */
+    public void map(Text key, Node node,
+        OutputCollector<Text, FloatWritable> output, Reporter reporter)
+        throws IOException {
+
+      float number = 0;
+      if (inlinks) {
+        number = node.getNumInlinks();
+      } else if (outlinks) {
+        number = node.getNumOutlinks();
+      } else {
+        number = node.getInlinkScore();
+      }
+
+      if (host) {
+        key.set(URLUtil.getHost(key.toString()));
+      } else {
+        key.set(URLUtil.getDomainName(key.toString()));
+      }
+
+      output.collect(key, new FloatWritable(number));
+    }
+
+    /**
+     * Outputs either the sum or the top value for this record.
+     */
+    public void reduce(Text key, Iterator<FloatWritable> values,
+        OutputCollector<Text, FloatWritable> output, Reporter reporter)
+        throws IOException {
+
+      long numCollected = 0;
+      float sumOrMax = 0;
+      float val = 0;
+
+      // aggregate the values for this host or domain, either summing them or
+      // keeping the maximum
+      while (values.hasNext() && (numCollected < topn)) {
+        val = values.next().get();
+
+        if (sum) {
+          sumOrMax += val;
+        } else {
+          if (sumOrMax < val) {
+            sumOrMax = val;
+          }
+        }
+
+        numCollected++;
+      }
+
+      output.collect(key, new FloatWritable(sumOrMax));
+    }
+  }
+
+  /**
+   * Runs the process to dump the top urls out to a text file.
+   *
+   * @param webGraphDb
+   *          The WebGraph from which to pull values.
+   * @param type
+   *          The type of value to dump: inlinks, outlinks, or scores.
+   * @param topN
+   *          The maximum number of urls to dump.
+   * @param output
+   *          The output path to write to.
+   * @param asEff
+   *          Whether to output in Solr ExternalFileField format.
+   * @param nameType
+   *          Group by host or domain, or null for no grouping.
+   * @param aggrType
+   *          Aggregate grouped values as sum or max.
+   * @param asSequenceFile
+   *          Whether to output as a SequenceFile instead of text.
+   *
+   * @throws Exception
+   *           If an error occurs while dumping the top values.
+ */ + public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output, + boolean asEff, NameType nameType, AggrType aggrType, + boolean asSequenceFile) throws Exception { + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("NodeDumper: starting at " + sdf.format(start)); + Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR); + Configuration conf = getConf(); + + JobConf dumper = new NutchJob(conf); + dumper.setJobName("NodeDumper: " + webGraphDb); + FileInputFormat.addInputPath(dumper, nodeDb); + dumper.setInputFormat(SequenceFileInputFormat.class); + + if (nameType == null) { + dumper.setMapperClass(Sorter.class); + dumper.setReducerClass(Sorter.class); + dumper.setMapOutputKeyClass(FloatWritable.class); + dumper.setMapOutputValueClass(Text.class); + } else { + dumper.setMapperClass(Dumper.class); + dumper.setReducerClass(Dumper.class); + dumper.setMapOutputKeyClass(Text.class); + dumper.setMapOutputValueClass(FloatWritable.class); + } + + dumper.setOutputKeyClass(Text.class); + dumper.setOutputValueClass(FloatWritable.class); + FileOutputFormat.setOutputPath(dumper, output); + + if (asSequenceFile) { + dumper.setOutputFormat(SequenceFileOutputFormat.class); + } else { + dumper.setOutputFormat(TextOutputFormat.class); + } + + dumper.setNumReduceTasks(1); + dumper.setBoolean("inlinks", type == DumpType.INLINKS); + dumper.setBoolean("outlinks", type == DumpType.OUTLINKS); + dumper.setBoolean("scores", type == DumpType.SCORES); + + dumper.setBoolean("host", nameType == NameType.HOST); + dumper.setBoolean("domain", nameType == NameType.DOMAIN); + dumper.setBoolean("sum", aggrType == AggrType.SUM); + dumper.setBoolean("max", aggrType == AggrType.MAX); + + dumper.setLong("topn", topN); + + // Set equals-sign as separator for Solr's ExternalFileField + if (asEff) { + dumper.set("mapred.textoutputformat.separator", "="); + } + + try { + LOG.info("NodeDumper: running"); + JobClient.runJob(dumper); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + throw e; + } + long end = System.currentTimeMillis(); + LOG.info("NodeDumper: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new NodeDumper(), + args); + System.exit(res); + } + + /** + * Runs the node dumper tool. 
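+   *
+   * Options (as registered below): -webgraphdb &lt;dir&gt;, one of
+   * -inlinks | -outlinks | -scores, optional -topn &lt;n&gt;,
+   * -output &lt;dir&gt;, optional -asEff,
+   * -group &lt;host|domain&gt; &lt;sum|max&gt;, and -asSequenceFile.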
+   */
+  public int run(String[] args) throws Exception {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("webgraphdb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the web graph database to use");
+    Option webGraphDbOpts = OptionBuilder.create("webgraphdb");
+    options.addOption(webGraphDbOpts);
+
+    OptionBuilder.withArgName("inlinks");
+    OptionBuilder.withDescription("show highest inlinks");
+    Option inlinkOpts = OptionBuilder.create("inlinks");
+    options.addOption(inlinkOpts);
+
+    OptionBuilder.withArgName("outlinks");
+    OptionBuilder.withDescription("show highest outlinks");
+    Option outlinkOpts = OptionBuilder.create("outlinks");
+    options.addOption(outlinkOpts);
+
+    OptionBuilder.withArgName("scores");
+    OptionBuilder.withDescription("show highest scores");
+    Option scoreOpts = OptionBuilder.create("scores");
+    options.addOption(scoreOpts);
+
+    OptionBuilder.withArgName("topn");
+    OptionBuilder.hasOptionalArg();
+    OptionBuilder.withDescription("show topN scores");
+    Option topNOpts = OptionBuilder.create("topn");
+    options.addOption(topNOpts);
+
+    OptionBuilder.withArgName("output");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the output directory to use");
+    Option outputOpts = OptionBuilder.create("output");
+    options.addOption(outputOpts);
+
+    OptionBuilder.withArgName("asEff");
+    OptionBuilder
+        .withDescription("Solr ExternalFileField compatible output format");
+    Option effOpts = OptionBuilder.create("asEff");
+    options.addOption(effOpts);
+
+    OptionBuilder.hasArgs(2);
+    OptionBuilder.withDescription("group <host|domain> <sum|max>");
+    Option groupOpts = OptionBuilder.create("group");
+    options.addOption(groupOpts);
+
+    OptionBuilder.withArgName("asSequenceFile");
+    OptionBuilder.withDescription("whether to output as a sequencefile");
+    Option sequenceFileOpts = OptionBuilder.create("asSequenceFile");
+    options.addOption(sequenceFileOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("NodeDumper", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      boolean inlinks = line.hasOption("inlinks");
+      boolean outlinks = line.hasOption("outlinks");
+
+      long topN = (line.hasOption("topn") ? Long.parseLong(line
+          .getOptionValue("topn")) : Long.MAX_VALUE);
+
+      // get the correct dump type
+      String output = line.getOptionValue("output");
+      DumpType type = (inlinks ? DumpType.INLINKS
+          : outlinks ? DumpType.OUTLINKS : DumpType.SCORES);
+
+      NameType nameType = null;
+      AggrType aggrType = null;
+      String[] group = line.getOptionValues("group");
+      if (group != null && group.length == 2) {
+        nameType = (group[0].equals("host") ? NameType.HOST : group[0]
+            .equals("domain") ? NameType.DOMAIN : null);
+        aggrType = (group[1].equals("sum") ? AggrType.SUM : group[1]
+            .equals("max") ? AggrType.MAX : null);
+      }
+
+      // Use ExternalFileField?
+ boolean asEff = line.hasOption("asEff"); + boolean asSequenceFile = line.hasOption("asSequenceFile"); + + dumpNodes(new Path(webGraphDb), type, topN, new Path(output), asEff, + nameType, aggrType, asSequenceFile); + return 0; + } catch (Exception e) { + LOG.error("NodeDumper: " + StringUtils.stringifyException(e)); + return -2; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java new file mode 100644 index 0000000..e6b6815 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.scoring.webgraph; + +import java.io.IOException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.MapFileOutputFormat; +import org.apache.hadoop.mapred.lib.HashPartitioner; +import org.apache.nutch.util.FSUtils; +import org.apache.nutch.util.NutchConfiguration; + +/** + * Reads and prints to system out information for a single node from the NodeDb + * in the WebGraph. + */ +public class NodeReader extends Configured { + + private FileSystem fs; + private MapFile.Reader[] nodeReaders; + + public NodeReader() { + + } + + public NodeReader(Configuration conf) { + super(conf); + } + + /** + * Prints the content of the Node represented by the url to system out. + * + * @param webGraphDb + * The webgraph from which to get the node. + * @param url + * The url of the node. + * + * @throws IOException + * If an error occurs while getting the node. 
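+   *
+   * A hedged usage sketch (the webgraphdb path and url are hypothetical):
+   *
+   * <pre>
+   * NodeReader reader = new NodeReader(NutchConfiguration.create());
+   * reader.dumpUrl(new Path("crawl/webgraphdb"), "http://www.example.com/");
+   * </pre>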
+   */
+  public void dumpUrl(Path webGraphDb, String url) throws IOException {
+
+    fs = FileSystem.get(getConf());
+    nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
+        WebGraph.NODE_DIR), getConf());
+
+    // get the node from the readers, print out its info, and close the readers
+    Text key = new Text(url);
+    Node node = new Node();
+    MapFileOutputFormat.getEntry(nodeReaders,
+        new HashPartitioner<Text, Node>(), key, node);
+    System.out.println(url + ":");
+    System.out.println("  inlink score: " + node.getInlinkScore());
+    System.out.println("  outlink score: " + node.getOutlinkScore());
+    System.out.println("  num inlinks: " + node.getNumInlinks());
+    System.out.println("  num outlinks: " + node.getNumOutlinks());
+    FSUtils.closeReaders(nodeReaders);
+  }
+
+  /**
+   * Runs the NodeReader tool. The command line arguments must contain a
+   * webgraphdb path and a url. The url must match the normalized url that is
+   * contained in the NodeDb of the WebGraph.
+   */
+  public static void main(String[] args) throws Exception {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("webgraphdb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the webgraphdb to use");
+    Option webGraphOpts = OptionBuilder.create("webgraphdb");
+    options.addOption(webGraphOpts);
+
+    OptionBuilder.withArgName("url");
+    OptionBuilder.hasOptionalArg();
+    OptionBuilder.withDescription("the url to dump");
+    Option urlOpts = OptionBuilder.create("url");
+    options.addOption(urlOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      // command line must take a webgraphdb and a url
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+          || !line.hasOption("url")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("NodeReader", options);
+        return;
+      }
+
+      // dump the values to system out and return
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      String url = line.getOptionValue("url");
+      NodeReader reader = new NodeReader(NutchConfiguration.create());
+      reader.dumpUrl(new Path(webGraphDb), url);
+
+      return;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
new file mode 100644
index 0000000..19704eb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.scoring.webgraph; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Iterator; +import java.util.Random; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapFileOutputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; + +/** + * Updates the score from the WebGraph node database into the crawl database. + * Any score that is not in the node database is set to the clear score in the + * crawl database. + */ +public class ScoreUpdater extends Configured implements Tool, + Mapper<Text, Writable, Text, ObjectWritable>, + Reducer<Text, ObjectWritable, Text, CrawlDatum> { + + public static final Logger LOG = LoggerFactory.getLogger(ScoreUpdater.class); + + private JobConf conf; + private float clearScore = 0.0f; + + public void configure(JobConf conf) { + this.conf = conf; + clearScore = conf.getFloat("link.score.updater.clear.score", 0.0f); + } + + /** + * Changes input into ObjectWritables. + */ + public void map(Text key, Writable value, + OutputCollector<Text, ObjectWritable> output, Reporter reporter) + throws IOException { + + ObjectWritable objWrite = new ObjectWritable(); + objWrite.set(value); + output.collect(key, objWrite); + } + + /** + * Creates new CrawlDatum objects with the updated score from the NodeDb or + * with a cleared score. 
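+   * The cleared score comes from the
+   * <code>link.score.updater.clear.score</code> property read in
+   * {@link #configure(JobConf)}, defaulting to 0.0.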
+   */
+  public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+      throws IOException {
+
+    String url = key.toString();
+    Node node = null;
+    CrawlDatum datum = null;
+
+    // set the node and the crawl datum; there should be one of each unless
+    // there is no node for the url in the crawldb
+    while (values.hasNext()) {
+      ObjectWritable next = values.next();
+      Object value = next.get();
+      if (value instanceof Node) {
+        node = (Node) value;
+      } else if (value instanceof CrawlDatum) {
+        datum = (CrawlDatum) value;
+      }
+    }
+
+    // datum should never be null, but could be if the url was somehow
+    // normalized or changed after being pulled from the crawldb
+    if (datum != null) {
+
+      if (node != null) {
+
+        // set the inlink score from the nodedb
+        float inlinkScore = node.getInlinkScore();
+        datum.setScore(inlinkScore);
+        LOG.debug(url + ": setting to score " + inlinkScore);
+      } else {
+
+        // clear out the score in the crawldb
+        datum.setScore(clearScore);
+        LOG.debug(url + ": setting to clear score of " + clearScore);
+      }
+
+      output.collect(key, datum);
+    } else {
+      LOG.debug(url + ": no datum");
+    }
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Updates the inlink score from the web graph node database into the crawl
+   * database.
+   *
+   * @param crawlDb
+   *          The crawl database to update
+   * @param webGraphDb
+   *          The webgraph database to use.
+   *
+   * @throws IOException
+   *           If an error occurs while updating the scores.
+   */
+  public void update(Path crawlDb, Path webGraphDb) throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("ScoreUpdater: starting at " + sdf.format(start));
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // create a temporary crawldb with the new scores
+    LOG.info("Running crawldb update " + crawlDb);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+    Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random()
+        .nextInt(Integer.MAX_VALUE)));
+
+    // run the updater job outputting to the temp crawl database
+    JobConf updater = new NutchJob(conf);
+    updater.setJobName("Update CrawlDb from WebGraph");
+    FileInputFormat.addInputPath(updater, crawlDbCurrent);
+    FileInputFormat.addInputPath(updater, nodeDb);
+    FileOutputFormat.setOutputPath(updater, newCrawlDb);
+    updater.setInputFormat(SequenceFileInputFormat.class);
+    updater.setMapperClass(ScoreUpdater.class);
+    updater.setReducerClass(ScoreUpdater.class);
+    updater.setMapOutputKeyClass(Text.class);
+    updater.setMapOutputValueClass(ObjectWritable.class);
+    updater.setOutputKeyClass(Text.class);
+    updater.setOutputValueClass(CrawlDatum.class);
+    updater.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      JobClient.runJob(updater);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+
+      // remove the temp crawldb on error
+      if (fs.exists(newCrawlDb)) {
+        fs.delete(newCrawlDb, true);
+      }
+      throw e;
+    }
+
+    // install the temp crawl database
+    LOG.info("ScoreUpdater: installing new crawldb " + crawlDb);
+    CrawlDb.install(updater, crawlDb);
+
+    long end = System.currentTimeMillis();
+    LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(),
+        args);
+
System.exit(res); + } + + /** + * Runs the ScoreUpdater tool. + */ + public int run(String[] args) throws Exception { + + Options options = new Options(); + OptionBuilder.withArgName("help"); + OptionBuilder.withDescription("show this help message"); + Option helpOpts = OptionBuilder.create("help"); + options.addOption(helpOpts); + + OptionBuilder.withArgName("crawldb"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("the crawldb to use"); + Option crawlDbOpts = OptionBuilder.create("crawldb"); + options.addOption(crawlDbOpts); + + OptionBuilder.withArgName("webgraphdb"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("the webgraphdb to use"); + Option webGraphOpts = OptionBuilder.create("webgraphdb"); + options.addOption(webGraphOpts); + + CommandLineParser parser = new GnuParser(); + try { + + CommandLine line = parser.parse(options, args); + if (line.hasOption("help") || !line.hasOption("webgraphdb") + || !line.hasOption("crawldb")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("ScoreUpdater", options); + return -1; + } + + String crawlDb = line.getOptionValue("crawldb"); + String webGraphDb = line.getOptionValue("webgraphdb"); + update(new Path(crawlDb), new Path(webGraphDb)); + return 0; + } catch (Exception e) { + LOG.error("ScoreUpdater: " + StringUtils.stringifyException(e)); + return -1; + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java new file mode 100644 index 0000000..e2c3d8b --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java @@ -0,0 +1,783 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Creates three databases, one for inlinks, one for outlinks, and a node
+ * database that holds the number of in and outlinks to a url and the current
+ * score for the url.
+ *
+ * The score is set by an analysis program such as LinkRank. The WebGraph is an
+ * updatable database. Outlinks are stored by their fetch time, or by the
+ * current system time if no fetch time is available. Only the most recent
+ * version of the outlinks for a given url is stored. As more crawls are
+ * executed and the WebGraph updated, newer Outlinks replace older Outlinks.
+ * This allows the WebGraph to adapt to changes in the link structure of the
+ * web.
+ *
+ * The Inlink database is created from the Outlink database and is regenerated
+ * when the WebGraph is updated. The Node database is created from both the
+ * Inlink and Outlink databases. Because the Node database is overwritten when
+ * the WebGraph is updated, and because the Node database holds the current
+ * scores for urls, it is recommended that a crawl-cycle (one or more full
+ * crawls) fully complete before the WebGraph is updated and some type of
+ * analysis, such as LinkRank, is run to update scores in the Node database in
+ * a stable fashion.
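+ *
+ * A hedged invocation sketch (paths are hypothetical):
+ *
+ * <pre>
+ * ToolRunner.run(NutchConfiguration.create(), new WebGraph(),
+ *     new String[] { "-webgraphdb", "crawl/webgraphdb",
+ *         "-segment", "crawl/segments/20060101000000", "-normalize" });
+ * </pre>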
+ */ +public class WebGraph extends Configured implements Tool { + + public static final Logger LOG = LoggerFactory.getLogger(WebGraph.class); + public static final String LOCK_NAME = ".locked"; + public static final String INLINK_DIR = "inlinks"; + public static final String OUTLINK_DIR = "outlinks/current"; + public static final String OLD_OUTLINK_DIR = "outlinks/old"; + public static final String NODE_DIR = "nodes"; + + /** + * The OutlinkDb creates a database of all outlinks. Outlinks to internal urls + * by domain and host can be ignored. The number of Outlinks out to a given + * page or domain can also be limited. + */ + public static class OutlinkDb extends Configured implements + Mapper<Text, Writable, Text, NutchWritable>, + Reducer<Text, NutchWritable, Text, LinkDatum> { + + public static final String URL_NORMALIZING = "webgraph.url.normalizers"; + public static final String URL_FILTERING = "webgraph.url.filters"; + + // ignoring internal domains, internal hosts + private boolean ignoreDomain = true; + private boolean ignoreHost = true; + + // limiting urls out to a page or to a domain + private boolean limitPages = true; + private boolean limitDomains = true; + + // using normalizers and/or filters + private boolean normalize = false; + private boolean filter = false; + + // url normalizers, filters and job configuration + private URLNormalizers urlNormalizers; + private URLFilters filters; + private JobConf conf; + + /** + * Normalizes and trims extra whitespace from the given url. + * + * @param url + * The url to normalize. + * + * @return The normalized url. + */ + private String normalizeUrl(String url) { + + if (!normalize) { + return url; + } + + String normalized = null; + if (urlNormalizers != null) { + try { + + // normalize and trim the url + normalized = urlNormalizers.normalize(url, + URLNormalizers.SCOPE_DEFAULT); + normalized = normalized.trim(); + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + normalized = null; + } + } + return normalized; + } + + /** + * Filters the given url. + * + * @param url + * The url to filter. + * + * @return The filtered url or null. + */ + private String filterUrl(String url) { + + if (!filter) { + return url; + } + + try { + url = filters.filter(url); + } catch (Exception e) { + url = null; + } + + return url; + } + + /** + * Returns the fetch time from the parse data or the current system time if + * the fetch time doesn't exist. + * + * @param data + * The parse data. + * + * @return The fetch time as a long. + */ + private long getFetchTime(ParseData data) { + + // default to current system time + long fetchTime = System.currentTimeMillis(); + String fetchTimeStr = data.getContentMeta().get(Nutch.FETCH_TIME_KEY); + try { + // get the fetch time from the parse data + fetchTime = Long.parseLong(fetchTimeStr); + } catch (Exception e) { + fetchTime = System.currentTimeMillis(); + } + return fetchTime; + } + + /** + * Default constructor. + */ + public OutlinkDb() { + } + + /** + * Configurable constructor. + */ + public OutlinkDb(Configuration conf) { + setConf(conf); + } + + /** + * Configures the OutlinkDb job. Sets up internal links and link limiting. 
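+     * The following properties are read (defaults in parentheses):
+     * <code>link.ignore.internal.host</code> (true),
+     * <code>link.ignore.internal.domain</code> (true),
+     * <code>link.ignore.limit.page</code> (true),
+     * <code>link.ignore.limit.domain</code> (true), plus the
+     * {@link #URL_NORMALIZING} and {@link #URL_FILTERING} flags (both false).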
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
+      ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
+      limitPages = conf.getBoolean("link.ignore.limit.page", true);
+      limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+
+      normalize = conf.getBoolean(URL_NORMALIZING, false);
+      filter = conf.getBoolean(URL_FILTERING, false);
+
+      if (normalize) {
+        urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+      }
+
+      if (filter) {
+        filters = new URLFilters(conf);
+      }
+    }
+
+    /**
+     * Passes through existing LinkDatum objects from an existing OutlinkDb and
+     * maps out new LinkDatum objects from the ParseData of new crawls.
+     */
+    public void map(Text key, Writable value,
+        OutputCollector<Text, NutchWritable> output, Reporter reporter)
+        throws IOException {
+
+      // normalize the url, stop processing if null
+      String url = normalizeUrl(key.toString());
+      if (url == null) {
+        return;
+      }
+
+      // filter the url
+      if (filterUrl(url) == null) {
+        return;
+      }
+
+      // overwrite the key with the normalized URL
+      key.set(url);
+
+      if (value instanceof CrawlDatum) {
+        CrawlDatum datum = (CrawlDatum) value;
+
+        if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+            || datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+            || datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
+
+          // tell the reducer to get rid of all instances of this key
+          output.collect(key, new NutchWritable(new BooleanWritable(true)));
+        }
+      } else if (value instanceof ParseData) {
+        // get the parse data and the outlinks from the parse data, along with
+        // the fetch time for those links
+        ParseData data = (ParseData) value;
+        long fetchTime = getFetchTime(data);
+        Outlink[] outlinkAr = data.getOutlinks();
+        Map<String, String> outlinkMap = new LinkedHashMap<String, String>();
+
+        // normalize the urls and put them into the map
+        if (outlinkAr != null && outlinkAr.length > 0) {
+          for (int i = 0; i < outlinkAr.length; i++) {
+            Outlink outlink = outlinkAr[i];
+            String toUrl = normalizeUrl(outlink.getToUrl());
+
+            if (filterUrl(toUrl) == null) {
+              continue;
+            }
+
+            // only put the url into the map if it isn't already present, or
+            // if it is present with a null anchor (in which case the anchor
+            // is replaced)
+            boolean existingUrl = outlinkMap.containsKey(toUrl);
+            if (toUrl != null
+                && (!existingUrl || (existingUrl && outlinkMap.get(toUrl) == null))) {
+              outlinkMap.put(toUrl, outlink.getAnchor());
+            }
+          }
+        }
+
+        // collect the outlinks under the fetch time
+        for (String outlinkUrl : outlinkMap.keySet()) {
+          String anchor = outlinkMap.get(outlinkUrl);
+          LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
+          output.collect(key, new NutchWritable(datum));
+        }
+      } else if (value instanceof LinkDatum) {
+        LinkDatum datum = (LinkDatum) value;
+        String linkDatumUrl = normalizeUrl(datum.getUrl());
+
+        if (filterUrl(linkDatumUrl) != null) {
+          datum.setUrl(linkDatumUrl);
+
+          // collect existing outlinks from the existing OutlinkDb
+          output.collect(key, new NutchWritable(datum));
+        }
+      }
+    }
+
+    public void reduce(Text key, Iterator<NutchWritable> values,
+        OutputCollector<Text, LinkDatum> output, Reporter reporter)
+        throws IOException {
+
+      // aggregate all outlinks and get the most recent fetch timestamp, which
+      // should be the timestamp for all of the most recent outlinks
+      long mostRecent = 0L;
+      List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
+      while (values.hasNext()) {
+        Writable value = values.next().get();
+
+        if (value instanceof LinkDatum) {
+          // loop through, changing out the most recent timestamp if needed
+          LinkDatum next = (LinkDatum) value;
+          long timestamp = next.getTimestamp();
+          if (mostRecent == 0L || mostRecent < timestamp) {
+            mostRecent = timestamp;
+          }
+          outlinkList.add(WritableUtils.clone(next, conf));
+          reporter.incrCounter("WebGraph.outlinks", "added links", 1);
+        } else if (value instanceof BooleanWritable) {
+          BooleanWritable delete = (BooleanWritable) value;
+          // delete is always true here, otherwise the mapper would not have
+          // emitted it in the first place
+          if (delete.get() == true) {
+            // this page is gone, do not emit its outlinks
+            reporter.incrCounter("WebGraph.outlinks", "removed links", 1);
+            return;
+          }
+        }
+      }
+
+      // get the url, domain, and host for the url
+      String url = key.toString();
+      String domain = URLUtil.getDomainName(url);
+      String host = URLUtil.getHost(url);
+
+      // setup checking sets for domains and pages
+      Set<String> domains = new HashSet<String>();
+      Set<String> pages = new HashSet<String>();
+
+      // loop through the link datums
+      for (LinkDatum datum : outlinkList) {
+
+        // get the url, host, domain, and page for each outlink
+        String toUrl = datum.getUrl();
+        String toDomain = URLUtil.getDomainName(toUrl);
+        String toHost = URLUtil.getHost(toUrl);
+        String toPage = URLUtil.getPage(toUrl);
+        datum.setLinkType(LinkDatum.OUTLINK);
+
+        // outlinks must be the most recent and conform to the internal url
+        // and limiting rules; if an outlink does, collect it
+        if (datum.getTimestamp() == mostRecent
+            && (!limitPages || (limitPages && !pages.contains(toPage)))
+            && (!limitDomains || (limitDomains && !domains.contains(toDomain)))
+            && (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host)))
+            && (!ignoreDomain || (ignoreDomain && !toDomain
+                .equalsIgnoreCase(domain)))) {
+          output.collect(key, datum);
+          pages.add(toPage);
+          domains.add(toDomain);
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * The InlinkDb creates a database of Inlinks. Inlinks are inverted from the
+   * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is
+   * updated.
+   */
+  private static class InlinkDb extends Configured implements
+      Mapper<Text, LinkDatum, Text, LinkDatum> {
+
+    private long timestamp;
+
+    /**
+     * Configures the job. Sets the timestamp for all Inlink LinkDatum objects
+     * to the current system time.
+     */
+    public void configure(JobConf conf) {
+      timestamp = System.currentTimeMillis();
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
+     * new system timestamp, type, and the to and from urls switched.
+     */
+    public void map(Text key, LinkDatum datum,
+        OutputCollector<Text, LinkDatum> output, Reporter reporter)
+        throws IOException {
+
+      // get the to and from urls and the anchor
+      String fromUrl = key.toString();
+      String toUrl = datum.getUrl();
+      String anchor = datum.getAnchor();
+
+      // flip the from and to urls and set the new link type
+      LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp);
+      inlink.setLinkType(LinkDatum.INLINK);
+      output.collect(new Text(toUrl), inlink);
+    }
+  }
+
+  /**
+   * Creates the Node database which consists of the number of in and outlinks
+   * for each url and a score slot for analysis programs such as LinkRank.
+   */
+  private static class NodeDb extends Configured implements
+      Reducer<Text, LinkDatum, Text, Node> {
+
+    /**
+     * Configures the job.
+ */ + public void configure(JobConf conf) { + } + + public void close() { + } + + /** + * Counts the number of inlinks and outlinks for each url and sets a default + * score of 0.0 for each url (node) in the webgraph. + */ + public void reduce(Text key, Iterator<LinkDatum> values, + OutputCollector<Text, Node> output, Reporter reporter) + throws IOException { + + Node node = new Node(); + int numInlinks = 0; + int numOutlinks = 0; + + // loop through counting number of in and out links + while (values.hasNext()) { + LinkDatum next = values.next(); + if (next.getLinkType() == LinkDatum.INLINK) { + numInlinks++; + } else if (next.getLinkType() == LinkDatum.OUTLINK) { + numOutlinks++; + } + } + + // set the in and outlinks and a default score of 0 + node.setNumInlinks(numInlinks); + node.setNumOutlinks(numOutlinks); + node.setInlinkScore(0.0f); + output.collect(key, node); + } + } + + /** + * Creates the three different WebGraph databases, Outlinks, Inlinks, and + * Node. If a current WebGraph exists then it is updated, if it doesn't exist + * then a new WebGraph database is created. + * + * @param webGraphDb + * The WebGraph to create or update. + * @param segments + * The array of segments used to update the WebGraph. Newer segments + * and fetch times will overwrite older segments. + * @param normalize + * whether to use URLNormalizers on URL's in the segment + * @param filter + * whether to use URLFilters on URL's in the segment + * + * @throws IOException + * If an error occurs while processing the WebGraph. + */ + public void createWebGraph(Path webGraphDb, Path[] segments, + boolean normalize, boolean filter) throws IOException { + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + if (LOG.isInfoEnabled()) { + LOG.info("WebGraphDb: starting at " + sdf.format(start)); + LOG.info("WebGraphDb: webgraphdb: " + webGraphDb); + LOG.info("WebGraphDb: URL normalize: " + normalize); + LOG.info("WebGraphDb: URL filter: " + filter); + } + + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + + // lock an existing webgraphdb to prevent multiple simultaneous updates + Path lock = new Path(webGraphDb, LOCK_NAME); + if (!fs.exists(webGraphDb)) { + fs.mkdirs(webGraphDb); + } + + LockUtil.createLockFile(fs, lock, false); + + // outlink and temp outlink database paths + Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR); + Path oldOutlinkDb = new Path(webGraphDb, OLD_OUTLINK_DIR); + + if (!fs.exists(outlinkDb)) { + fs.mkdirs(outlinkDb); + } + + Path tempOutlinkDb = new Path(outlinkDb + "-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + JobConf outlinkJob = new NutchJob(conf); + outlinkJob.setJobName("Outlinkdb: " + outlinkDb); + + boolean deleteGone = conf.getBoolean("link.delete.gone", false); + boolean preserveBackup = conf.getBoolean("db.preserve.backup", true); + + if (deleteGone) { + LOG.info("OutlinkDb: deleting gone links"); + } + + // get the parse data and crawl fetch data for all segments + if (segments != null) { + for (int i = 0; i < segments.length; i++) { + Path parseData = new Path(segments[i], ParseData.DIR_NAME); + if (fs.exists(parseData)) { + LOG.info("OutlinkDb: adding input: " + parseData); + FileInputFormat.addInputPath(outlinkJob, parseData); + } + + if (deleteGone) { + Path crawlFetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME); + if (fs.exists(crawlFetch)) { + LOG.info("OutlinkDb: adding input: " + crawlFetch); + FileInputFormat.addInputPath(outlinkJob, crawlFetch); + 
+          }
+        }
+      }
+    }
+
+    // add the existing webgraph
+    LOG.info("OutlinkDb: adding input: " + outlinkDb);
+    FileInputFormat.addInputPath(outlinkJob, outlinkDb);
+
+    outlinkJob.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
+    outlinkJob.setBoolean(OutlinkDb.URL_FILTERING, filter);
+
+    outlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    outlinkJob.setMapperClass(OutlinkDb.class);
+    outlinkJob.setReducerClass(OutlinkDb.class);
+    outlinkJob.setMapOutputKeyClass(Text.class);
+    outlinkJob.setMapOutputValueClass(NutchWritable.class);
+    outlinkJob.setOutputKeyClass(Text.class);
+    outlinkJob.setOutputValueClass(LinkDatum.class);
+    FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
+    outlinkJob.setOutputFormat(MapFileOutputFormat.class);
+    outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the outlinkdb job and replace any old outlinkdb with the new one
+    try {
+      LOG.info("OutlinkDb: running");
+      JobClient.runJob(outlinkJob);
+      LOG.info("OutlinkDb: installing " + outlinkDb);
+      FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
+      FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
+      if (!preserveBackup && fs.exists(oldOutlinkDb))
+        fs.delete(oldOutlinkDb, true);
+      LOG.info("OutlinkDb: finished");
+    } catch (IOException e) {
+
+      // remove the lock file and the temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempOutlinkDb)) {
+        fs.delete(tempOutlinkDb, true);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // inlink and temp link database paths
+    Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
+    Path tempInlinkDb = new Path(inlinkDb + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf inlinkJob = new NutchJob(conf);
+    inlinkJob.setJobName("Inlinkdb " + inlinkDb);
+    LOG.info("InlinkDb: adding input: " + outlinkDb);
+    FileInputFormat.addInputPath(inlinkJob, outlinkDb);
+    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    inlinkJob.setMapperClass(InlinkDb.class);
+    inlinkJob.setMapOutputKeyClass(Text.class);
+    inlinkJob.setMapOutputValueClass(LinkDatum.class);
+    inlinkJob.setOutputKeyClass(Text.class);
+    inlinkJob.setOutputValueClass(LinkDatum.class);
+    FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
+    inlinkJob.setOutputFormat(MapFileOutputFormat.class);
+    inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    try {
+
+      // run the inlink job and replace any old inlinkdb with the new one
+      LOG.info("InlinkDb: running");
+      JobClient.runJob(inlinkJob);
+      LOG.info("InlinkDb: installing " + inlinkDb);
+      FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
+      LOG.info("InlinkDb: finished");
+    } catch (IOException e) {
+
+      // remove the lock file and the temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempInlinkDb)) {
+        fs.delete(tempInlinkDb, true);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // node and temp node database paths
+    Path nodeDb = new Path(webGraphDb, NODE_DIR);
+    Path tempNodeDb = new Path(nodeDb + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf nodeJob = new NutchJob(conf);
+    nodeJob.setJobName("NodeDb " + nodeDb);
+    LOG.info("NodeDb: adding input: " + outlinkDb);
+    LOG.info("NodeDb: adding input: " + inlinkDb);
+    FileInputFormat.addInputPath(nodeJob, outlinkDb);
+    FileInputFormat.addInputPath(nodeJob, inlinkDb);
+    nodeJob.setInputFormat(SequenceFileInputFormat.class);
+    nodeJob.setReducerClass(NodeDb.class);
+
+    nodeJob.setMapOutputKeyClass(Text.class);
+    nodeJob.setMapOutputValueClass(LinkDatum.class);
+    nodeJob.setOutputKeyClass(Text.class);
+    nodeJob.setOutputValueClass(Node.class);
+    FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
+    nodeJob.setOutputFormat(MapFileOutputFormat.class);
+    nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    try {
+
+      // run the node job and replace the old nodedb with the new one
+      LOG.info("NodeDb: running");
+      JobClient.runJob(nodeJob);
+      LOG.info("NodeDb: installing " + nodeDb);
+      FSUtils.replace(fs, nodeDb, tempNodeDb, true);
+      LOG.info("NodeDb: finished");
+    } catch (IOException e) {
+
+      // remove the lock file and the temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempNodeDb)) {
+        fs.delete(tempNodeDb, true);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // remove the lock file for the webgraph
+    LockUtil.removeLockFile(fs, lock);
+
+    long end = System.currentTimeMillis();
+    LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Parses the command line arguments and runs the WebGraph jobs.
+   */
+  public int run(String[] args) throws Exception {
+
+    // boolean options
+    Option helpOpt = new Option("h", "help", false, "show this help message");
+    Option normOpt = new Option("n", "normalize", false,
+        "whether to use URLNormalizers on the URL's in the segment");
+    Option filtOpt = new Option("f", "filter", false,
+        "whether to use URLFilters on the URL's in the segment");
+
+    // argument options
+    @SuppressWarnings("static-access")
+    Option graphOpt = OptionBuilder
+        .withArgName("webgraphdb")
+        .hasArg()
+        .withDescription(
+            "the web graph database to create (if none exists) or use if one does")
+        .create("webgraphdb");
+    @SuppressWarnings("static-access")
+    Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+        .withDescription("the segment(s) to use").create("segment");
+    @SuppressWarnings("static-access")
+    Option segDirOpt = OptionBuilder.withArgName("segmentDir").hasArgs()
+        .withDescription("the segment directory to use").create("segmentDir");
+
+    // create the options
+    Options options = new Options();
+    options.addOption(helpOpt);
+    options.addOption(normOpt);
+    options.addOption(filtOpt);
+    options.addOption(graphOpt);
+    options.addOption(segOpt);
+    options.addOption(segDirOpt);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+          || (!line.hasOption("segment") && !line.hasOption("segmentDir"))) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("WebGraph", options, true);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+
+      Path[] segPaths = null;
+
+      // Handle segment option
+      if (line.hasOption("segment")) {
+        String[] segments = line.getOptionValues("segment");
+        segPaths = new Path[segments.length];
+        for (int i = 0; i < segments.length; i++) {
+          segPaths[i] = new Path(segments[i]);
+        }
+      }
+
+      // Handle segmentDir option
+      if (line.hasOption("segmentDir")) {
+        Path dir = new Path(line.getOptionValue("segmentDir"));
+        FileSystem fs = dir.getFileSystem(getConf());
+        FileStatus[] fstats = fs.listStatus(dir,
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+
+        segPaths = HadoopFSUtil.getPaths(fstats);
+      }
+
+      boolean normalize = line.hasOption("normalize");
+      boolean filter = line.hasOption("filter");
+
+      createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("WebGraph: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
new file mode 100644
index 0000000..a568b46
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Scoring implementation based on link analysis
+ * ({@link org.apache.nutch.scoring.webgraph.LinkRank}),
+ * see {@link org.apache.nutch.scoring.webgraph.WebGraph}.
+ */
+package org.apache.nutch.scoring.webgraph;
+

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java b/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
new file mode 100644
index 0000000..d67b590
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * An input format that takes Nutch Content objects and converts them to text,
+ * replacing newlines with spaces. This format is useful for processing Nutch
+ * Content objects with non-Java tools via Hadoop Streaming.
+ */
+public class ContentAsTextInputFormat extends
+    SequenceFileInputFormat<Text, Text> {
+
+  private static class ContentAsTextRecordReader implements
+      RecordReader<Text, Text> {
+
+    private final SequenceFileRecordReader<Text, Content> sequenceFileRecordReader;
+
+    private Text innerKey;
+    private Content innerValue;
+
+    public ContentAsTextRecordReader(Configuration conf, FileSplit split)
+        throws IOException {
+      sequenceFileRecordReader = new SequenceFileRecordReader<Text, Content>(
+          conf, split);
+      innerKey = sequenceFileRecordReader.createKey();
+      innerValue = sequenceFileRecordReader.createValue();
+    }
+
+    public Text createKey() {
+      return new Text();
+    }
+
+    public Text createValue() {
+      return new Text();
+    }
+
+    public synchronized boolean next(Text key, Text value) throws IOException {
+
+      // convert the content object to text
+      if (!sequenceFileRecordReader.next(innerKey, innerValue)) {
+        return false;
+      }
+      key.set(innerKey.toString());
+
+      // decode the raw content as UTF-8 (explicit, instead of the
+      // platform-default charset) and replace newlines with spaces so
+      // each record stays on a single line for streaming consumers
+      String contentAsStr = new String(innerValue.getContent(),
+          StandardCharsets.UTF_8);
+      value.set(contentAsStr.replace('\n', ' '));
+
+      return true;
+    }
+
+    public float getProgress() throws IOException {
+      return sequenceFileRecordReader.getProgress();
+    }
+
+    public synchronized long getPos() throws IOException {
+      return sequenceFileRecordReader.getPos();
+    }
+
+    public synchronized void close() throws IOException {
+      sequenceFileRecordReader.close();
+    }
+  }
+
+  public ContentAsTextInputFormat() {
+    super();
+  }
+
+  public RecordReader<Text, Text> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+
+    reporter.setStatus(split.toString());
+    return new ContentAsTextRecordReader(job, (FileSplit) split);
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
new file mode 100644
index 0000000..ec601f4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks whether a segment is valid, or has a certain status (generated,
+ * fetched, parsed), or can be used safely for a certain processing step
+ * (e.g., indexing).
+ */
+public class SegmentChecker {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(SegmentChecker.class);
+
+  /**
+   * Checks if the segment is indexable. New check methods can be added here.
+   */
+  public static boolean isIndexable(Path segmentPath, FileSystem fs)
+      throws IOException {
+    if (segmentPath == null || fs == null) {
+      LOG.info("No segment path or filesystem set.");
+      return false;
+    }
+
+    boolean checkResult = true;
+    checkResult &= checkSegmentDir(segmentPath, fs);
+    // Add new check methods here
+
+    return checkResult;
+  }
+
+  /**
+   * Check the segment to see if it is valid based on the sub directories.
+   */
+  public static boolean checkSegmentDir(Path segmentPath, FileSystem fs)
+      throws IOException {
+
+    // segment directories are named by generation timestamp
+    // (yyyyMMddHHmmss), so the name is always 14 characters long
+    if (segmentPath.getName().length() != 14) {
+      LOG.warn("The input path at {} is not a segment... skipping",
+          segmentPath.getName());
+      return false;
+    }
+
+    FileStatus[] fstats = fs.listStatus(segmentPath,
+        HadoopFSUtil.getPassDirectoriesFilter(fs));
+    Path[] segmentFiles = HadoopFSUtil.getPaths(fstats);
+
+    boolean crawlFetchExists = false;
+    boolean crawlParseExists = false;
+    boolean parseDataExists = false;
+    boolean parseTextExists = false;
+
+    for (Path path : segmentFiles) {
+      String pathName = path.getName();
+      crawlFetchExists |= pathName.equals(CrawlDatum.FETCH_DIR_NAME);
+      crawlParseExists |= pathName.equals(CrawlDatum.PARSE_DIR_NAME);
+      parseDataExists |= pathName.equals(ParseData.DIR_NAME);
+      parseTextExists |= pathName.equals(ParseText.DIR_NAME);
+    }
+
+    if (parseTextExists && crawlParseExists && crawlFetchExists
+        && parseDataExists) {
+
+      // no segment dir missing
+      LOG.info("Segment dir is complete: " + segmentPath.toString() + ".");
+
+      return true;
+    } else {
+
+      // log the missing dirs
+      StringBuilder missingDir = new StringBuilder();
+      if (!parseDataExists) {
+        missingDir.append(ParseData.DIR_NAME + ", ");
+      }
+      if (!parseTextExists) {
+        missingDir.append(ParseText.DIR_NAME + ", ");
+      }
+      if (!crawlParseExists) {
+        missingDir.append(CrawlDatum.PARSE_DIR_NAME + ", ");
+      }
+      if (!crawlFetchExists) {
+        missingDir.append(CrawlDatum.FETCH_DIR_NAME + ", ");
+      }
+
+      String missingDirString = missingDir.toString();
+      LOG.warn("Skipping segment: " + segmentPath.toString()
+          + ". Missing sub directories: "
+          + missingDirString.substring(0, missingDirString.length() - 2));
+
+      return false;
+    }
+
+  }
+
+  /**
+   * Check the segment to see if it has been parsed before.
+   */
+  public static boolean isParsed(Path segment, FileSystem fs)
+      throws IOException {
+
+    return fs.exists(new Path(segment, CrawlDatum.PARSE_DIR_NAME));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
new file mode 100644
index 0000000..6d53809
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Interface used to filter segments during segment merge. It allows filtering
+ * on more sophisticated criteria than just URLs. In particular it allows
+ * filtering based on metadata collected while parsing the page.
+ */
+public interface SegmentMergeFilter {
+  /** The name of the extension point. */
+  public final static String X_POINT_ID = SegmentMergeFilter.class.getName();
+
+  /**
+   * The filtering method which gets all information being merged for a given
+   * key (URL).
+   *
+   * @return <tt>true</tt> if values for this <tt>key</tt> (URL) should be
+   *         merged into the new segment.
+   */
+  public boolean filter(Text key, CrawlDatum generateData,
+      CrawlDatum fetchData, CrawlDatum sigData, Content content,
+      ParseData parseData, ParseText parseText, Collection<CrawlDatum> linked);
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
new file mode 100644
index 0000000..7aa2de3
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * This class wraps all {@link SegmentMergeFilter} extensions in a single
+ * object so it is easier to operate on them. If any of the extensions returns
+ * <tt>false</tt>, this one will return <tt>false</tt> as well.
+ */
+public class SegmentMergeFilters {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SegmentMergeFilters.class);
+  private SegmentMergeFilter[] filters;
+
+  public SegmentMergeFilters(Configuration conf) {
+    try {
+      ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+          SegmentMergeFilter.X_POINT_ID);
+      if (point == null)
+        throw new RuntimeException(SegmentMergeFilter.X_POINT_ID
+            + " not found.");
+      Extension[] extensions = point.getExtensions();
+      filters = new SegmentMergeFilter[extensions.length];
+      for (int i = 0; i < extensions.length; i++) {
+        filters[i] = (SegmentMergeFilter) extensions[i].getExtensionInstance();
+      }
+    } catch (PluginRuntimeException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Iterates over all {@link SegmentMergeFilter} extensions and if any of them
+   * returns false, it will return false as well.
+   *
+   * @return <tt>true</tt> if values for this <tt>key</tt> (URL) should be
+   *         merged into the new segment.
+   */
+  public boolean filter(Text key, CrawlDatum generateData,
+      CrawlDatum fetchData, CrawlDatum sigData, Content content,
+      ParseData parseData, ParseText parseText, Collection<CrawlDatum> linked) {
+    for (SegmentMergeFilter filter : filters) {
+      if (!filter.filter(key, generateData, fetchData, sigData, content,
+          parseData, parseText, linked)) {
+        if (LOG.isTraceEnabled())
+          LOG.trace("Key " + key + " dropped by " + filter.getClass().getName());
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled())
+      LOG.trace("Key " + key + " accepted for merge.");
+    return true;
+  }
+}
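
For anyone implementing the SegmentMergeFilter extension point above, a minimal sketch of a conforming filter follows. The class name and the "noindex" metadata key are hypothetical illustrations, not part of this patch, and a real implementation must also be registered in its plugin's plugin.xml descriptor so that the PluginRepository can discover it.

    package org.apache.nutch.segment;

    import java.util.Collection;

    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.parse.ParseData;
    import org.apache.nutch.parse.ParseText;
    import org.apache.nutch.protocol.Content;

    /**
     * Hypothetical merge filter that drops any entry whose parse metadata
     * carries a "noindex" flag. Illustrative only.
     */
    public class NoindexSegmentMergeFilter implements SegmentMergeFilter {

      public boolean filter(Text key, CrawlDatum generateData,
          CrawlDatum fetchData, CrawlDatum sigData, Content content,
          ParseData parseData, ParseText parseText,
          Collection<CrawlDatum> linked) {
        // drop the entry if its parse metadata explicitly marks it noindex
        if (parseData != null
            && "true".equals(parseData.getParseMeta().get("noindex"))) {
          return false; // excluded from the merged segment
        }
        return true; // merged into the new segment
      }
    }

Because SegmentMergeFilters ANDs the results of all registered filters, returning false from any single filter is enough to drop the record from the merged segment.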

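The jobs above are normally driven from the command line, but they can also be chained programmatically. The sketch below is a hypothetical driver that uses SegmentChecker.isParsed() to skip a segment that was never parsed (the WebGraph jobs consume parsed segment data) before invoking the WebGraph tool; the paths, the segment name, and the bin/nutch invocation mentioned in the comment are placeholders, not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.scoring.webgraph.WebGraph;
    import org.apache.nutch.segment.SegmentChecker;
    import org.apache.nutch.util.NutchConfiguration;

    /**
     * Hypothetical driver, roughly equivalent to:
     * bin/nutch webgraph -webgraphdb crawl/webgraphdb \
     *     -segment crawl/segments/20170101000000 -normalize -filter
     */
    public class WebGraphDriver {

      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        Path segment = new Path("crawl/segments/20170101000000"); // placeholder
        FileSystem fs = segment.getFileSystem(conf);

        // the WebGraph jobs read parsed segment data, so skip anything
        // SegmentChecker reports as not yet parsed
        if (!SegmentChecker.isParsed(segment, fs)) {
          System.err.println("Segment not parsed yet: " + segment);
          System.exit(1);
        }

        int res = ToolRunner.run(conf, new WebGraph(), new String[] {
            "-webgraphdb", "crawl/webgraphdb",
            "-segment", segment.toString(), "-normalize", "-filter" });
        System.exit(res);
      }
    }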