http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java b/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java new file mode 100644 index 0000000..9e75531 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.protocol; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; + +import org.apache.hadoop.io.VersionMismatchException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * @author Andrzej Bialecki + */ +public class ProtocolStatus implements Writable { + + private final static byte VERSION = 2; + + /** Content was retrieved without errors. */ + public static final int SUCCESS = 1; + /** Content was not retrieved. Any further errors may be indicated in args. */ + public static final int FAILED = 2; + + /** This protocol was not found. Application may attempt to retry later. */ + public static final int PROTO_NOT_FOUND = 10; + /** Resource is gone. */ + public static final int GONE = 11; + /** Resource has moved permanently. New url should be found in args. */ + public static final int MOVED = 12; + /** Resource has moved temporarily. New url should be found in args. */ + public static final int TEMP_MOVED = 13; + /** Resource was not found. */ + public static final int NOTFOUND = 14; + /** Temporary failure. Application may retry immediately. */ + public static final int RETRY = 15; + /** + * Unspecified exception occurred. Further information may be provided in args. + */ + public static final int EXCEPTION = 16; + /** Access denied - authorization required, but missing/incorrect. */ + public static final int ACCESS_DENIED = 17; + /** Access denied by robots.txt rules. */ + public static final int ROBOTS_DENIED = 18; + /** Too many redirects. */ + public static final int REDIR_EXCEEDED = 19; + /** Not fetching. */ + public static final int NOTFETCHING = 20; + /** Unchanged since the last fetch. */ + public static final int NOTMODIFIED = 21; + /** + * Request was refused by protocol plugins, because it would block. The + * expected number of milliseconds to wait before retry may be provided in + * args. + */ + public static final int WOULDBLOCK = 22; + /** Thread was blocked http.max.delays times during fetching. */ + public static final int BLOCKED = 23; + + // Useful static instances for status codes that don't usually require any + // additional arguments.
+ public static final ProtocolStatus STATUS_SUCCESS = new ProtocolStatus( + SUCCESS); + public static final ProtocolStatus STATUS_FAILED = new ProtocolStatus(FAILED); + public static final ProtocolStatus STATUS_GONE = new ProtocolStatus(GONE); + public static final ProtocolStatus STATUS_NOTFOUND = new ProtocolStatus( + NOTFOUND); + public static final ProtocolStatus STATUS_RETRY = new ProtocolStatus(RETRY); + public static final ProtocolStatus STATUS_ROBOTS_DENIED = new ProtocolStatus( + ROBOTS_DENIED); + public static final ProtocolStatus STATUS_REDIR_EXCEEDED = new ProtocolStatus( + REDIR_EXCEEDED); + public static final ProtocolStatus STATUS_NOTFETCHING = new ProtocolStatus( + NOTFETCHING); + public static final ProtocolStatus STATUS_NOTMODIFIED = new ProtocolStatus( + NOTMODIFIED); + public static final ProtocolStatus STATUS_WOULDBLOCK = new ProtocolStatus( + WOULDBLOCK); + public static final ProtocolStatus STATUS_BLOCKED = new ProtocolStatus( + BLOCKED); + + private int code; + private long lastModified; + private String[] args; + + private static final HashMap<Integer, String> codeToName = new HashMap<Integer, String>(); + static { + codeToName.put(new Integer(SUCCESS), "success"); + codeToName.put(new Integer(FAILED), "failed"); + codeToName.put(new Integer(PROTO_NOT_FOUND), "proto_not_found"); + codeToName.put(new Integer(GONE), "gone"); + codeToName.put(new Integer(MOVED), "moved"); + codeToName.put(new Integer(TEMP_MOVED), "temp_moved"); + codeToName.put(new Integer(NOTFOUND), "notfound"); + codeToName.put(new Integer(RETRY), "retry"); + codeToName.put(new Integer(EXCEPTION), "exception"); + codeToName.put(new Integer(ACCESS_DENIED), "access_denied"); + codeToName.put(new Integer(ROBOTS_DENIED), "robots_denied"); + codeToName.put(new Integer(REDIR_EXCEEDED), "redir_exceeded"); + codeToName.put(new Integer(NOTFETCHING), "notfetching"); + codeToName.put(new Integer(NOTMODIFIED), "notmodified"); + codeToName.put(new Integer(WOULDBLOCK), "wouldblock"); + codeToName.put(new Integer(BLOCKED), "blocked"); + } + + public ProtocolStatus() { + + } + + public ProtocolStatus(int code, String[] args) { + this.code = code; + this.args = args; + } + + public ProtocolStatus(int code, String[] args, long lastModified) { + this.code = code; + this.args = args; + this.lastModified = lastModified; + } + + public ProtocolStatus(int code) { + this(code, null); + } + + public ProtocolStatus(int code, long lastModified) { + this(code, null, lastModified); + } + + public ProtocolStatus(int code, Object message) { + this(code, message, 0L); + } + + public ProtocolStatus(int code, Object message, long lastModified) { + this.code = code; + this.lastModified = lastModified; + if (message != null) + this.args = new String[] { String.valueOf(message) }; + } + + public ProtocolStatus(Throwable t) { + this(EXCEPTION, t); + } + + public static ProtocolStatus read(DataInput in) throws IOException { + ProtocolStatus res = new ProtocolStatus(); + res.readFields(in); + return res; + } + + public void readFields(DataInput in) throws IOException { + byte version = in.readByte(); + switch (version) { + case 1: + code = in.readByte(); + lastModified = in.readLong(); + args = WritableUtils.readCompressedStringArray(in); + break; + case VERSION: + code = in.readByte(); + lastModified = in.readLong(); + args = WritableUtils.readStringArray(in); + break; + default: + throw new VersionMismatchException(VERSION, version); + } + } + + public void write(DataOutput out) throws IOException { + out.writeByte(VERSION); + 
out.writeByte((byte) code); + out.writeLong(lastModified); + if (args == null) { + out.writeInt(-1); + } else { + WritableUtils.writeStringArray(out, args); + } + } + + public void setArgs(String[] args) { + this.args = args; + } + + public String[] getArgs() { + return args; + } + + public int getCode() { + return code; + } + + public String getName() { + return codeToName.get(this.code); + } + + public void setCode(int code) { + this.code = code; + } + + public boolean isSuccess() { + return code == SUCCESS; + } + + public boolean isTransientFailure() { + return code == ACCESS_DENIED || code == EXCEPTION || code == REDIR_EXCEEDED + || code == RETRY || code == TEMP_MOVED || code == WOULDBLOCK + || code == PROTO_NOT_FOUND; + } + + public boolean isPermanentFailure() { + return code == FAILED || code == GONE || code == MOVED || code == NOTFOUND + || code == ROBOTS_DENIED; + } + + public boolean isRedirect() { + return code == MOVED || code == TEMP_MOVED; + } + + public String getMessage() { + if (args != null && args.length > 0) + return args[0]; + return null; + } + + public void setMessage(String msg) { + if (args != null && args.length > 0) + args[0] = msg; + else + args = new String[] { msg }; + } + + public long getLastModified() { + return lastModified; + } + + public void setLastModified(long lastModified) { + this.lastModified = lastModified; + } + + public boolean equals(Object o) { + if (o == null) + return false; + if (!(o instanceof ProtocolStatus)) + return false; + ProtocolStatus other = (ProtocolStatus) o; + if (this.code != other.code || this.lastModified != other.lastModified) + return false; + if (this.args == null) { + if (other.args == null) + return true; + else + return false; + } else { + if (other.args == null) + return false; + if (other.args.length != this.args.length) + return false; + for (int i = 0; i < this.args.length; i++) { + if (!this.args[i].equals(other.args[i])) + return false; + } + } + return true; + } + + public String toString() { + StringBuffer res = new StringBuffer(); + res.append(codeToName.get(new Integer(code)) + "(" + code + + "), lastModified=" + lastModified); + if (args != null) { + if (args.length == 1) { + res.append(": " + String.valueOf(args[0])); + } else { + for (int i = 0; i < args.length; i++) { + if (args[i] != null) + res.append(", args[" + i + "]=" + String.valueOf(args[i])); + } + } + } + return res.toString(); + } +}
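For orientation, here is a minimal round-trip of the Writable contract defined above. This sketch is not part of the commit: the harness class name is hypothetical, and it assumes only the committed ProtocolStatus plus standard java.io streams.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.nutch.protocol.ProtocolStatus;

public class ProtocolStatusRoundTrip {
  public static void main(String[] args) throws Exception {
    // A redirect status carrying the target URL as its single argument.
    ProtocolStatus moved = new ProtocolStatus(ProtocolStatus.MOVED,
        "http://example.com/new-location");

    // write() emits the version byte, the status code, lastModified,
    // and then the args array.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    moved.write(new DataOutputStream(bytes));

    // read() is the static factory that calls readFields() on a fresh instance.
    ProtocolStatus copy = ProtocolStatus.read(
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy.getName());     // moved
    System.out.println(copy.isRedirect());  // true
    System.out.println(copy.getMessage());  // the redirect target URL
  }
}

The sketch deliberately uses a status that carries an argument: when args is null, write() emits a fixed four-byte -1, while WritableUtils.readStringArray() appears to read a variable-length count, so the null-args round trip may not be symmetric.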
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java b/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java new file mode 100644 index 0000000..475aef4 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.protocol; + +// JDK imports +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.LineNumberReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Set; +import java.util.StringTokenizer; + +// Commons Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// Nutch imports +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.SuffixStringMatcher; + +import crawlercommons.robots.BaseRobotRules; +import crawlercommons.robots.SimpleRobotRules; +import crawlercommons.robots.SimpleRobotRules.RobotRulesMode; +import crawlercommons.robots.SimpleRobotRulesParser; + +/** + * This class uses crawler-commons for handling the parsing of + * {@code robots.txt} files. It emits SimpleRobotRules objects, which describe + * the download permissions as described in SimpleRobotRulesParser. + * + * Protocol-specific implementations have to implement the method + * {@link getRobotRulesSet}. + */ +public abstract class RobotRulesParser implements Tool { + + public static final Logger LOG = LoggerFactory + .getLogger(RobotRulesParser.class); + + protected static final Hashtable<String, BaseRobotRules> CACHE = new Hashtable<String, BaseRobotRules>(); + + /** + * A {@link BaseRobotRules} object appropriate for use when the + * {@code robots.txt} file is empty or missing; all requests are allowed. + */ + public static final BaseRobotRules EMPTY_RULES = new SimpleRobotRules( + RobotRulesMode.ALLOW_ALL); + + /** + * A {@link BaseRobotRules} object appropriate for use when the + * {@code robots.txt} file is not fetched due to a {@code 403/Forbidden} + * response; all requests are disallowed. 
+ */ + public static BaseRobotRules FORBID_ALL_RULES = new SimpleRobotRules( + RobotRulesMode.ALLOW_NONE); + + private static SimpleRobotRulesParser robotParser = new SimpleRobotRulesParser(); + protected Configuration conf; + protected String agentNames; + + /** set of host names or IPs to be explicitly excluded from robots.txt checking */ + protected Set<String> whiteList = new HashSet<String>(); + + /* Matcher used for efficiently matching URLs against a set of suffixes. */ + private SuffixStringMatcher matcher = null; + + public RobotRulesParser() { + } + + public RobotRulesParser(Configuration conf) { + setConf(conf); + } + + /** + * Set the {@link Configuration} object + */ + public void setConf(Configuration conf) { + this.conf = conf; + + // Grab the agent names we advertise to robots files. + String agentName = conf.get("http.agent.name"); + if (agentName == null || (agentName = agentName.trim()).isEmpty()) { + throw new RuntimeException("Agent name not configured!"); + } + agentNames = agentName; + + // If there are any other agents specified, append those to the list of + // agents + String otherAgents = conf.get("http.robots.agents"); + if (otherAgents != null && !otherAgents.trim().isEmpty()) { + StringTokenizer tok = new StringTokenizer(otherAgents, ","); + StringBuilder sb = new StringBuilder(agentNames); + while (tok.hasMoreTokens()) { + String str = tok.nextToken().trim(); + if (str.equals("*") || str.equals(agentName)) { + // skip wildcard "*" or agent name itself + // (required for backward compatibility, cf. NUTCH-1715 and + // NUTCH-1718) + } else { + sb.append(",").append(str); + } + } + + agentNames = sb.toString(); + } + + String[] confWhiteList = conf.getStrings("http.robot.rules.whitelist"); + if (confWhiteList == null) { + LOG.info("robots.txt whitelist not configured."); + } + else { + for (int i = 0; i < confWhiteList.length; i++) { + if (confWhiteList[i].isEmpty()) { + LOG.info("Empty whitelisted URL skipped!"); + continue; + } + whiteList.add(confWhiteList[i]); + } + + if (whiteList.size() > 0) { + matcher = new SuffixStringMatcher(whiteList); + LOG.info("Whitelisted hosts: " + whiteList); + } + } + } + + /** + * Get the {@link Configuration} object + */ + public Configuration getConf() { + return conf; + } + + /** + * Check whether a URL belongs to a whitelisted host.
+ */ + public boolean isWhiteListed(URL url) { + boolean match = false; + String urlString = url.getHost(); + + if (matcher != null) { + match = matcher.matches(urlString); + } + + return match; + } + + /** + * Parses the robots content using the {@link SimpleRobotRulesParser} from + * crawler commons + * + * @param url + * A string containing the URL + * @param content + * Contents of the robots file in a byte array + * @param contentType + * The content type of the robots file + * @param robotName + * A string containing all the robots agent names used by parser for + * matching + * @return BaseRobotRules object + */ + public BaseRobotRules parseRules(String url, byte[] content, + String contentType, String robotName) { + return robotParser.parseContent(url, content, contentType, robotName); + } + + public BaseRobotRules getRobotRulesSet(Protocol protocol, Text url) { + URL u = null; + try { + u = new URL(url.toString()); + } catch (Exception e) { + return EMPTY_RULES; + } + return getRobotRulesSet(protocol, u); + } + + /** + * Fetch robots.txt (or its protocol-specific equivalent) which applies to + * the given URL, parse it and return the set of robot rules applicable for + * the configured agent name(s). + * + * @param protocol + * protocol implementation + * @param url + * URL to be checked whether fetching is allowed by robot rules + * @return robot rules + */ + public abstract BaseRobotRules getRobotRulesSet(Protocol protocol, URL url); + + @Override + public int run(String[] args) { + + if (args.length < 2) { + String[] help = { + "Usage: RobotRulesParser <robots-file> <url-file> [<agent-names>]\n", + "\tThe <robots-file> will be parsed as a robots.txt file,", + "\tusing the given <agent-name> to select rules.", + "\tURLs will be read (one per line) from <url-file>,", + "\tand tested against the rules.", + "\tMultiple agent names can be provided using", + "\tcomma as a delimiter without any spaces.", + "\tIf no agent name is given the property http.agent.name", + "\tis used. If http.agent.name is empty, robots.txt is checked", + "\tfor rules assigned to the user agent `*' (meaning any other)." }; + for (String s : help) { + System.err.println(s); + } + System.exit(-1); + } + + File robotsFile = new File(args[0]); + File urlFile = new File(args[1]); + + if (args.length > 2) { + // set agent name from command-line in configuration and update parser + String agents = args[2]; + conf.set("http.agent.name", agents); + setConf(conf); + } + + try { + BaseRobotRules rules = getRobotRulesSet(null, robotsFile.toURI().toURL()); + + LineNumberReader testsIn = new LineNumberReader(new FileReader(urlFile)); + String testPath; + testPath = testsIn.readLine().trim(); + while (testPath != null) { + try { + // testPath can be just a path or a complete URL + URL url = new URL(testPath); + String status; + if (isWhiteListed(url)) { + status = "whitelisted"; + } else if (rules.isAllowed(testPath)) { + status = "allowed"; + } else { + status = "not allowed"; + } + System.out.println(status + ":\t" + testPath); + } catch (MalformedURLException e) { + } + testPath = testsIn.readLine(); + } + testsIn.close(); + } catch (IOException e) { + LOG.error("Failed to run: " + StringUtils.stringifyException(e)); + return -1; + } + + return 0; + } + + /** + * {@link RobotRulesParser} implementation which expects the location of the + * robots.txt passed by URL (usually pointing to a local file) in + * {@link getRobotRulesSet}.
+ */ + private static class TestRobotRulesParser extends RobotRulesParser { + + public TestRobotRulesParser(Configuration conf) { + // make sure that agent name is set so that setConf() does not complain, + // the agent name is later overwritten by command-line argument + if (conf.get("http.agent.name") == null) { + conf.set("http.agent.name", "*"); + } + setConf(conf); + } + + /** + * @param protocol (ignored) + * @param url + * location of the robots.txt file + * */ + public BaseRobotRules getRobotRulesSet(Protocol protocol, URL url) { + BaseRobotRules rules; + try { + int contentLength = url.openConnection().getContentLength(); + byte[] robotsBytes = new byte[contentLength]; + InputStream openStream = url.openStream(); + openStream.read(robotsBytes); + openStream.close(); + rules = robotParser.parseContent(url.toString(), robotsBytes, + "text/plain", this.conf.get("http.agent.name")); + } catch (IOException e) { + LOG.error("Failed to open robots.txt file " + url + + StringUtils.stringifyException(e)); + rules = EMPTY_RULES; + } + return rules; + } + + } + + public static void main(String[] args) throws Exception { + Configuration conf = NutchConfiguration.create(); + int res = ToolRunner.run(conf, new TestRobotRulesParser(conf), args); + System.exit(res); + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java b/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java new file mode 100644 index 0000000..6685249 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes related to the {@link org.apache.nutch.protocol.Protocol Protocol} interface, + * see also {@link org.apache.nutch.net.protocols}. 
+ */ +package org.apache.nutch.protocol; + http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java b/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java new file mode 100644 index 0000000..d74c7fb --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java @@ -0,0 +1,68 @@ +package org.apache.nutch.scoring; + +import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.scoring.ScoringFilter; +import org.apache.nutch.scoring.ScoringFilterException; + +public abstract class AbstractScoringFilter implements ScoringFilter { + + private Configuration conf; + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public void injectedScore(Text url, CrawlDatum datum) + throws ScoringFilterException { + } + + public void initialScore(Text url, CrawlDatum datum) + throws ScoringFilterException { + } + + public float generatorSortValue(Text url, CrawlDatum datum, float initSort) + throws ScoringFilterException { + return initSort; + } + + public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content) + throws ScoringFilterException { + } + + public void passScoreAfterParsing(Text url, Content content, Parse parse) + throws ScoringFilterException { + } + + public CrawlDatum distributeScoreToOutlinks(Text fromUrl, + ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets, + CrawlDatum adjust, int allCount) throws ScoringFilterException { + return adjust; + } + + public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum, + List<CrawlDatum> inlinked) throws ScoringFilterException { + } + + @Override + public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, + CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) + throws ScoringFilterException { + return initScore; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java new file mode 100644 index 0000000..4061a75 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.scoring; + +import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.plugin.Pluggable; +import org.apache.nutch.protocol.Content; + +/** + * A contract defining behavior of scoring plugins. + * + * A scoring filter will manipulate scoring variables in CrawlDatum and in + * resulting search indexes. Filters can be chained in a specific order, to + * provide multi-stage scoring adjustments. + * + * @author Andrzej Bialecki + */ +public interface ScoringFilter extends Configurable, Pluggable { + /** The name of the extension point. */ + public final static String X_POINT_ID = ScoringFilter.class.getName(); + + /** + * Set an initial score for newly injected pages. Note: newly injected pages + * may have no inlinks, so filter implementations may wish to set this score + * to a non-zero value, to give newly injected pages some initial credit. + * + * @param url + * url of the page + * @param datum + * new datum. Filters will modify it in-place. + * @throws ScoringFilterException + */ + public void injectedScore(Text url, CrawlDatum datum) + throws ScoringFilterException; + + /** + * Set an initial score for newly discovered pages. Note: newly discovered + * pages have at least one inlink with its score contribution, so filter + * implementations may choose to set initial score to zero (unknown value), + * and then the inlink score contribution will set the "real" value of the new + * page. + * + * @param url + * url of the page + * @param datum + * new datum. Filters will modify it in-place. + * @throws ScoringFilterException + */ + public void initialScore(Text url, CrawlDatum datum) + throws ScoringFilterException; + + /** + * This method prepares a sort value for the purpose of sorting and selecting + * top N scoring pages during fetchlist generation. + * + * @param url + * url of the page + * @param datum + * page's datum, should not be modified + * @param initSort + * initial sort value, or a value from previous filters in chain + */ + public float generatorSortValue(Text url, CrawlDatum datum, float initSort) + throws ScoringFilterException; + + /** + * This method takes all relevant score information from the current datum + * (coming from a generated fetchlist) and stores it into + * {@link org.apache.nutch.protocol.Content} metadata. This is needed in order + * to pass this value(s) to the mechanism that distributes it to outlinked + * pages. + * + * @param url + * url of the page + * @param datum + * source datum. NOTE: modifications to this value are not persisted. + * @param content + * instance of content. Implementations may modify this in-place, + * primarily by setting some metadata properties. 
+ */ + public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content) + throws ScoringFilterException; + + /** + * Currently a part of score distribution is performed using only data coming + * from the parsing process. We need this method in order to ensure the + * presence of score data in these steps. + * + * @param url + * page url + * @param content + * original content. NOTE: modifications to this value are not + * persisted. + * @param parse + * target instance to copy the score information to. Implementations + * may modify this in-place, primarily by setting some metadata + * properties. + */ + public void passScoreAfterParsing(Text url, Content content, Parse parse) + throws ScoringFilterException; + + /** + * Distribute score value from the current page to all its outlinked pages. + * + * @param fromUrl + * url of the source page + * @param parseData + * ParseData instance, which stores relevant score value(s) in its + * metadata. NOTE: filters may modify this in-place, all changes will + * be persisted. + * @param targets + * <url, CrawlDatum> pairs. NOTE: filters can modify this + * in-place, all changes will be persisted. + * @param adjust + * a CrawlDatum instance, initially null, which implementations may + * use to pass adjustment values to the original CrawlDatum. When + * creating this instance, set its status to + * {@link CrawlDatum#STATUS_LINKED}. + * @param allCount + * number of all collected outlinks from the source page + * @return if needed, implementations may return an instance of CrawlDatum, + * with status {@link CrawlDatum#STATUS_LINKED}, which contains + * adjustments to be applied to the original CrawlDatum score(s) and + * metadata. This can be null if not needed. + * @throws ScoringFilterException + */ + public CrawlDatum distributeScoreToOutlinks(Text fromUrl, + ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets, + CrawlDatum adjust, int allCount) throws ScoringFilterException; + + /** + * This method calculates a new score of CrawlDatum during CrawlDb update, + * based on the initial value of the original CrawlDatum, and also score + * values contributed by inlinked pages. + * + * @param url + * url of the page + * @param old + * original datum, with original score. May be null if this is a + * newly discovered page. If not null, filters should use score + * values from this parameter as the starting values - the + * <code>datum</code> parameter may contain values that are no longer + * valid, if other updates occurred between generation and this + * update. + * @param datum + * the new datum, with the original score saved at the time when + * fetchlist was generated. Filters should update this in-place, and + * it will be saved in the crawldb. + * @param inlinked + * (partial) list of CrawlDatum-s (with their scores) from links + * pointing to this page, found in the current update batch. + * @throws ScoringFilterException + */ + public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum, + List<CrawlDatum> inlinked) throws ScoringFilterException; + + /** + * This method calculates a Lucene document boost. + * + * @param url + * url of the page + * @param doc + * Lucene document. NOTE: this already contains all information + * collected by indexing filters. Implementations may modify this + * instance, in order to store/remove some information. + * @param dbDatum + * current page from CrawlDb. NOTE: changes made to this instance are + * not persisted.
+ * @param fetchDatum + * datum from FetcherOutput (containing among others the fetching + * status) + * @param parse + * parsing result. NOTE: changes made to this instance are not + * persisted. + * @param inlinks + * current inlinks from LinkDb. NOTE: changes made to this instance + * are not persisted. + * @param initScore + * initial boost value for the Lucene document. + * @return boost value for the Lucene document. This value is passed as an + * argument to the next scoring filter in chain. NOTE: implementations + * may also express other scoring strategies by modifying Lucene + * document directly. + * @throws ScoringFilterException + */ + public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, + CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) + throws ScoringFilterException; +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java new file mode 100644 index 0000000..f363c4b --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.scoring; + +/** + * Specialized exception for errors during scoring. + * + * @author Andrzej Bialecki + */ +@SuppressWarnings("serial") +public class ScoringFilterException extends Exception { + + public ScoringFilterException() { + super(); + } + + public ScoringFilterException(String message) { + super(message); + } + + public ScoringFilterException(String message, Throwable cause) { + super(message, cause); + } + + public ScoringFilterException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java new file mode 100644 index 0000000..5bad78f --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.scoring; + +import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.plugin.PluginRepository; +import org.apache.nutch.protocol.Content; + +/** + * Creates and caches {@link ScoringFilter} implementing plugins. + * + * @author Andrzej Bialecki + */ +public class ScoringFilters extends Configured implements ScoringFilter { + + private ScoringFilter[] filters; + + public ScoringFilters(Configuration conf) { + super(conf); + this.filters = (ScoringFilter[]) PluginRepository.get(conf) + .getOrderedPlugins(ScoringFilter.class, ScoringFilter.X_POINT_ID, + "scoring.filter.order"); + } + + /** Calculate a sort value for Generate. */ + public float generatorSortValue(Text url, CrawlDatum datum, float initSort) + throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + initSort = this.filters[i].generatorSortValue(url, datum, initSort); + } + return initSort; + } + + /** Calculate a new initial score, used when adding newly discovered pages. */ + public void initialScore(Text url, CrawlDatum datum) + throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + this.filters[i].initialScore(url, datum); + } + } + + /** Calculate a new initial score, used when injecting new pages. */ + public void injectedScore(Text url, CrawlDatum datum) + throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + this.filters[i].injectedScore(url, datum); + } + } + + /** Calculate updated page score during CrawlDb.update(). 
*/ + public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum, + List<CrawlDatum> inlinked) throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + this.filters[i].updateDbScore(url, old, datum, inlinked); + } + } + + public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content) + throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + this.filters[i].passScoreBeforeParsing(url, datum, content); + } + } + + public void passScoreAfterParsing(Text url, Content content, Parse parse) + throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + this.filters[i].passScoreAfterParsing(url, content, parse); + } + } + + public CrawlDatum distributeScoreToOutlinks(Text fromUrl, + ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets, + CrawlDatum adjust, int allCount) throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + adjust = this.filters[i].distributeScoreToOutlinks(fromUrl, parseData, + targets, adjust, allCount); + } + return adjust; + } + + public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, + CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) + throws ScoringFilterException { + for (int i = 0; i < this.filters.length; i++) { + initScore = this.filters[i].indexerScore(url, doc, dbDatum, fetchDatum, + parse, inlinks, initScore); + } + return initScore; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java b/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java new file mode 100644 index 0000000..b6a578b --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The {@link org.apache.nutch.scoring.ScoringFilter ScoringFilter} interface. + */ +package org.apache.nutch.scoring; + http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java new file mode 100644 index 0000000..67c9366 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.scoring.webgraph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * A class for holding link information including the url, anchor text, a score, + * the timestamp of the link and a link type. + */ +public class LinkDatum implements Writable { + + public final static byte INLINK = 1; + public final static byte OUTLINK = 2; + + private String url = null; + private String anchor = ""; + private float score = 0.0f; + private long timestamp = 0L; + private byte linkType = 0; + + /** + * Default constructor, no url, timestamp, score, or link type. + */ + public LinkDatum() { + + } + + /** + * Creates a LinkDatum with a given url. Timestamp is set to current time. + * + * @param url + * The link url. + */ + public LinkDatum(String url) { + this(url, "", System.currentTimeMillis()); + } + + /** + * Creates a LinkDatum with a url and an anchor text. Timestamp is set to + * current time. + * + * @param url + * The link url. + * @param anchor + * The link anchor text. + */ + public LinkDatum(String url, String anchor) { + this(url, anchor, System.currentTimeMillis()); + } + + public LinkDatum(String url, String anchor, long timestamp) { + this.url = url; + this.anchor = anchor; + this.timestamp = timestamp; + } + + public String getUrl() { + return url; + } + + public String getAnchor() { + return anchor; + } + + public void setAnchor(String anchor) { + this.anchor = anchor; + } + + public float getScore() { + return score; + } + + public void setScore(float score) { + this.score = score; + } + + public void setUrl(String url) { + this.url = url; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public byte getLinkType() { + return linkType; + } + + public void setLinkType(byte linkType) { + this.linkType = linkType; + } + + public void readFields(DataInput in) throws IOException { + url = Text.readString(in); + anchor = Text.readString(in); + score = in.readFloat(); + timestamp = in.readLong(); + linkType = in.readByte(); + } + + public void write(DataOutput out) throws IOException { + Text.writeString(out, url); + Text.writeString(out, anchor != null ? anchor : ""); + out.writeFloat(score); + out.writeLong(timestamp); + out.writeByte(linkType); + } + + public String toString() { + + String type = (linkType == INLINK ? "inlink" + : (linkType == OUTLINK) ? 
"outlink" : "unknown"); + return "url: " + url + ", anchor: " + anchor + ", score: " + score + + ", timestamp: " + timestamp + ", link type: " + type; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java new file mode 100644 index 0000000..1569c4d --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.scoring.webgraph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapFileOutputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.lib.HashPartitioner; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.util.FSUtils; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; + +/** + * The LinkDumper tool creates a database of node to inlink information that can + * be read using the nested Reader class. 
This allows the inlink and scoring + * state of a single url to be reviewed quickly to determine why a given url is + * ranking a certain way. This tool is to be used with the LinkRank analysis. + */ +public class LinkDumper extends Configured implements Tool { + + public static final Logger LOG = LoggerFactory.getLogger(LinkDumper.class); + public static final String DUMP_DIR = "linkdump"; + + /** + * Reader class which will print out the url and all of its inlinks to system + * out. Each inlink will be displayed with its node information including score + * and number of in and outlinks. + */ + public static class Reader { + + public static void main(String[] args) throws Exception { + + if (args == null || args.length < 2) { + System.out.println("LinkDumper$Reader usage: <webgraphdb> <url>"); + return; + } + + // open the readers for the linkdump directory + Configuration conf = NutchConfiguration.create(); + FileSystem fs = FileSystem.get(conf); + Path webGraphDb = new Path(args[0]); + String url = args[1]; + MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path( + webGraphDb, DUMP_DIR), conf); + + // get the link nodes for the url + Text key = new Text(url); + LinkNodes nodes = new LinkNodes(); + MapFileOutputFormat.getEntry(readers, + new HashPartitioner<Text, LinkNodes>(), key, nodes); + + // print out the link nodes + LinkNode[] linkNodesAr = nodes.getLinks(); + System.out.println(url + ":"); + for (LinkNode node : linkNodesAr) { + System.out.println(" " + node.getUrl() + " - " + + node.getNode().toString()); + } + + // close the readers + FSUtils.closeReaders(readers); + } + } + + /** + * Bean class which holds url to node information. + */ + public static class LinkNode implements Writable { + + private String url = null; + private Node node = null; + + public LinkNode() { + + } + + public LinkNode(String url, Node node) { + this.url = url; + this.node = node; + } + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public Node getNode() { + return node; + } + + public void setNode(Node node) { + this.node = node; + } + + public void readFields(DataInput in) throws IOException { + url = in.readUTF(); + node = new Node(); + node.readFields(in); + } + + public void write(DataOutput out) throws IOException { + out.writeUTF(url); + node.write(out); + } + + } + + /** + * Writable class which holds an array of LinkNode objects. + */ + public static class LinkNodes implements Writable { + + private LinkNode[] links; + + public LinkNodes() { + + } + + public LinkNodes(LinkNode[] links) { + this.links = links; + } + + public LinkNode[] getLinks() { + return links; + } + + public void setLinks(LinkNode[] links) { + this.links = links; + } + + public void readFields(DataInput in) throws IOException { + int numLinks = in.readInt(); + if (numLinks > 0) { + links = new LinkNode[numLinks]; + for (int i = 0; i < numLinks; i++) { + LinkNode node = new LinkNode(); + node.readFields(in); + links[i] = node; + } + } + } + + public void write(DataOutput out) throws IOException { + if (links != null && links.length > 0) { + int numLinks = links.length; + out.writeInt(numLinks); + for (int i = 0; i < numLinks; i++) { + links[i].write(out); + } + } + } + } + + /** + * Inverts outlinks from the WebGraph to inlinks and attaches node + * information.
+ */ + public static class Inverter implements + Mapper<Text, Writable, Text, ObjectWritable>, + Reducer<Text, ObjectWritable, Text, LinkNode> { + + private JobConf conf; + + public void configure(JobConf conf) { + this.conf = conf; + } + + /** + * Wraps all values in ObjectWritables. + */ + public void map(Text key, Writable value, + OutputCollector<Text, ObjectWritable> output, Reporter reporter) + throws IOException { + + ObjectWritable objWrite = new ObjectWritable(); + objWrite.set(value); + output.collect(key, objWrite); + } + + /** + * Inverts outlinks to inlinks while attaching node information to the + * outlink. + */ + public void reduce(Text key, Iterator<ObjectWritable> values, + OutputCollector<Text, LinkNode> output, Reporter reporter) + throws IOException { + + String fromUrl = key.toString(); + List<LinkDatum> outlinks = new ArrayList<LinkDatum>(); + Node node = null; + + // loop through all values aggregating outlinks, saving node + while (values.hasNext()) { + ObjectWritable write = values.next(); + Object obj = write.get(); + if (obj instanceof Node) { + node = (Node) obj; + } else if (obj instanceof LinkDatum) { + outlinks.add(WritableUtils.clone((LinkDatum) obj, conf)); + } + } + + // only collect if there are outlinks + int numOutlinks = node.getNumOutlinks(); + if (numOutlinks > 0) { + for (int i = 0; i < outlinks.size(); i++) { + LinkDatum outlink = outlinks.get(i); + String toUrl = outlink.getUrl(); + + // collect the outlink as an inlink with the node + output.collect(new Text(toUrl), new LinkNode(fromUrl, node)); + } + } + } + + public void close() { + } + } + + /** + * Merges LinkNode objects into a single array value per url. This allows all + * values to be quickly retrieved and printed via the Reader tool. + */ + public static class Merger implements + Reducer<Text, LinkNode, Text, LinkNodes> { + + private JobConf conf; + private int maxInlinks = 50000; + + public void configure(JobConf conf) { + this.conf = conf; + } + + /** + * Aggregate all LinkNode objects for a given url. + */ + public void reduce(Text key, Iterator<LinkNode> values, + OutputCollector<Text, LinkNodes> output, Reporter reporter) + throws IOException { + + List<LinkNode> nodeList = new ArrayList<LinkNode>(); + int numNodes = 0; + + while (values.hasNext()) { + LinkNode cur = values.next(); + if (numNodes < maxInlinks) { + nodeList.add(WritableUtils.clone(cur, conf)); + numNodes++; + } else { + break; + } + } + + LinkNode[] linkNodesAr = nodeList.toArray(new LinkNode[nodeList.size()]); + LinkNodes linkNodes = new LinkNodes(linkNodesAr); + output.collect(key, linkNodes); + } + + public void close() { + + } + } + + /** + * Runs the inverter and merger jobs of the LinkDumper tool to create the url + * to inlink node database. 
+ */ + public void dumpLinks(Path webGraphDb) throws IOException { + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("LinkDumper: starting at " + sdf.format(start)); + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + + Path linkdump = new Path(webGraphDb, DUMP_DIR); + Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR); + Path outlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR); + + // run the inverter job + Path tempInverted = new Path(webGraphDb, "inverted-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + JobConf inverter = new NutchJob(conf); + inverter.setJobName("LinkDumper: inverter"); + FileInputFormat.addInputPath(inverter, nodeDb); + FileInputFormat.addInputPath(inverter, outlinkDb); + inverter.setInputFormat(SequenceFileInputFormat.class); + inverter.setMapperClass(Inverter.class); + inverter.setReducerClass(Inverter.class); + inverter.setMapOutputKeyClass(Text.class); + inverter.setMapOutputValueClass(ObjectWritable.class); + inverter.setOutputKeyClass(Text.class); + inverter.setOutputValueClass(LinkNode.class); + FileOutputFormat.setOutputPath(inverter, tempInverted); + inverter.setOutputFormat(SequenceFileOutputFormat.class); + + try { + LOG.info("LinkDumper: running inverter"); + JobClient.runJob(inverter); + LOG.info("LinkDumper: finished inverter"); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + throw e; + } + + // run the merger job + JobConf merger = new NutchJob(conf); + merger.setJobName("LinkDumper: merger"); + FileInputFormat.addInputPath(merger, tempInverted); + merger.setInputFormat(SequenceFileInputFormat.class); + merger.setReducerClass(Merger.class); + merger.setMapOutputKeyClass(Text.class); + merger.setMapOutputValueClass(LinkNode.class); + merger.setOutputKeyClass(Text.class); + merger.setOutputValueClass(LinkNodes.class); + FileOutputFormat.setOutputPath(merger, linkdump); + merger.setOutputFormat(MapFileOutputFormat.class); + + try { + LOG.info("LinkDumper: running merger"); + JobClient.runJob(merger); + LOG.info("LinkDumper: finished merger"); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + throw e; + } + + fs.delete(tempInverted, true); + long end = System.currentTimeMillis(); + LOG.info("LinkDumper: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new LinkDumper(), + args); + System.exit(res); + } + + /** + * Runs the LinkDumper tool. This simply creates the database; to read the + * values, the nested Reader tool must be used.
+ */ + public int run(String[] args) throws Exception { + + Options options = new Options(); + OptionBuilder.withArgName("help"); + OptionBuilder.withDescription("show this help message"); + Option helpOpts = OptionBuilder.create("help"); + options.addOption(helpOpts); + + OptionBuilder.withArgName("webgraphdb"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("the web graph database to use"); + Option webGraphDbOpts = OptionBuilder.create("webgraphdb"); + options.addOption(webGraphDbOpts); + + CommandLineParser parser = new GnuParser(); + try { + + CommandLine line = parser.parse(options, args); + if (line.hasOption("help") || !line.hasOption("webgraphdb")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("LinkDumper", options); + return -1; + } + + String webGraphDb = line.getOptionValue("webgraphdb"); + dumpLinks(new Path(webGraphDb)); + return 0; + } catch (Exception e) { + LOG.error("LinkDumper: " + StringUtils.stringifyException(e)); + return -2; + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java new file mode 100644 index 0000000..bd22828 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java @@ -0,0 +1,677 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+public class LinkRank extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(LinkRank.class);
+  private static final String NUM_NODES = "_num_nodes_";
+
+  /**
+   * Runs the counter job. The counter job determines the number of nodes in
+   * the webgraph. This count is used during analysis to compute the rank one
+   * score.
+   *
+   * @param fs
+   *          The job file system.
+   * @param webGraphDb
+   *          The web graph database to use.
+   *
+   * @return The number of nodes in the web graph.
+   * @throws IOException
+   *           If an error occurs while running the counter job.
+   */
+  private int runCounter(FileSystem fs, Path webGraphDb) throws IOException {
+
+    // configure the counter job
+    Path numLinksPath = new Path(webGraphDb, NUM_NODES);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    JobConf counter = new NutchJob(getConf());
+    counter.setJobName("LinkRank Counter");
+    FileInputFormat.addInputPath(counter, nodeDb);
+    FileOutputFormat.setOutputPath(counter, numLinksPath);
+    counter.setInputFormat(SequenceFileInputFormat.class);
+    counter.setMapperClass(Counter.class);
+    counter.setCombinerClass(Counter.class);
+    counter.setReducerClass(Counter.class);
+    counter.setMapOutputKeyClass(Text.class);
+    counter.setMapOutputValueClass(LongWritable.class);
+    counter.setOutputKeyClass(Text.class);
+    counter.setOutputValueClass(LongWritable.class);
+    counter.setNumReduceTasks(1);
+    counter.setOutputFormat(TextOutputFormat.class);
+    counter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the counter job, outputs to a single reduce task and file
+    LOG.info("Starting link counter job");
+    try {
+      JobClient.runJob(counter);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished link counter job");
+
+    // read the first (and only) line from the file, which should be the
+    // number of nodes in the web graph
+    LOG.info("Reading numlinks temp file");
+    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
+    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+    String numLinksLine = buffer.readLine();
+    readLinks.close();
+
+    // check if there are nodes to process; if none, the webgraph might be empty
+    if (numLinksLine == null || numLinksLine.length() == 0) {
+      fs.delete(numLinksPath, true);
+      throw new IOException("No links to process, is the webgraph empty?");
+    }
+
+    // delete the temp file, then convert and return the number of nodes as an int
+    LOG.info("Deleting numlinks temp file");
+    fs.delete(numLinksPath, true);
+    String numLinks = numLinksLine.split("\\s+")[1];
+    return Integer.parseInt(numLinks);
+  }
+
+  /**
+   * Runs the initializer job. The initializer job sets up the nodes with a
+   * default starting score for link analysis.
+   *
+   * @param nodeDb
+   *          The node database to use.
+   * @param output
+   *          The job output directory.
+   *
+   * @throws IOException
+   *           If an error occurs while running the initializer job.
+   */
+  private void runInitializer(Path nodeDb, Path output) throws IOException {
+
+    // configure the initializer
+    JobConf initializer = new NutchJob(getConf());
+    initializer.setJobName("LinkAnalysis Initializer");
+    FileInputFormat.addInputPath(initializer, nodeDb);
+    FileOutputFormat.setOutputPath(initializer, output);
+    initializer.setInputFormat(SequenceFileInputFormat.class);
+    initializer.setMapperClass(Initializer.class);
+    initializer.setMapOutputKeyClass(Text.class);
+    initializer.setMapOutputValueClass(Node.class);
+    initializer.setOutputKeyClass(Text.class);
+    initializer.setOutputValueClass(Node.class);
+    initializer.setOutputFormat(MapFileOutputFormat.class);
+    initializer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the initializer
+    LOG.info("Starting initialization job");
+    try {
+      JobClient.runJob(initializer);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished initialization job.");
+  }
+
+  /**
+   * Runs the inverter job.
+   * The inverter job flips outlinks to inlinks to be
+   * passed into the analysis job.
+   *
+   * @param nodeDb
+   *          The node database to use.
+   * @param outlinkDb
+   *          The outlink database to use.
+   * @param output
+   *          The output directory.
+   *
+   * @throws IOException
+   *           If an error occurs while running the inverter job.
+   */
+  private void runInverter(Path nodeDb, Path outlinkDb, Path output)
+      throws IOException {
+
+    // configure the inverter
+    JobConf inverter = new NutchJob(getConf());
+    inverter.setJobName("LinkAnalysis Inverter");
+    FileInputFormat.addInputPath(inverter, nodeDb);
+    FileInputFormat.addInputPath(inverter, outlinkDb);
+    FileOutputFormat.setOutputPath(inverter, output);
+    inverter.setInputFormat(SequenceFileInputFormat.class);
+    inverter.setMapperClass(Inverter.class);
+    inverter.setReducerClass(Inverter.class);
+    inverter.setMapOutputKeyClass(Text.class);
+    inverter.setMapOutputValueClass(ObjectWritable.class);
+    inverter.setOutputKeyClass(Text.class);
+    inverter.setOutputValueClass(LinkDatum.class);
+    inverter.setOutputFormat(SequenceFileOutputFormat.class);
+    inverter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the inverter job
+    LOG.info("Starting inverter job");
+    try {
+      JobClient.runJob(inverter);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished inverter job.");
+  }
+
+  /**
+   * Runs the link analysis job. The link analysis job applies the link rank
+   * formula to create a score per url and stores that score in the NodeDb.
+   *
+   * Typically the link analysis job is run a number of times to allow the link
+   * rank scores to converge.
+   *
+   * @param nodeDb
+   *          The node database from which we are getting previous link rank
+   *          scores.
+   * @param inverted
+   *          The inverted inlinks.
+   * @param output
+   *          The link analysis output.
+   * @param iteration
+   *          The current iteration number.
+   * @param numIterations
+   *          The total number of link analysis iterations.
+   * @param rankOne
+   *          The baseline score, 1 / (number of nodes), credited to pages with
+   *          no inlinks.
+   *
+   * @throws IOException
+   *           If an error occurs during link analysis.
+   */
+  private void runAnalysis(Path nodeDb, Path inverted, Path output,
+      int iteration, int numIterations, float rankOne) throws IOException {
+
+    JobConf analyzer = new NutchJob(getConf());
+    analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
+    analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+        + " of " + numIterations);
+    FileInputFormat.addInputPath(analyzer, nodeDb);
+    FileInputFormat.addInputPath(analyzer, inverted);
+    FileOutputFormat.setOutputPath(analyzer, output);
+    analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
+    analyzer.setMapOutputKeyClass(Text.class);
+    analyzer.setMapOutputValueClass(ObjectWritable.class);
+    analyzer.setInputFormat(SequenceFileInputFormat.class);
+    analyzer.setMapperClass(Analyzer.class);
+    analyzer.setReducerClass(Analyzer.class);
+    analyzer.setOutputKeyClass(Text.class);
+    analyzer.setOutputValueClass(Node.class);
+    analyzer.setOutputFormat(MapFileOutputFormat.class);
+    analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    LOG.info("Starting analysis job");
+    try {
+      JobClient.runJob(analyzer);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished analysis job.");
+  }
+
+  /**
+   * The Counter job that determines the total number of nodes in the WebGraph.
+   * This is used to determine a rank one score for pages with zero inlinks but
+   * that contain outlinks.
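+   * The rank one score is 1 / (total number of nodes), giving every page a
+   * small baseline amount of score in each iteration.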
+   */
+  private static class Counter implements
+      Mapper<Text, Node, Text, LongWritable>,
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    private static Text numNodes = new Text(NUM_NODES);
+    private static LongWritable one = new LongWritable(1L);
+
+    public void configure(JobConf conf) {
+    }
+
+    /**
+     * Outputs one for every node.
+     */
+    public void map(Text key, Node value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      output.collect(numNodes, one);
+    }
+
+    /**
+     * Sums the per-node counts and outputs a single total value.
+     */
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+
+      long total = 0;
+      while (values.hasNext()) {
+        total += values.next().get();
+      }
+      output.collect(numNodes, new LongWritable(total));
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Assigns each node a default starting score before the first iteration.
+   */
+  private static class Initializer implements Mapper<Text, Node, Text, Node> {
+
+    private JobConf conf;
+    private float initialScore = 1.0f;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      initialScore = conf.getFloat("link.analyze.initial.score", 1.0f);
+    }
+
+    public void map(Text key, Node node, OutputCollector<Text, Node> output,
+        Reporter reporter) throws IOException {
+
+      String url = key.toString();
+      Node outNode = WritableUtils.clone(node, conf);
+      outNode.setInlinkScore(initialScore);
+
+      output.collect(new Text(url), outNode);
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Inverts outlinks and attaches current score from the NodeDb of the
+   * WebGraph. The link analysis process consists of inverting, analyzing and
+   * scoring, in a loop for a given number of iterations.
+   */
+  private static class Inverter implements
+      Mapper<Text, Writable, Text, ObjectWritable>,
+      Reducer<Text, ObjectWritable, Text, LinkDatum> {
+
+    private JobConf conf;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Converts values to ObjectWritable.
+     */
+    public void map(Text key, Writable value,
+        OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+        throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Inverts outlinks to inlinks and attaches the current score for the
+     * outlink from the NodeDb of the WebGraph.
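+     * Each emitted LinkDatum is keyed by the target url and carries the
+     * source url along with the source node's current outlink score.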
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+        OutputCollector<Text, LinkDatum> output, Reporter reporter)
+        throws IOException {
+
+      String fromUrl = key.toString();
+      List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
+      Node node = null;
+
+      // aggregate outlinks, assign other values
+      while (values.hasNext()) {
+        ObjectWritable write = values.next();
+        Object obj = write.get();
+        if (obj instanceof Node) {
+          node = (Node) obj;
+        } else if (obj instanceof LinkDatum) {
+          outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
+        }
+      }
+
+      // skip urls with no node record, which would otherwise cause a
+      // NullPointerException below
+      if (node == null) {
+        return;
+      }
+
+      // get the number of outlinks and the current inlink and outlink scores
+      // from the node of the url
+      int numOutlinks = node.getNumOutlinks();
+      float inlinkScore = node.getInlinkScore();
+      float outlinkScore = node.getOutlinkScore();
+      LOG.debug(fromUrl + ": num outlinks " + numOutlinks);
+
+      // can't invert if no outlinks
+      if (numOutlinks > 0) {
+        for (int i = 0; i < outlinks.size(); i++) {
+          LinkDatum outlink = outlinks.get(i);
+          String toUrl = outlink.getUrl();
+
+          outlink.setUrl(fromUrl);
+          outlink.setScore(outlinkScore);
+
+          // collect the inverted outlink
+          output.collect(new Text(toUrl), outlink);
+          LOG.debug(toUrl + ": inverting inlink from " + fromUrl
+              + " origscore: " + inlinkScore + " numOutlinks: " + numOutlinks
+              + " inlinkscore: " + outlinkScore);
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Runs a single link analysis iteration.
+   */
+  private static class Analyzer implements
+      Mapper<Text, Writable, Text, ObjectWritable>,
+      Reducer<Text, ObjectWritable, Text, Node> {
+
+    private JobConf conf;
+    private float dampingFactor = 0.85f;
+    private float rankOne = 0.0f;
+    private int itNum = 0;
+    private boolean limitPages = true;
+    private boolean limitDomains = true;
+
+    /**
+     * Configures the job, sets the damping factor, rank one score, and other
+     * needed values for analysis.
+     */
+    public void configure(JobConf conf) {
+
+      try {
+        this.conf = conf;
+        this.dampingFactor = conf
+            .getFloat("link.analyze.damping.factor", 0.85f);
+        this.rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
+        this.itNum = conf.getInt("link.analyze.iteration", 0);
+        limitPages = conf.getBoolean("link.ignore.limit.page", true);
+        limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+      } catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new IllegalArgumentException(e);
+      }
+    }
+
+    /**
+     * Converts values to ObjectWritable.
+     */
+    public void map(Text key, Writable value,
+        OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+        throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(WritableUtils.clone(value, conf));
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Performs a single iteration of link analysis. The resulting scores are
+     * stored in a temporary NodeDb which replaces the NodeDb of the WebGraph.
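+     * The new score for a url is (1 - dampingFactor) + dampingFactor *
+     * totalInlinkScore, where totalInlinkScore starts at the rank one score
+     * so that pages with no inlinks still receive a baseline score.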
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+        OutputCollector<Text, Node> output, Reporter reporter)
+        throws IOException {
+
+      String url = key.toString();
+      Set<String> domains = new HashSet<String>();
+      Set<String> pages = new HashSet<String>();
+      Node node = null;
+
+      // a page with zero inlinks has a score of rankOne
+      int numInlinks = 0;
+      float totalInlinkScore = rankOne;
+
+      while (values.hasNext()) {
+
+        ObjectWritable next = values.next();
+        Object value = next.get();
+        if (value instanceof Node) {
+          node = (Node) value;
+        } else if (value instanceof LinkDatum) {
+
+          LinkDatum linkDatum = (LinkDatum) value;
+          float scoreFromInlink = linkDatum.getScore();
+          String inlinkUrl = linkDatum.getUrl();
+          String inLinkDomain = URLUtil.getDomainName(inlinkUrl);
+          String inLinkPage = URLUtil.getPage(inlinkUrl);
+
+          // limit counting duplicate inlinks by pages or domains
+          if ((limitPages && pages.contains(inLinkPage))
+              || (limitDomains && domains.contains(inLinkDomain))) {
+            LOG.debug(url + ": ignoring " + scoreFromInlink + " from "
+                + inlinkUrl + ", duplicate page or domain");
+            continue;
+          }
+
+          // aggregate total inlink score
+          numInlinks++;
+          totalInlinkScore += scoreFromInlink;
+          domains.add(inLinkDomain);
+          pages.add(inLinkPage);
+          LOG.debug(url + ": adding " + scoreFromInlink + " from " + inlinkUrl
+              + ", total: " + totalInlinkScore);
+        }
+      }
+
+      // calculate linkRank score formula
+      float linkRankScore = (1 - this.dampingFactor)
+          + (this.dampingFactor * totalInlinkScore);
+
+      LOG.debug(url + ": score: " + linkRankScore + " num inlinks: "
+          + numInlinks + " iteration: " + itNum);
+
+      // store the score in a temporary NodeDb; skip urls with no node record
+      // to avoid a NullPointerException
+      if (node == null) {
+        return;
+      }
+      Node outNode = WritableUtils.clone(node, conf);
+      outNode.setInlinkScore(linkRankScore);
+      output.collect(key, outNode);
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  /**
+   * Default constructor.
+   */
+  public LinkRank() {
+    super();
+  }
+
+  /**
+   * Configurable constructor.
+   */
+  public LinkRank(Configuration conf) {
+    super(conf);
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Runs the complete link analysis job. The complete job determines the rank
+   * one score, then runs a given number of invert and analyze iterations (ten
+   * by default), and finally replaces the NodeDb in the WebGraph with the
+   * link rank output.
+   *
+   * @param webGraphDb
+   *          The WebGraph to run link analysis on.
+   *
+   * @throws IOException
+   *           If an error occurs during link analysis.
+   */
+  public void analyze(Path webGraphDb) throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("Analysis: starting at " + sdf.format(start));
+
+    // store the link rank under the webgraphdb temporarily, final scores get
+    // updated into the nodedb
+    Path linkRank = new Path(webGraphDb, "linkrank");
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // create the linkrank directory if needed
+    if (!fs.exists(linkRank)) {
+      fs.mkdirs(linkRank);
+    }
+
+    // the webgraph outlink and node database paths
+    Path wgOutlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+    Path wgNodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path nodeDb = new Path(linkRank, WebGraph.NODE_DIR);
+
+    // get the total number of nodes in the webgraph, used for rank one, then
+    // initialize all urls with a default score
+    int numNodes = runCounter(fs, webGraphDb);
+    runInitializer(wgNodeDb, nodeDb);
+    float rankOneScore = (1f / (float) numNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Analysis: Number of nodes: " + numNodes);
+      LOG.info("Analysis: Rank One: " + rankOneScore);
+    }
+
+    // run invert and analysis for a given number of iterations to allow the
+    // link rank scores to converge
+    int numIterations = conf.getInt("link.analyze.num.iterations", 10);
+    for (int i = 0; i < numIterations; i++) {
+
+      // the input to inverting is always the previous output from analysis
+      LOG.info("Analysis: Starting iteration " + (i + 1) + " of "
+          + numIterations);
+      Path tempRank = new Path(linkRank + "-"
+          + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+      fs.mkdirs(tempRank);
+      Path tempInverted = new Path(tempRank, "inverted");
+      Path tempNodeDb = new Path(tempRank, WebGraph.NODE_DIR);
+
+      // run invert and analysis
+      runInverter(nodeDb, wgOutlinkDb, tempInverted);
+      runAnalysis(nodeDb, tempInverted, tempNodeDb, i, numIterations,
+          rankOneScore);
+
+      // replace the current linkrank output with the results of this iteration
+      LOG.info("Analysis: Installing new link scores");
+      FSUtils.replace(fs, linkRank, tempRank, true);
+      LOG.info("Analysis: finished iteration " + (i + 1) + " of "
+          + numIterations);
+    }
+
+    // replace the NodeDb in the WebGraph with the final output of analysis
+    LOG.info("Analysis: Installing web graph nodes");
+    FSUtils.replace(fs, wgNodeDb, nodeDb, true);
+
+    // remove the temporary link rank folder
+    fs.delete(linkRank, true);
+    long end = System.currentTimeMillis();
+    LOG.info("Analysis: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkRank(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the LinkRank tool.
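+   * Usage: LinkRank -webgraphdb (path to the web graph db to use)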
+ */ + public int run(String[] args) throws Exception { + + Options options = new Options(); + OptionBuilder.withArgName("help"); + OptionBuilder.withDescription("show this help message"); + Option helpOpts = OptionBuilder.create("help"); + options.addOption(helpOpts); + + OptionBuilder.withArgName("webgraphdb"); + OptionBuilder.hasArg(); + OptionBuilder.withDescription("the web graph db to use"); + Option webgraphOpts = OptionBuilder.create("webgraphdb"); + options.addOption(webgraphOpts); + + CommandLineParser parser = new GnuParser(); + try { + + CommandLine line = parser.parse(options, args); + if (line.hasOption("help") || !line.hasOption("webgraphdb")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("LinkRank", options); + return -1; + } + + String webGraphDb = line.getOptionValue("webgraphdb"); + + analyze(new Path(webGraphDb)); + return 0; + } catch (Exception e) { + LOG.error("LinkAnalysis: " + StringUtils.stringifyException(e)); + return -2; + } + } +}
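For reference, the update implemented by Analyzer.reduce above is a PageRank-style iteration. The standalone sketch below is not part of the patch: the class name LinkRankSketch and the three-page graph are invented for illustration, it assumes a page's score is split evenly across its outlinks when inverted (the outlinkScore read from the NodeDb is computed outside this file), and it omits the duplicate page/domain filtering done in Analyzer.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy replay of the LinkRank update: score = (1 - d) + d * totalInlinkScore. */
public class LinkRankSketch {
  public static void main(String[] args) {
    // adjacency list: page -> outlinks (hypothetical three-page web graph)
    Map<String, List<String>> out = new HashMap<String, List<String>>();
    out.put("a", Arrays.asList("b", "c"));
    out.put("b", Arrays.asList("c"));
    out.put("c", Arrays.asList("a"));

    float d = 0.85f;                  // link.analyze.damping.factor default
    float rankOne = 1f / out.size();  // 1 / number of nodes, as in runCounter
    Map<String, Float> score = new HashMap<String, Float>();
    for (String p : out.keySet()) {
      score.put(p, 1.0f);             // link.analyze.initial.score default
    }

    // ten iterations, the link.analyze.num.iterations default
    for (int it = 0; it < 10; it++) {
      Map<String, Float> inlinkScore = new HashMap<String, Float>();
      for (String p : out.keySet()) {
        inlinkScore.put(p, rankOne);  // zero-inlink pages keep the rank one base
      }
      for (Map.Entry<String, List<String>> e : out.entrySet()) {
        // assumed: a page's score is split evenly over its outlinks, standing
        // in for the outlinkScore the Inverter reads from the NodeDb
        float share = score.get(e.getKey()) / e.getValue().size();
        for (String to : e.getValue()) {
          inlinkScore.put(to, inlinkScore.get(to) + share);
        }
      }
      for (String p : out.keySet()) {
        score.put(p, (1 - d) + d * inlinkScore.get(p));
      }
    }
    System.out.println(score); // "c" ends highest: it has two distinct inlinks
  }
}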
