http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java new file mode 100644 index 0000000..3ba3c81 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.hostdb; + +import java.text.SimpleDateFormat; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.KeyValueTextInputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.lib.MultipleInputs; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.util.FSUtils; +import org.apache.nutch.util.LockUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tool to create a HostDB from the CrawlDB. It aggregates fetch status values + * by host and checks DNS entries for hosts.
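+ * Hosts that repeatedly fail DNS resolution can optionally be purged from + * the HostDB via the hostdb.purge.failed.hosts.threshold setting.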
+ */ +public class UpdateHostDb extends Configured implements Tool { + + public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDb.class); + public static final String LOCK_NAME = ".locked"; + + public static final String HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD = "hostdb.purge.failed.hosts.threshold"; + public static final String HOSTDB_NUM_RESOLVER_THREADS = "hostdb.num.resolvers.threads"; + public static final String HOSTDB_RECHECK_INTERVAL = "hostdb.recheck.interval"; + public static final String HOSTDB_CHECK_FAILED = "hostdb.check.failed"; + public static final String HOSTDB_CHECK_NEW = "hostdb.check.new"; + public static final String HOSTDB_CHECK_KNOWN = "hostdb.check.known"; + public static final String HOSTDB_FORCE_CHECK = "hostdb.force.check"; + public static final String HOSTDB_URL_FILTERING = "hostdb.url.filter"; + public static final String HOSTDB_URL_NORMALIZING = "hostdb.url.normalize"; + public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields"; + public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields"; + public static final String HOSTDB_PERCENTILES = "hostdb.percentiles"; + + private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts, + boolean checkFailed, boolean checkNew, boolean checkKnown, + boolean force, boolean filter, boolean normalize) throws Exception { + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("UpdateHostDb: starting at " + sdf.format(start)); + + JobConf job = new NutchJob(getConf()); + boolean preserveBackup = job.getBoolean("db.preserve.backup", true); + job.setJarByClass(UpdateHostDb.class); + job.setJobName("UpdateHostDb"); + + // Check whether the urlfilter-domainblacklist plugin is loaded + if (filter && "urlfilter-domainblacklist".matches(job.get("plugin.includes"))) { + throw new Exception("urlfilter-domainblacklist must not be enabled"); + } + + // Check whether the urlnormalizer-host plugin is loaded + if (normalize && "urlnormalizer-host".matches(job.get("plugin.includes"))) { + throw new Exception("urlnormalizer-host must not be enabled"); + } + + FileSystem fs = FileSystem.get(job); + Path old = new Path(hostDb, "old"); + Path current = new Path(hostDb, "current"); + Path tempHostDb = new Path(hostDb, "hostdb-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + // lock an existing hostdb to prevent multiple simultaneous updates + Path lock = new Path(hostDb, LOCK_NAME); + if (!fs.exists(current)) { + fs.mkdirs(current); + } + LockUtil.createLockFile(fs, lock, false); + + MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class); + + if (topHosts != null) { + MultipleInputs.addInputPath(job, topHosts, KeyValueTextInputFormat.class); + } + if (crawlDb != null) { + // Tell the job we read from CrawlDB + job.setBoolean("hostdb.reading.crawldb", true); + MultipleInputs.addInputPath(job, new Path(crawlDb, + CrawlDb.CURRENT_NAME), SequenceFileInputFormat.class); + } + + FileOutputFormat.setOutputPath(job, tempHostDb); + + job.setOutputFormat(SequenceFileOutputFormat.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(NutchWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(HostDatum.class); + job.setMapperClass(UpdateHostDbMapper.class); + job.setReducerClass(UpdateHostDbReducer.class); + + job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + job.setSpeculativeExecution(false); +
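// Pass the check, filter and normalize flags to the mapper and reducer via the job configuration +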
job.setBoolean(HOSTDB_CHECK_FAILED, checkFailed); + job.setBoolean(HOSTDB_CHECK_NEW, checkNew); + job.setBoolean(HOSTDB_CHECK_KNOWN, checkKnown); + job.setBoolean(HOSTDB_FORCE_CHECK, force); + job.setBoolean(HOSTDB_URL_FILTERING, filter); + job.setBoolean(HOSTDB_URL_NORMALIZING, normalize); + job.setClassLoader(Thread.currentThread().getContextClassLoader()); + + try { + JobClient.runJob(job); + + FSUtils.replace(fs, old, current, true); + FSUtils.replace(fs, current, tempHostDb, true); + + if (!preserveBackup && fs.exists(old)) fs.delete(old, true); + } catch (Exception e) { + if (fs.exists(tempHostDb)) { + fs.delete(tempHostDb, true); + } + LockUtil.removeLockFile(fs, lock); + throw e; + } + + LockUtil.removeLockFile(fs, lock); + long end = System.currentTimeMillis(); + LOG.info("UpdateHostDb: finished at " + sdf.format(end) + + ", elapsed: " + TimingUtil.elapsedTime(start, end)); + } + + public static void main(String args[]) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: UpdateHostDb -hostdb <hostdb> " + + "[-tophosts <tophosts>] [-crawldb <crawldb>] [-checkAll] [-checkFailed]" + + " [-checkNew] [-checkKnown] [-force] [-filter] [-normalize]"); + return -1; + } + + Path hostDb = null; + Path crawlDb = null; + Path topHosts = null; + + boolean checkFailed = false; + boolean checkNew = false; + boolean checkKnown = false; + boolean force = false; + + boolean filter = false; + boolean normalize = false; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-hostdb")) { + hostDb = new Path(args[i + 1]); + LOG.info("UpdateHostDb: hostdb: " + hostDb); + i++; + } + if (args[i].equals("-crawldb")) { + crawlDb = new Path(args[i + 1]); + LOG.info("UpdateHostDb: crawldb: " + crawlDb); + i++; + } + if (args[i].equals("-tophosts")) { + topHosts = new Path(args[i + 1]); + LOG.info("UpdateHostDb: tophosts: " + topHosts); + i++; + } + + if (args[i].equals("-checkFailed")) { + LOG.info("UpdateHostDb: checking failed hosts"); + checkFailed = true; + } + if (args[i].equals("-checkNew")) { + LOG.info("UpdateHostDb: checking new hosts"); + checkNew = true; + } + if (args[i].equals("-checkKnown")) { + LOG.info("UpdateHostDb: checking known hosts"); + checkKnown = true; + } + if (args[i].equals("-checkAll")) { + LOG.info("UpdateHostDb: checking all hosts"); + checkFailed = true; + checkNew = true; + checkKnown = true; + } + if (args[i].equals("-force")) { + LOG.info("UpdateHostDb: forced check"); + force = true; + } + if (args[i].equals("-filter")) { + LOG.info("UpdateHostDb: filtering enabled"); + filter = true; + } + if (args[i].equals("-normalize")) { + LOG.info("UpdateHostDb: normalizing enabled"); + normalize = true; + } + } + + if (hostDb == null) { + System.err.println("hostDb is mandatory"); + return -1; + } + + try { + updateHostDb(hostDb, crawlDb, topHosts, checkFailed, checkNew, + checkKnown, force, filter, normalize); + + return 0; + } catch (Exception e) { + LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e)); + return -1; + } + } +} \ No newline at end of file
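For illustration only (not part of this commit): like its main() method above, the tool can be driven programmatically through Hadoop's ToolRunner; the crawl directory paths in this sketch are hypothetical.

    // Update (or create) crawl/hostdb from crawl/crawldb, checking all hosts
    int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(),
        new String[] { "-hostdb", "crawl/hostdb", "-crawldb", "crawl/crawldb", "-checkAll" });
    System.exit(res);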
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java new file mode 100644 index 0000000..5844b04 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.hostdb; + +import java.io.IOException; + + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; + +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.protocol.ProtocolStatus; +import org.apache.nutch.util.URLUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mapper ingesting HostDB and CrawlDB entries. Additionally it can also read + * host score info from a plain text key/value file generated by the + * Webgraph's NodeDumper tool. 
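+ * Host names are filtered and/or normalized when hostdb.url.filter or + * hostdb.url.normalize is enabled.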
+ */ +public class UpdateHostDbMapper + implements Mapper<Text, Writable, Text, NutchWritable> { + + public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbMapper.class); + protected Text host = new Text(); + protected HostDatum hostDatum = null; + protected CrawlDatum crawlDatum = null; + protected String reprUrl = null; + protected String buffer = null; + protected String[] args = null; + protected boolean filter = false; + protected boolean normalize = false; + protected boolean readingCrawlDb = false; + protected URLFilters filters = null; + protected URLNormalizers normalizers = null; + + public void close() {} + + /** + * @param job the job configuration + */ + public void configure(JobConf job) { + readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false); + filter = job.getBoolean(UpdateHostDb.HOSTDB_URL_FILTERING, false); + normalize = job.getBoolean(UpdateHostDb.HOSTDB_URL_NORMALIZING, false); + + if (filter) + filters = new URLFilters(job); + if (normalize) + normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_DEFAULT); + } + + /** + * Filters and/or normalizes the input URL + * + * @param url the host name to filter and/or normalize + * @return the resulting host name, or null if it was filtered out + */ + protected String filterNormalize(String url) { + // We actually receive a hostname here so let's make a URL + // TODO: we force shop.fcgroningen to be https, how do we know that here? + // http://issues.openindex.io/browse/SPIDER-40 + url = "http://" + url + "/"; + + try { + if (normalize) + url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); + if (filter) + url = filters.filter(url); + if (url == null) + return null; + } catch (Exception e) { + return null; + } + + // Turn back to host + return URLUtil.getHost(url); + } + + /** + * Mapper ingesting records from the HostDB, CrawlDB and plaintext host + * scores file. Statistics and scores are passed on. + * + * @param key the record key: a URL or host name + * @param value a CrawlDatum, HostDatum or plain text score record + * @param output the output collector + * @param reporter the job reporter + */ + public void map(Text key, Writable value, + OutputCollector<Text,NutchWritable> output, Reporter reporter) + throws IOException { + + // Get the key! + String keyStr = key.toString(); + + // Check if we process records from the CrawlDB + if (key instanceof Text && value instanceof CrawlDatum) { + // Get the normalized and filtered host of this URL + buffer = filterNormalize(URLUtil.getHost(keyStr)); + + // Filtered out? + if (buffer == null) { + reporter.incrCounter("UpdateHostDb", "filtered_records", 1); + LOG.info("UpdateHostDb: " + URLUtil.getHost(keyStr) + " crawldatum has been filtered"); + return; + } + + // Set the host of this URL + host.set(buffer); + crawlDatum = (CrawlDatum)value; + hostDatum = new HostDatum(); + + /** + * TODO: fix multi redirects: host_a => host_b/page => host_c/page/whatever + * http://www.ferienwohnung-armbruster.de/ + * http://www.ferienwohnung-armbruster.de/website/ + * http://www.ferienwohnung-armbruster.de/website/willkommen.php + * + * We cannot re-resolve redirects for host objects as CrawlDatum metadata is + * not available. We also cannot reliably use the reducer in all cases + * since redirects may be across hosts or even domains. The example + * above has redirects that will end up in the same reducer. During that + * phase, however, we do not know which URL redirects to the next URL.
+ */ + // Do not resolve homepages when the root URL is unfetched + if (crawlDatum.getStatus() != CrawlDatum.STATUS_DB_UNFETCHED) { + // Get the protocol + String protocol = URLUtil.getProtocol(keyStr); + + // Get the proposed homepage URL + String homepage = protocol + "://" + buffer + "/"; + + // Check if the current key equals the proposed homepage + if (keyStr.equals(homepage)) { + // Check if this is a redirect to the real home page + if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM || + crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) { + + // Obtain the repr url for this redirect via protocolstatus from the metadata + ProtocolStatus z = (ProtocolStatus)crawlDatum.getMetaData(). + get(Nutch.WRITABLE_PROTO_STATUS_KEY); + + // Get the protocol status' arguments + args = z.getArgs(); + + // ..and the possible redirect URL, filtered and normalized + reprUrl = filterNormalize(args[0]); + + // Did the redirect URL survive filtering and normalization? + if (reprUrl != null) { + LOG.info("UpdateHostDb: homepage: " + keyStr + " redirects to: " + args[0]); + hostDatum.setHomepageUrl(reprUrl); + output.collect(host, new NutchWritable(hostDatum)); + } else { + LOG.info("UpdateHostDb: homepage: " + keyStr + + " redirects to: " + args[0] + " but has been filtered out"); + } + } else { + hostDatum.setHomepageUrl(homepage); + output.collect(host, new NutchWritable(hostDatum)); + LOG.info("UpdateHostDb: homepage: " + homepage); + } + } + } + + // Always emit crawl datum + output.collect(host, new NutchWritable(crawlDatum)); + } + + // Check if we got a record from the hostdb + if (key instanceof Text && value instanceof HostDatum) { + buffer = filterNormalize(keyStr); + + // Filtered out? + if (buffer == null) { + reporter.incrCounter("UpdateHostDb", "filtered_records", 1); + LOG.info("UpdateHostDb: " + key.toString() + " hostdatum has been filtered"); + return; + } + + // Get a HostDatum + hostDatum = (HostDatum)value; + key.set(buffer); + + // If we're also reading CrawlDb entries, reset db_* statistics because + // we're aggregating them from CrawlDB anyway + if (readingCrawlDb) { + hostDatum.resetStatistics(); + } + + output.collect(key, new NutchWritable(hostDatum)); + } + + // Check if we got a record with host scores + if (key instanceof Text && value instanceof Text) { + buffer = filterNormalize(keyStr); + + // Filtered out? + if (buffer == null) { + reporter.incrCounter("UpdateHostDb", "filtered_records", 1); + LOG.info("UpdateHostDb: " + key.toString() + " score has been filtered"); + return; + } + + key.set(buffer); + + output.collect(key, + new NutchWritable(new FloatWritable(Float.parseFloat(value.toString())))); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java new file mode 100644 index 0000000..33dd18b --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.hostdb; + +import java.io.IOException; +import java.util.Date; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; + +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; + +import com.tdunning.math.stats.TDigest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reducer aggregating the CrawlDatum, HostDatum and score records for each + * host into a single HostDatum, optionally checking hosts with a pool of + * DNS resolver threads. + */ +public class UpdateHostDbReducer + implements Reducer<Text, NutchWritable, Text, HostDatum> { + + public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbReducer.class); + protected ResolverThread resolverThread = null; + protected Integer numResolverThreads = 10; + protected static Integer purgeFailedHostsThreshold = -1; + protected static Integer recheckInterval = 86400000; + protected static boolean checkFailed = false; + protected static boolean checkNew = false; + protected static boolean checkKnown = false; + protected static boolean force = false; + protected static long now = new Date().getTime(); + protected static String[] numericFields; + protected static String[] stringFields; + protected static int[] percentiles; + protected static Text[] numericFieldWritables; + protected static Text[] stringFieldWritables; + + protected BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>(); + protected ThreadPoolExecutor executor = null; + + /** + * Configures the thread pool and prestarts all resolver threads.
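+ * Also reads the numeric and string metadata fields and the percentiles to + * aggregate per host.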
+ * + * @param job the job configuration + */ + public void configure(JobConf job) { + purgeFailedHostsThreshold = job.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1); + numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10); + recheckInterval = job.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400) * 1000; + checkFailed = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false); + checkNew = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false); + checkKnown = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_KNOWN, false); + force = job.getBoolean(UpdateHostDb.HOSTDB_FORCE_CHECK, false); + numericFields = job.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS); + stringFields = job.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS); + percentiles = job.getInts(UpdateHostDb.HOSTDB_PERCENTILES); + + // What fields do we need to collect metadata from + if (numericFields != null) { + numericFieldWritables = new Text[numericFields.length]; + for (int i = 0; i < numericFields.length; i++) { + numericFieldWritables[i] = new Text(numericFields[i]); + } + } + + if (stringFields != null) { + stringFieldWritables = new Text[stringFields.length]; + for (int i = 0; i < stringFields.length; i++) { + stringFieldWritables[i] = new Text(stringFields[i]); + } + } + + // Initialize the thread pool with our queue + executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads, + 5, TimeUnit.SECONDS, queue); + + // Run all threads in the pool + executor.prestartAllCoreThreads(); + } + + /** + * Aggregates all records for a host and emits the merged HostDatum, or + * hands it to a resolver thread when the host is to be checked. + */ + public void reduce(Text key, Iterator<NutchWritable> values, + OutputCollector<Text,HostDatum> output, Reporter reporter) throws IOException { + + Map<String,Map<String,Integer>> stringCounts = new HashMap<String,Map<String, Integer>>(); + Map<String,Float> maximums = new HashMap<String,Float>(); + Map<String,Float> sums = new HashMap<String,Float>(); // used to calc averages + Map<String,Integer> counts = new HashMap<String,Integer>(); // used to calc averages + Map<String,Float> minimums = new HashMap<String,Float>(); + Map<String,TDigest> tdigests = new HashMap<String,TDigest>(); + + HostDatum hostDatum = new HostDatum(); + float score = 0; + + if (stringFields != null) { + for (int i = 0; i < stringFields.length; i++) { + stringCounts.put(stringFields[i], new HashMap<String,Integer>()); + } + } + + // Loop through all values until we find a non-empty HostDatum or use + // an empty one if this is a new host for the host db + while (values.hasNext()) { + Writable value = values.next().get(); + + // Count crawl datum statuses and collect metadata from fields + if (value instanceof CrawlDatum) { + CrawlDatum buffer = (CrawlDatum)value; + + // Set the correct status field + switch (buffer.getStatus()) { + case CrawlDatum.STATUS_DB_UNFETCHED: + hostDatum.setUnfetched(hostDatum.getUnfetched() + 1); + break; + + case CrawlDatum.STATUS_DB_FETCHED: + hostDatum.setFetched(hostDatum.getFetched() + 1); + break; + + case CrawlDatum.STATUS_DB_GONE: + hostDatum.setGone(hostDatum.getGone() + 1); + break; + + case CrawlDatum.STATUS_DB_REDIR_TEMP: + hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1); + break; + + case CrawlDatum.STATUS_DB_REDIR_PERM: + hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1); + break; + + case CrawlDatum.STATUS_DB_NOTMODIFIED: + hostDatum.setNotModified(hostDatum.getNotModified() + 1); + break; + } + + // Record connection failures + if (buffer.getRetriesSinceFetch() != 0) { + hostDatum.incConnectionFailures(); + } + + // Only gather metadata statistics for proper fetched pages + if
(buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { + // Deal with the string fields + if (stringFields != null) { + for (int i = 0; i < stringFields.length; i++) { + // Does this field exist? + if (buffer.getMetaData().get(stringFieldWritables[i]) != null) { + // Get it! + String metadataValue = null; + try { + metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString(); + } catch (Exception e) { + LOG.error("Failed to read metadata field " + stringFields[i]); + } + + // Does the value exist? + if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) { + // Yes, increment it + stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1); + } else { + // Create it! + stringCounts.get(stringFields[i]).put(metadataValue, 1); + } + } + } + } + + // Deal with the numeric fields + if (numericFields != null) { + for (int i = 0; i < numericFields.length; i++) { + // Does this field exist? + if (buffer.getMetaData().get(numericFieldWritables[i]) != null) { + try { + // Get it! + Float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString()); + + // Does the t-digest for this field exist? + if (tdigests.containsKey(numericFields[i])) { + tdigests.get(numericFields[i]).add(metadataValue); + } else { + // Create it! + TDigest tdigest = TDigest.createDigest(100); + tdigest.add((double)metadataValue); + tdigests.put(numericFields[i], tdigest); + } + + // Does the minimum value exist? + if (minimums.containsKey(numericFields[i])) { + // Write if this is lower than existing value + if (metadataValue < minimums.get(numericFields[i])) { + minimums.put(numericFields[i], metadataValue); + } + } else { + // Create it! + minimums.put(numericFields[i], metadataValue); + } + + // Does the maximum value exist? + if (maximums.containsKey(numericFields[i])) { + // Write if this is higher than existing value + if (metadataValue > maximums.get(numericFields[i])) { + maximums.put(numericFields[i], metadataValue); + } + } else { + // Create it! + maximums.put(numericFields[i], metadataValue); + } + + // Sum it up! + if (sums.containsKey(numericFields[i])) { + // Increment + sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue); + counts.put(numericFields[i], counts.get(numericFields[i]) + 1); + } else { + // Create it!
+ sums.put(numericFields[i], metadataValue); + counts.put(numericFields[i], 1); + } + } catch (Exception e) { + LOG.error(e.getMessage() + " when processing values for " + key.toString()); + } + } + } + } + } + } + + // Check if we got a record from the hostdb + if (value instanceof HostDatum) { + HostDatum buffer = (HostDatum)value; + + // Check homepage URL + if (buffer.hasHomepageUrl()) { + hostDatum.setHomepageUrl(buffer.getHomepageUrl()); + } + + // Check lastCheck timestamp + if (!buffer.isEmpty()) { + hostDatum.setLastCheck(buffer.getLastCheck()); + } + + // Check and set DNS failures + if (buffer.getDnsFailures() > 0) { + hostDatum.setDnsFailures(buffer.getDnsFailures()); + } + + // Check and set connection failures + if (buffer.getConnectionFailures() > 0) { + hostDatum.setConnectionFailures(buffer.getConnectionFailures()); + } + + // Check metadata + if (!buffer.getMetaData().isEmpty()) { + hostDatum.setMetaData(buffer.getMetaData()); + } + + // Check and set score (score from Web Graph has precedence) + if (buffer.getScore() > 0) { + hostDatum.setScore(buffer.getScore()); + } + } + + // Check for the score + if (value instanceof FloatWritable) { + FloatWritable buffer = (FloatWritable)value; + score = buffer.get(); + } + } + + // Check if score was set from Web Graph + if (score > 0) { + hostDatum.setScore(score); + } + + // Set metadata + for (Map.Entry<String, Map<String,Integer>> entry : stringCounts.entrySet()) { + for (Map.Entry<String,Integer> subEntry : entry.getValue().entrySet()) { + hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue())); + } + } + for (Map.Entry<String, Float> entry : maximums.entrySet()) { + hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue())); + } + for (Map.Entry<String, Float> entry : sums.entrySet()) { + hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey()))); + } + for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) { + // Emit all percentiles + for (int i = 0; i < percentiles.length; i++) { + hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile((double)percentiles[i] / 100))); + } + } + for (Map.Entry<String, Float> entry : minimums.entrySet()) { + hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue())); + } + + reporter.incrCounter("UpdateHostDb", "total_hosts", 1); + + // See if this record is to be checked + if (shouldCheck(hostDatum)) { + // Make an entry + resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter, purgeFailedHostsThreshold); + + // Add the entry to the queue (blocking) + try { + queue.put(resolverThread); + } catch (InterruptedException e) { + LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e)); + } + + // Do not progress, the datum will be written in the resolver thread + return; + } else { + reporter.incrCounter("UpdateHostDb", "skipped_not_eligible", 1); + LOG.info("UpdateHostDb: " + key.toString() + ": skipped_not_eligible"); + } + + // Write the host datum if it wasn't written by the resolver thread + output.collect(key, hostDatum); + } + + /** + * Determines whether a record should be checked.
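+ * New, known and failed hosts are checked according to the + * hostdb.check.new, hostdb.check.known and hostdb.check.failed settings.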
+ * + * @param datum the accumulated host datum + * @return true if the host should be checked + */ + protected boolean shouldCheck(HostDatum datum) { + // Whether a new record is to be checked + if (checkNew && datum.isEmpty()) { + return true; + } + + // Whether existing known hosts should be rechecked + if (checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0) { + return isEligibleForCheck(datum); + } + + // Whether failed records are forced to be rechecked + if (checkFailed && datum.getDnsFailures() > 0) { + return isEligibleForCheck(datum); + } + + // It seems this record is not to be checked + return false; + } + + /** + * Determines whether a record is eligible for recheck. + * + * @param datum the accumulated host datum + * @return true if the host is due for a recheck + */ + protected boolean isEligibleForCheck(HostDatum datum) { + // An existing host is eligible when a check is forced or when the recheck + // interval, scaled by the number of DNS failures, has elapsed + if (force || datum.getLastCheck().getTime() + + recheckInterval * (datum.getDnsFailures() + 1) <= now) { + return true; + } + + return false; + } + + /** + * Shut down all running threads and wait for completion. + */ + public void close() { + LOG.info("UpdateHostDb: feeder finished, waiting for shutdown"); + + // If we're here all keys have been fed and we can issue a shut down + executor.shutdown(); + + boolean finished = false; + + // Wait until all resolvers have finished + while (!finished) { + try { + // Wait for the executor to shut down completely + if (!executor.isTerminated()) { + LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize())); + Thread.sleep(1000); + } else { + // All is well, get out + finished = true; + } + } catch (InterruptedException e) { + // Keep waiting for the executor to terminate + LOG.warn(StringUtils.stringifyException(e)); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java b/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java new file mode 100644 index 0000000..c16003a --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nutch.indexer; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The class scans CrawlDB looking for entries with status DB_GONE (404) or + * DB_DUPLICATE and sends delete requests to indexers for those documents. + */ + +public class CleaningJob implements Tool { + public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class); + private Configuration conf; + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + public static class DBFilter implements + Mapper<Text, CrawlDatum, ByteWritable, Text> { + private ByteWritable OUT = new ByteWritable(CrawlDatum.STATUS_DB_GONE); + + @Override + public void configure(JobConf arg0) { + } + + @Override + public void close() throws IOException { + } + + @Override + public void map(Text key, CrawlDatum value, + OutputCollector<ByteWritable, Text> output, Reporter reporter) + throws IOException { + + if (value.getStatus() == CrawlDatum.STATUS_DB_GONE + || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) { + output.collect(OUT, key); + } + } + } + + public static class DeleterReducer implements + Reducer<ByteWritable, Text, Text, ByteWritable> { + private static final int NUM_MAX_DELETE_REQUEST = 1000; + private int numDeletes = 0; + private int totalDeleted = 0; + + private boolean noCommit = false; + + IndexWriters writers = null; + + @Override + public void configure(JobConf job) { + writers = new IndexWriters(job); + try { + writers.open(job, "Deletion"); + } catch (IOException e) { + throw new RuntimeException(e); + } + noCommit = job.getBoolean("noCommit", false); + } + + @Override + public void close() throws IOException { + // BUFFERING OF CALLS TO INDEXER SHOULD BE HANDLED AT INDEXER LEVEL + // if (numDeletes > 0) { + // LOG.info("CleaningJob: deleting " + numDeletes + " documents"); + // // TODO updateRequest.process(solr); + // totalDeleted += numDeletes; + // } + + writers.close(); + + if (totalDeleted > 0 && !noCommit) { + writers.commit(); + } + + LOG.info("CleaningJob: deleted a total of " + totalDeleted + " documents"); + } + + @Override + public void reduce(ByteWritable key, Iterator<Text> values, + OutputCollector<Text, ByteWritable> output, Reporter reporter) + throws IOException { + while (values.hasNext()) { + Text document = values.next(); + writers.delete(document.toString()); + totalDeleted++; + reporter.incrCounter("CleaningJobStatus", "Deleted documents", 1); + // if 
(numDeletes >= NUM_MAX_DELETE_REQUEST) { + // LOG.info("CleaningJob: deleting " + numDeletes + // + " documents"); + // // TODO updateRequest.process(solr); + // // TODO updateRequest = new UpdateRequest(); + // writers.delete(key.toString()); + // totalDeleted += numDeletes; + // numDeletes = 0; + // } + } + } + } + + public void delete(String crawldb, boolean noCommit) throws IOException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("CleaningJob: starting at " + sdf.format(start)); + + JobConf job = new NutchJob(getConf()); + + FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME)); + job.setBoolean("noCommit", noCommit); + job.setInputFormat(SequenceFileInputFormat.class); + job.setOutputFormat(NullOutputFormat.class); + job.setMapOutputKeyClass(ByteWritable.class); + job.setMapOutputValueClass(Text.class); + job.setMapperClass(DBFilter.class); + job.setReducerClass(DeleterReducer.class); + + job.setJobName("CleaningJob"); + + // need to explicitly allow deletions + job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true); + + JobClient.runJob(job); + + long end = System.currentTimeMillis(); + LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + public int run(String[] args) throws IOException { + if (args.length < 1) { + String usage = "Usage: CleaningJob <crawldb> [-noCommit]"; + LOG.error("Missing crawldb. " + usage); + System.err.println(usage); + IndexWriters writers = new IndexWriters(getConf()); + System.err.println(writers.describe()); + return 1; + } + + boolean noCommit = false; + if (args.length == 2 && args[1].equals("-noCommit")) { + noCommit = true; + } + + try { + delete(args[0], noCommit); + } catch (final Exception e) { + LOG.error("CleaningJob: " + StringUtils.stringifyException(e)); + System.err.println("ERROR CleaningJob: " + + StringUtils.stringifyException(e)); + return -1; + } + return 0; + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(), + args); + System.exit(result); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java new file mode 100644 index 0000000..fbbf2e8 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nutch.indexer; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.plugin.Pluggable; + +public interface IndexWriter extends Pluggable, Configurable { + /** The name of the extension point. */ + final static String X_POINT_ID = IndexWriter.class.getName(); + + public void open(JobConf job, String name) throws IOException; + + public void write(NutchDocument doc) throws IOException; + + public void delete(String key) throws IOException; + + public void update(NutchDocument doc) throws IOException; + + public void commit() throws IOException; + + public void close() throws IOException; + + /** + * Returns a String describing the IndexWriter instance and the specific + * parameters it can take + */ + public String describe(); +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java new file mode 100644 index 0000000..681812b --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.indexer; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.plugin.Extension; +import org.apache.nutch.plugin.ExtensionPoint; +import org.apache.nutch.plugin.PluginRepository; +import org.apache.nutch.plugin.PluginRuntimeException; +import org.apache.nutch.util.ObjectCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Creates and caches {@link IndexWriter} implementing plugins. 
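Each open, write, update, delete, commit and close call is fanned out to all active writers.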
*/ +public class IndexWriters { + + public final static Logger LOG = LoggerFactory.getLogger(IndexWriters.class); + + private IndexWriter[] indexWriters; + + public IndexWriters(Configuration conf) { + ObjectCache objectCache = ObjectCache.get(conf); + synchronized (objectCache) { + this.indexWriters = (IndexWriter[]) objectCache + .getObject(IndexWriter.class.getName()); + if (this.indexWriters == null) { + try { + ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint( + IndexWriter.X_POINT_ID); + if (point == null) + throw new RuntimeException(IndexWriter.X_POINT_ID + " not found."); + Extension[] extensions = point.getExtensions(); + HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>(); + for (int i = 0; i < extensions.length; i++) { + Extension extension = extensions[i]; + IndexWriter writer = (IndexWriter) extension.getExtensionInstance(); + LOG.info("Adding " + writer.getClass().getName()); + if (!indexerMap.containsKey(writer.getClass().getName())) { + indexerMap.put(writer.getClass().getName(), writer); + } + } + objectCache.setObject(IndexWriter.class.getName(), indexerMap + .values().toArray(new IndexWriter[0])); + } catch (PluginRuntimeException e) { + throw new RuntimeException(e); + } + this.indexWriters = (IndexWriter[]) objectCache + .getObject(IndexWriter.class.getName()); + } + } + } + + public void open(JobConf job, String name) throws IOException { + for (int i = 0; i < this.indexWriters.length; i++) { + try { + this.indexWriters[i].open(job, name); + } catch (IOException ioe) { + throw ioe; + } + } + } + + public void write(NutchDocument doc) throws IOException { + for (int i = 0; i < this.indexWriters.length; i++) { + try { + this.indexWriters[i].write(doc); + } catch (IOException ioe) { + throw ioe; + } + } + } + + public void update(NutchDocument doc) throws IOException { + for (int i = 0; i < this.indexWriters.length; i++) { + try { + this.indexWriters[i].update(doc); + } catch (IOException ioe) { + throw ioe; + } + } + } + + public void delete(String key) throws IOException { + for (int i = 0; i < this.indexWriters.length; i++) { + try { + this.indexWriters[i].delete(key); + } catch (IOException ioe) { + throw ioe; + } + } + } + + public void close() throws IOException { + for (int i = 0; i < this.indexWriters.length; i++) { + try { + this.indexWriters[i].close(); + } catch (IOException ioe) { + throw ioe; + } + } + } + + public void commit() throws IOException { + for (int i = 0; i < this.indexWriters.length; i++) { + try { + this.indexWriters[i].commit(); + } catch (IOException ioe) { + throw ioe; + } + } + } + + // lists the active IndexWriters and their configuration + public String describe() throws IOException { + StringBuffer buffer = new StringBuffer(); + if (this.indexWriters.length == 0) + buffer.append("No IndexWriters activated - check your configuration\n"); + else + buffer.append("Active IndexWriters :\n"); + for (int i = 0; i < this.indexWriters.length; i++) { + buffer.append(this.indexWriters[i].describe()).append("\n"); + } + return buffer.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java new file mode 100644 index 0000000..5025525 --- /dev/null +++ 
b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.indexer; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.codec.binary.StringUtils; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.crawl.LinkDb; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseImpl; +import org.apache.nutch.parse.ParseText; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; + +public class IndexerMapReduce extends Configured implements + Mapper<Text, Writable, Text, NutchWritable>, + Reducer<Text, NutchWritable, Text, NutchIndexAction> { + + public static final Logger LOG = LoggerFactory + .getLogger(IndexerMapReduce.class); + + public static final String INDEXER_PARAMS = "indexer.additional.params"; + public static final String INDEXER_DELETE = "indexer.delete"; + public static final String INDEXER_DELETE_ROBOTS_NOINDEX = "indexer.delete.robots.noindex"; + public static final String INDEXER_DELETE_SKIPPED = "indexer.delete.skipped.by.indexingfilter"; + public static final String INDEXER_SKIP_NOTMODIFIED = "indexer.skip.notmodified"; + public static final String URL_FILTERING = "indexer.url.filters"; + public static final String URL_NORMALIZING = "indexer.url.normalizers"; + public static final String INDEXER_BINARY_AS_BASE64 = "indexer.binary.base64"; + + private boolean skip = false; + private boolean delete = false; + private boolean deleteRobotsNoIndex = false; + private boolean deleteSkippedByIndexingFilter = false; + private boolean base64 = false; + private IndexingFilters filters; + private 
ScoringFilters scfilters; + + // using normalizers and/or filters + private boolean normalize = false; + private boolean filter = false; + + // url normalizers, filters and job configuration + private URLNormalizers urlNormalizers; + private URLFilters urlFilters; + + /** Predefined action to delete documents from the index */ + private static final NutchIndexAction DELETE_ACTION = new NutchIndexAction( + null, NutchIndexAction.DELETE); + + public void configure(JobConf job) { + setConf(job); + this.filters = new IndexingFilters(getConf()); + this.scfilters = new ScoringFilters(getConf()); + this.delete = job.getBoolean(INDEXER_DELETE, false); + this.deleteRobotsNoIndex = job.getBoolean(INDEXER_DELETE_ROBOTS_NOINDEX, + false); + this.deleteSkippedByIndexingFilter = job.getBoolean(INDEXER_DELETE_SKIPPED, + false); + this.skip = job.getBoolean(INDEXER_SKIP_NOTMODIFIED, false); + this.base64 = job.getBoolean(INDEXER_BINARY_AS_BASE64, false); + + normalize = job.getBoolean(URL_NORMALIZING, false); + filter = job.getBoolean(URL_FILTERING, false); + + if (normalize) { + urlNormalizers = new URLNormalizers(getConf(), + URLNormalizers.SCOPE_INDEXER); + } + + if (filter) { + urlFilters = new URLFilters(getConf()); + } + } + + /** + * Normalizes and trims extra whitespace from the given url. + * + * @param url + * The url to normalize. + * + * @return The normalized url. + */ + private String normalizeUrl(String url) { + if (!normalize) { + return url; + } + + String normalized = null; + if (urlNormalizers != null) { + try { + + // normalize and trim the url + normalized = urlNormalizers + .normalize(url, URLNormalizers.SCOPE_INDEXER); + normalized = normalized.trim(); + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + normalized = null; + } + } + + return normalized; + } + + /** + * Filters the given url. + * + * @param url + * The url to filter. + * + * @return The filtered url or null. 
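+ * Any exception thrown by a filter is caught and results in null.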
+ */ + private String filterUrl(String url) { + if (!filter) { + return url; + } + + try { + url = urlFilters.filter(url); + } catch (Exception e) { + url = null; + } + + return url; + } + + public void map(Text key, Writable value, + OutputCollector<Text, NutchWritable> output, Reporter reporter) + throws IOException { + + String urlString = filterUrl(normalizeUrl(key.toString())); + if (urlString == null) { + return; + } else { + key.set(urlString); + } + + output.collect(key, new NutchWritable(value)); + } + + public void reduce(Text key, Iterator<NutchWritable> values, + OutputCollector<Text, NutchIndexAction> output, Reporter reporter) + throws IOException { + Inlinks inlinks = null; + CrawlDatum dbDatum = null; + CrawlDatum fetchDatum = null; + Content content = null; + ParseData parseData = null; + ParseText parseText = null; + + while (values.hasNext()) { + final Writable value = values.next().get(); // unwrap + if (value instanceof Inlinks) { + inlinks = (Inlinks) value; + } else if (value instanceof CrawlDatum) { + final CrawlDatum datum = (CrawlDatum) value; + if (CrawlDatum.hasDbStatus(datum)) { + dbDatum = datum; + } else if (CrawlDatum.hasFetchStatus(datum)) { + // don't index unmodified (empty) pages + if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) { + fetchDatum = datum; + } + } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() + || CrawlDatum.STATUS_SIGNATURE == datum.getStatus() + || CrawlDatum.STATUS_PARSE_META == datum.getStatus()) { + continue; + } else { + throw new RuntimeException("Unexpected status: " + datum.getStatus()); + } + } else if (value instanceof ParseData) { + parseData = (ParseData) value; + + // Handle robots meta? https://issues.apache.org/jira/browse/NUTCH-1434 + if (deleteRobotsNoIndex) { + // Get the robots meta data + String robotsMeta = parseData.getMeta("robots"); + + // Has it a noindex for this url? + if (robotsMeta != null + && robotsMeta.toLowerCase().indexOf("noindex") != -1) { + // Delete it! 
+ output.collect(key, DELETE_ACTION); + reporter.incrCounter("IndexerStatus", "deleted (robots=noindex)", 1); + return; + } + } + } else if (value instanceof ParseText) { + parseText = (ParseText) value; + } else if (value instanceof Content) { + content = (Content)value; + } else if (LOG.isWarnEnabled()) { + LOG.warn("Unrecognized type: " + value.getClass()); + } + } + + // Whether to delete GONE or REDIRECTS + if (delete && fetchDatum != null && dbDatum != null) { + if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE + || dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) { + reporter.incrCounter("IndexerStatus", "deleted (gone)", 1); + output.collect(key, DELETE_ACTION); + return; + } + + if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM + || fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP + || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM + || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) { + reporter.incrCounter("IndexerStatus", "deleted (redirects)", 1); + output.collect(key, DELETE_ACTION); + return; + } + } + + if (fetchDatum == null || dbDatum == null || parseText == null + || parseData == null) { + return; // only have inlinks + } + + // Whether to delete pages marked as duplicates + if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) { + reporter.incrCounter("IndexerStatus", "deleted (duplicates)", 1); + output.collect(key, DELETE_ACTION); + return; + } + + // Whether to skip DB_NOTMODIFIED pages + if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { + reporter.incrCounter("IndexerStatus", "skipped (not modified)", 1); + return; + } + + if (!parseData.getStatus().isSuccess() + || fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) { + return; + } + + NutchDocument doc = new NutchDocument(); + doc.add("id", key.toString()); + + final Metadata metadata = parseData.getContentMeta(); + + // add segment, used to map from merged index back to segment files + doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY)); + + // add digest, used by dedup + doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY)); + + final Parse parse = new ParseImpl(parseText, parseData); + float boost = 1.0f; + // run scoring filters + try { + boost = this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse, + inlinks, boost); + } catch (final ScoringFilterException e) { + reporter.incrCounter("IndexerStatus", "errors (ScoringFilter)", 1); + if (LOG.isWarnEnabled()) { + LOG.warn("Error calculating score {}: {}", key, e); + } + return; + } + // apply boost to all indexed fields. + doc.setWeight(boost); + // store boost for use by explain and dedup + doc.add("boost", Float.toString(boost)); + + try { + // Indexing filters may also be interested in the signature + fetchDatum.setSignature(dbDatum.getSignature()); + + // extract information from dbDatum and pass it to + // fetchDatum so that indexing filters can use it + final Text url = (Text) dbDatum.getMetaData().get( + Nutch.WRITABLE_REPR_URL_KEY); + if (url != null) { + // Representation URL also needs normalization and filtering. + // If repr URL is excluded by filters we still accept this document + // but represented by its primary URL ("key") which has passed URL + // filters. 
+        String urlString = filterUrl(normalizeUrl(url.toString()));
+        if (urlString != null) {
+          url.set(urlString);
+          fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
+        }
+      }
+      // run indexing filters
+      doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
+    } catch (final IndexingException e) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Error indexing {}: {}", key, e);
+      }
+      reporter.incrCounter("IndexerStatus", "errors (IndexingFilter)", 1);
+      return;
+    }
+
+    // skip documents discarded by indexing filters
+    if (doc == null) {
+      // https://issues.apache.org/jira/browse/NUTCH-1449
+      if (deleteSkippedByIndexingFilter) {
+        NutchIndexAction action = new NutchIndexAction(null,
+            NutchIndexAction.DELETE);
+        output.collect(key, action);
+        reporter.incrCounter("IndexerStatus", "deleted (IndexingFilter)", 1);
+      } else {
+        reporter.incrCounter("IndexerStatus", "skipped (IndexingFilter)", 1);
+      }
+      return;
+    }
+
+    if (content != null) {
+      // Add the original binary content
+      String binary;
+      if (base64) {
+        // optionally encode as base64
+        binary = Base64.encodeBase64String(content.getContent());
+      } else {
+        // note: decodes the raw bytes with the platform default charset
+        binary = new String(content.getContent());
+      }
+      doc.add("binaryContent", binary);
+    }
+
+    reporter.incrCounter("IndexerStatus", "indexed (add/update)", 1);
+
+    NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
+    output.collect(key, action);
+  }
+
+  public void close() throws IOException {
+  }
+
+  public static void initMRJob(Path crawlDb, Path linkDb,
+      Collection<Path> segments, JobConf job, boolean addBinaryContent) {
+
+    LOG.info("IndexerMapReduce: crawldb: {}", crawlDb);
+
+    if (linkDb != null)
+      LOG.info("IndexerMapReduce: linkdb: {}", linkDb);
+
+    for (final Path segment : segments) {
+      LOG.info("IndexerMapReduce: adding segment: {}", segment);
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.FETCH_DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.PARSE_DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
+
+      if (addBinaryContent) {
+        FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+      }
+    }
+
+    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
+
+    if (linkDb != null) {
+      Path currentLinkDb = new Path(linkDb, LinkDb.CURRENT_NAME);
+      try {
+        if (FileSystem.get(job).exists(currentLinkDb)) {
+          FileInputFormat.addInputPath(job, currentLinkDb);
+        } else {
+          LOG.warn("Ignoring linkDb for indexing, no linkDb found in path: {}",
+              linkDb);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to use linkDb ({}) for indexing: {}", linkDb,
+            org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(IndexerMapReduce.class);
+    job.setReducerClass(IndexerMapReduce.class);
+
+    job.setOutputFormat(IndexerOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+    job.setOutputValueClass(NutchWritable.class);
+  }
+}
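The initMRJob() helper above is the whole public surface of this class: a driver only has to point it at a CrawlDb, an optional LinkDb and a set of segments. A minimal driver sketch follows; the class name, paths and job settings are illustrative only and not part of this patch (in Nutch itself, the IndexingJob tool wires the job up in essentially the same way):

  import java.util.Collections;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.FileOutputFormat;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.nutch.indexer.IndexerMapReduce;
  import org.apache.nutch.util.NutchConfiguration;
  import org.apache.nutch.util.NutchJob;

  public class IndexDriverSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = NutchConfiguration.create();
      JobConf job = new NutchJob(conf);
      job.setJobName("index-sketch");

      // Illustrative paths: a CrawlDb, a LinkDb and one fetched segment.
      Path crawlDb = new Path("crawl/crawldb");
      Path linkDb = new Path("crawl/linkdb");
      List<Path> segments = Collections.singletonList(
          new Path("crawl/segments/20170101000000"));

      // Wire up inputs, mapper/reducer and output format as defined above;
      // 'false' leaves the raw binary content out of the index.
      IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job, false);

      // The job writes NutchIndexAction records to a temporary location;
      // IndexerOutputFormat (below) forwards them to the configured writers.
      FileOutputFormat.setOutputPath(job,
          new Path("indexer-" + System.currentTimeMillis()));

      JobClient.runJob(job);
    }
  }
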
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
new file mode 100644
index 0000000..baa9ce6
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+public class IndexerOutputFormat extends
+    FileOutputFormat<Text, NutchIndexAction> {
+
+  @Override
+  public RecordWriter<Text, NutchIndexAction> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+
+    final IndexWriters writers = new IndexWriters(job);
+
+    writers.open(job, name);
+
+    return new RecordWriter<Text, NutchIndexAction>() {
+
+      public void close(Reporter reporter) throws IOException {
+        writers.close();
+      }
+
+      public void write(Text key, NutchIndexAction indexAction)
+          throws IOException {
+        if (indexAction.action == NutchIndexAction.ADD) {
+          writers.write(indexAction.doc);
+        } else if (indexAction.action == NutchIndexAction.DELETE) {
+          writers.delete(key.toString());
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
new file mode 100644
index 0000000..adfefeb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+@SuppressWarnings("serial")
+public class IndexingException extends Exception {
+
+  public IndexingException() {
+    super();
+  }
+
+  public IndexingException(String message) {
+    super(message);
+  }
+
+  public IndexingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public IndexingException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
new file mode 100644
index 0000000..f22a0e5
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// Hadoop imports
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Text;
+
+// Nutch imports
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.plugin.Pluggable;
+
+/**
+ * Extension point for indexing. Permits one to add metadata to the indexed
+ * fields. All plugins found which implement this extension point are run
+ * sequentially on the parse.
+ */
+public interface IndexingFilter extends Pluggable, Configurable {
+  /** The name of the extension point. */
+  final static String X_POINT_ID = IndexingFilter.class.getName();
+
+  /**
+   * Adds fields or otherwise modifies the document that will be indexed for a
+   * parse. Unwanted documents can be removed from indexing by returning a null
+   * value.
+   *
+   * @param doc
+   *          document instance for collecting fields
+   * @param parse
+   *          parse data instance
+   * @param url
+   *          page url
+   * @param datum
+   *          crawl datum for the page (the fetch datum from the segment,
+   *          containing fetch status and fetch time)
+   * @param inlinks
+   *          page inlinks
+   * @return modified (or a new) document instance, or null (meaning the
+   *         document should be discarded)
+   * @throws IndexingException
+   */
+  NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+      CrawlDatum datum, Inlinks inlinks) throws IndexingException;
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
new file mode 100644
index 0000000..334fcad
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// Slf4j logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.parse.Parse;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.hadoop.io.Text;
+
+/** Creates and caches {@link IndexingFilter} implementing plugins. */
+public class IndexingFilters {
+
+  public static final String INDEXINGFILTER_ORDER = "indexingfilter.order";
+
+  public final static Logger LOG = LoggerFactory
+      .getLogger(IndexingFilters.class);
+
+  private IndexingFilter[] indexingFilters;
+
+  public IndexingFilters(Configuration conf) {
+    indexingFilters = (IndexingFilter[]) PluginRepository.get(conf)
+        .getOrderedPlugins(IndexingFilter.class, IndexingFilter.X_POINT_ID,
+            INDEXINGFILTER_ORDER);
+  }
+
+  /** Run all defined filters. */
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+      CrawlDatum datum, Inlinks inlinks) throws IndexingException {
+    for (int i = 0; i < this.indexingFilters.length; i++) {
+      doc = this.indexingFilters[i].filter(doc, parse, url, datum, inlinks);
+      // break the loop if an indexing filter discards the doc
+      if (doc == null)
+        return null;
+    }
+
+    return doc;
+  }
+
+}
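The IndexingFilter extension point above is what most custom indexing logic plugs into. A minimal sketch of a filter that adds one extra field follows; the class name and field name are hypothetical, and a real plugin would additionally need a plugin.xml descriptor and an entry in plugin.includes:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.Text;
  import org.apache.nutch.crawl.CrawlDatum;
  import org.apache.nutch.crawl.Inlinks;
  import org.apache.nutch.indexer.IndexingException;
  import org.apache.nutch.indexer.IndexingFilter;
  import org.apache.nutch.indexer.NutchDocument;
  import org.apache.nutch.parse.Parse;

  public class ExampleIndexingFilter implements IndexingFilter {

    private Configuration conf;

    @Override
    public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
        CrawlDatum datum, Inlinks inlinks) throws IndexingException {
      // Add the fetch time of the page as an extra indexed field.
      doc.add("fetch_time", Long.toString(datum.getFetchTime()));

      // Returning null instead would discard the document:
      // IndexingFilters.filter() stops the chain as soon as that happens.
      return doc;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
    }

    @Override
    public Configuration getConf() {
      return conf;
    }
  }

Filters run in the order given by the indexingfilter.order property; a null return from any filter discards the document, which IndexerMapReduce then either skips or turns into a delete action depending on its deleteSkippedByIndexingFilter flag (see the NUTCH-1449 reference above).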
