http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java new file mode 100755 index 0000000..10ee185 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.Text; + +/** + * This interface defines the contract for implementations that manipulate fetch + * times and re-fetch intervals. + * + * @author Andrzej Bialecki + */ +public interface FetchSchedule extends Configurable { + + /** It is unknown whether page was changed since our last visit. */ + public static final int STATUS_UNKNOWN = 0; + /** Page is known to have been modified since our last visit. */ + public static final int STATUS_MODIFIED = 1; + /** Page is known to remain unmodified since our last visit. 
*/ + public static final int STATUS_NOTMODIFIED = 2; + + public static final int SECONDS_PER_DAY = 3600 * 24; + + /** + * Initialize fetch schedule related data. Implementations should at least set + * the <code>fetchTime</code> and <code>fetchInterval</code>. The default + * implementation set the <code>fetchTime</code> to now, using the default + * <code>fetchInterval</code>. + * + * @param url + * URL of the page. + * + * @param datum + * datum instance to be initialized. + * + * @return adjusted page information, including all original information. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. + */ + public CrawlDatum initializeSchedule(Text url, CrawlDatum datum); + + /** + * Sets the <code>fetchInterval</code> and <code>fetchTime</code> on a + * successfully fetched page. Implementations may use supplied arguments to + * support different re-fetching schedules. + * + * @param url + * url of the page + * + * @param datum + * page description to be adjusted. NOTE: this instance, passed by + * reference, may be modified inside the method. + * + * @param prevFetchTime + * previous value of fetch time, or 0 if not available. + * + * @param prevModifiedTime + * previous value of modifiedTime, or 0 if not available. + * + * @param fetchTime + * the latest time, when the page was recently re-fetched. Most + * FetchSchedule implementations should update the value in @see + * CrawlDatum to something greater than this value. + * + * @param modifiedTime + * last time the content was modified. This information comes from + * the protocol implementations, or is set to < 0 if not available. + * Most FetchSchedule implementations should update the value in @see + * CrawlDatum to this value. 
+ * + * @param state + * if {@link #STATUS_MODIFIED}, then the content is considered to be + * "changed" before the <code>fetchTime</code>, if + * {@link #STATUS_NOTMODIFIED} then the content is known to be + * unchanged. This information may be obtained by comparing page + * signatures before and after fetching. If this is set to + * {@link #STATUS_UNKNOWN}, then it is unknown whether the page was + * changed; implementations are free to follow a sensible default + * behavior. + * + * @return adjusted page information, including all original information. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum}. + */ + public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum, + long prevFetchTime, long prevModifiedTime, long fetchTime, + long modifiedTime, int state); + + /** + * This method specifies how to schedule refetching of pages marked as GONE. + * Default implementation increases fetchInterval by 50%, and if it exceeds + * the <code>maxInterval</code> it calls + * {@link #forceRefetch(Text, CrawlDatum, boolean)}. + * + * @param url + * URL of the page + * + * @param datum + * datum instance to be adjusted. + * + * @return adjusted page information, including all original information. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. + */ + public CrawlDatum setPageGoneSchedule(Text url, CrawlDatum datum, + long prevFetchTime, long prevModifiedTime, long fetchTime); + + /** + * This method adjusts the fetch schedule if fetching needs to be re-tried due + * to transient errors. The default implementation sets the next fetch time 1 + * day in the future and increases the retry counter. + * + * @param url + * URL of the page. + * + * @param datum + * page information. 
+ * + * @param prevFetchTime + * previous fetch time. + * + * @param prevModifiedTime + * previous modified time. + * + * @param fetchTime + * current fetch time. + * + * @return adjusted page information, including all original information. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. + */ + public CrawlDatum setPageRetrySchedule(Text url, CrawlDatum datum, + long prevFetchTime, long prevModifiedTime, long fetchTime); + + /** + * Calculates last fetch time of the given CrawlDatum. + * + * @return the date as a long. + */ + public long calculateLastFetchTime(CrawlDatum datum); + + /** + * This method provides information whether the page is suitable for selection + * in the current fetchlist. NOTE: a true return value does not guarantee that + * the page will be fetched, it just allows it to be included in the further + * selection process based on scores. The default implementation checks + * <code>fetchTime</code>, if it is higher than the curTime it returns false, + * and true otherwise. It will also check that fetchTime is not too remote + * (more than <code>maxInterval</code), in which case it lowers the interval + * and returns true. + * + * @param url + * URL of the page. + * + * @param datum + * datum instance. + * + * @param curTime + * reference time (usually set to the time when the fetchlist + * generation process was started). + * + * @return true, if the page should be considered for inclusion in the current + * fetchlist, otherwise false. + */ + public boolean shouldFetch(Text url, CrawlDatum datum, long curTime); + + /** + * This method resets fetchTime, fetchInterval, modifiedTime and page + * signature, so that it forces refetching. + * + * @param url + * URL of the page. + * + * @param datum + * datum instance. + * + * @param asap + * if true, force refetch as soon as possible - this sets the + * fetchTime to now. 
If false, force refetch whenever the next fetch + * time is set. + * + * @return adjusted page information, including all original information. + * NOTE: this may be a different instance than @see CrawlDatum, but + * implementations should make sure that it contains at least all + * information from @see CrawlDatum. + */ + public CrawlDatum forceRefetch(Text url, CrawlDatum datum, boolean asap); +}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java new file mode 100755 index 0000000..7a84524 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.util.ObjectCache; + +/** Creates and caches a {@link FetchSchedule} implementation. */ +public class FetchScheduleFactory { + + public static final Logger LOG = LoggerFactory + .getLogger(FetchScheduleFactory.class); + + private FetchScheduleFactory() { + } // no public ctor + + /** Return the FetchSchedule implementation. 
*/ + public synchronized static FetchSchedule getFetchSchedule(Configuration conf) { + String clazz = conf.get("db.fetch.schedule.class", + DefaultFetchSchedule.class.getName()); + ObjectCache objectCache = ObjectCache.get(conf); + FetchSchedule impl = (FetchSchedule) objectCache.getObject(clazz); + if (impl == null) { + try { + LOG.info("Using FetchSchedule impl: " + clazz); + Class<?> implClass = Class.forName(clazz); + impl = (FetchSchedule) implClass.newInstance(); + impl.setConf(conf); + objectCache.setObject(clazz, impl); + } catch (Exception e) { + throw new RuntimeException("Couldn't create " + clazz, e); + } + } + return impl; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java b/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java new file mode 100644 index 0000000..9a82089 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java @@ -0,0 +1,859 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.text.*; + +// rLogging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.jexl2.Expression; +import org.apache.hadoop.io.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat; +import org.apache.hadoop.util.*; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLFilterException; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; +import org.apache.nutch.util.JexlUtil; +import org.apache.nutch.util.LockUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.NutchTool; +import org.apache.nutch.util.TimingUtil; +import org.apache.nutch.util.URLUtil; + + +/** + * Generates a subset of a crawl db to fetch. This version allows to generate + * fetchlists for several segments in one go. Unlike in the initial version + * (OldGenerator), the IP resolution is done ONLY on the entries which have been + * selected for fetching. The URLs are partitioned by IP, domain or host within + * a segment. We can chose separately how to count the URLS i.e. by domain or + * host to limit the entries. 
+ **/ +public class Generator extends NutchTool implements Tool { + + public static final Logger LOG = LoggerFactory.getLogger(Generator.class); + + public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb"; + public static final String GENERATOR_MIN_SCORE = "generate.min.score"; + public static final String GENERATOR_MIN_INTERVAL = "generate.min.interval"; + public static final String GENERATOR_RESTRICT_STATUS = "generate.restrict.status"; + public static final String GENERATOR_FILTER = "generate.filter"; + public static final String GENERATOR_NORMALISE = "generate.normalise"; + public static final String GENERATOR_MAX_COUNT = "generate.max.count"; + public static final String GENERATOR_COUNT_MODE = "generate.count.mode"; + public static final String GENERATOR_COUNT_VALUE_DOMAIN = "domain"; + public static final String GENERATOR_COUNT_VALUE_HOST = "host"; + public static final String GENERATOR_TOP_N = "generate.topN"; + public static final String GENERATOR_CUR_TIME = "generate.curTime"; + public static final String GENERATOR_DELAY = "crawl.gen.delay"; + public static final String GENERATOR_MAX_NUM_SEGMENTS = "generate.max.num.segments"; + public static final String GENERATOR_EXPR = "generate.expr"; + + public static class SelectorEntry implements Writable { + public Text url; + public CrawlDatum datum; + public IntWritable segnum; + + public SelectorEntry() { + url = new Text(); + datum = new CrawlDatum(); + segnum = new IntWritable(0); + } + + public void readFields(DataInput in) throws IOException { + url.readFields(in); + datum.readFields(in); + segnum.readFields(in); + } + + public void write(DataOutput out) throws IOException { + url.write(out); + datum.write(out); + segnum.write(out); + } + + public String toString() { + return "url=" + url.toString() + ", datum=" + datum.toString() + + ", segnum=" + segnum.toString(); + } + } + + /** Selects entries due for fetch. 
*/ + public static class Selector implements + Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>, + Partitioner<FloatWritable, Writable>, + Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> { + private LongWritable genTime = new LongWritable(System.currentTimeMillis()); + private long curTime; + private long limit; + private long count; + private HashMap<String, int[]> hostCounts = new HashMap<String, int[]>(); + private int segCounts[]; + private int maxCount; + private boolean byDomain = false; + private Partitioner<Text, Writable> partitioner = new URLPartitioner(); + private URLFilters filters; + private URLNormalizers normalizers; + private ScoringFilters scfilters; + private SelectorEntry entry = new SelectorEntry(); + private FloatWritable sortValue = new FloatWritable(); + private boolean filter; + private boolean normalise; + private long genDelay; + private FetchSchedule schedule; + private float scoreThreshold = 0f; + private int intervalThreshold = -1; + private String restrictStatus = null; + private int maxNumSegments = 1; + private Expression expr = null; + private int currentsegmentnum = 1; + + public void configure(JobConf job) { + curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis()); + limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) + / job.getNumReduceTasks(); + maxCount = job.getInt(GENERATOR_MAX_COUNT, -1); + if (maxCount == -1) { + byDomain = false; + } + if (GENERATOR_COUNT_VALUE_DOMAIN.equals(job.get(GENERATOR_COUNT_MODE))) + byDomain = true; + filters = new URLFilters(job); + normalise = job.getBoolean(GENERATOR_NORMALISE, true); + if (normalise) + normalizers = new URLNormalizers(job, + URLNormalizers.SCOPE_GENERATE_HOST_COUNT); + scfilters = new ScoringFilters(job); + partitioner.configure(job); + filter = job.getBoolean(GENERATOR_FILTER, true); + genDelay = job.getLong(GENERATOR_DELAY, 7L) * 3600L * 24L * 1000L; + long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L); + if (time > 0) + 
genTime.set(time); + schedule = FetchScheduleFactory.getFetchSchedule(job); + scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN); + intervalThreshold = job.getInt(GENERATOR_MIN_INTERVAL, -1); + restrictStatus = job.get(GENERATOR_RESTRICT_STATUS, null); + expr = JexlUtil.parseExpression(job.get(GENERATOR_EXPR, null)); + maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1); + segCounts = new int[maxNumSegments]; + } + + public void close() { + } + + /** Select & invert subset due for fetch. */ + public void map(Text key, CrawlDatum value, + OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter) + throws IOException { + Text url = key; + if (filter) { + // If filtering is on don't generate URLs that don't pass + // URLFilters + try { + if (filters.filter(url.toString()) == null) + return; + } catch (URLFilterException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + + ")"); + } + } + } + CrawlDatum crawlDatum = value; + + // check fetch schedule + if (!schedule.shouldFetch(url, crawlDatum, curTime)) { + LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" + + crawlDatum.getFetchTime() + ", curTime=" + curTime); + return; + } + + LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get( + Nutch.WRITABLE_GENERATE_TIME_KEY); + if (oldGenTime != null) { // awaiting fetch & update + if (oldGenTime.get() + genDelay > curTime) // still wait for + // update + return; + } + float sort = 1.0f; + try { + sort = scfilters.generatorSortValue(key, crawlDatum, sort); + } catch (ScoringFilterException sfe) { + if (LOG.isWarnEnabled()) { + LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe); + } + } + + // check expr + if (expr != null) { + if (!crawlDatum.evaluate(expr)) { + return; + } + } + + if (restrictStatus != null + && !restrictStatus.equalsIgnoreCase(CrawlDatum + .getStatusName(crawlDatum.getStatus()))) + return; + + // consider only entries with a 
score superior to the threshold + if (scoreThreshold != Float.NaN && sort < scoreThreshold) + return; + + // consider only entries with a retry (or fetch) interval lower than + // threshold + if (intervalThreshold != -1 + && crawlDatum.getFetchInterval() > intervalThreshold) + return; + + // sort by decreasing score, using DecreasingFloatComparator + sortValue.set(sort); + // record generation time + crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); + entry.datum = crawlDatum; + entry.url = key; + output.collect(sortValue, entry); // invert for sort by score + } + + /** Partition by host / domain or IP. */ + public int getPartition(FloatWritable key, Writable value, + int numReduceTasks) { + return partitioner.getPartition(((SelectorEntry) value).url, key, + numReduceTasks); + } + + /** Collect until limit is reached. */ + public void reduce(FloatWritable key, Iterator<SelectorEntry> values, + OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter) + throws IOException { + + while (values.hasNext()) { + + if (count == limit) { + // do we have any segments left? 
+ if (currentsegmentnum < maxNumSegments) { + count = 0; + currentsegmentnum++; + } else + break; + } + + SelectorEntry entry = values.next(); + Text url = entry.url; + String urlString = url.toString(); + URL u = null; + + String hostordomain = null; + + try { + if (normalise && normalizers != null) { + urlString = normalizers.normalize(urlString, + URLNormalizers.SCOPE_GENERATE_HOST_COUNT); + } + u = new URL(urlString); + if (byDomain) { + hostordomain = URLUtil.getDomainName(u); + } else { + hostordomain = new URL(urlString).getHost(); + } + } catch (Exception e) { + LOG.warn("Malformed URL: '" + urlString + "', skipping (" + + StringUtils.stringifyException(e) + ")"); + reporter.getCounter("Generator", "MALFORMED_URL").increment(1); + continue; + } + + hostordomain = hostordomain.toLowerCase(); + + // only filter if we are counting hosts or domains + if (maxCount > 0) { + int[] hostCount = hostCounts.get(hostordomain); + if (hostCount == null) { + hostCount = new int[] { 1, 0 }; + hostCounts.put(hostordomain, hostCount); + } + + // increment hostCount + hostCount[1]++; + + // check if topN reached, select next segment if it is + while (segCounts[hostCount[0] - 1] >= limit + && hostCount[0] < maxNumSegments) { + hostCount[0]++; + hostCount[1] = 0; + } + + // reached the limit of allowed URLs per host / domain + // see if we can put it in the next segment? + if (hostCount[1] >= maxCount) { + if (hostCount[0] < maxNumSegments) { + hostCount[0]++; + hostCount[1] = 0; + } else { + if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) { + LOG.info("Host or domain " + + hostordomain + + " has more than " + + maxCount + + " URLs for all " + + maxNumSegments + + " segments. 
Additional URLs won't be included in the fetchlist."); + } + // skip this entry + continue; + } + } + entry.segnum = new IntWritable(hostCount[0]); + segCounts[hostCount[0] - 1]++; + } else { + entry.segnum = new IntWritable(currentsegmentnum); + segCounts[currentsegmentnum - 1]++; + } + + output.collect(key, entry); + + // Count is incremented only when we keep the URL + // maxCount may cause us to skip it. + count++; + } + } + } + + // Allows the reducers to generate one subfile per + public static class GeneratorOutputFormat extends + MultipleSequenceFileOutputFormat<FloatWritable, SelectorEntry> { + // generate a filename based on the segnum stored for this entry + protected String generateFileNameForKeyValue(FloatWritable key, + SelectorEntry value, String name) { + return "fetchlist-" + value.segnum.toString() + "/" + name; + } + + } + + public static class DecreasingFloatComparator extends + FloatWritable.Comparator { + + /** Compares two FloatWritables decreasing. */ + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return super.compare(b2, s2, l2, b1, s1, l1); + } + } + + public static class SelectorInverseMapper extends MapReduceBase implements + Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> { + + public void map(FloatWritable key, SelectorEntry value, + OutputCollector<Text, SelectorEntry> output, Reporter reporter) + throws IOException { + SelectorEntry entry = value; + output.collect(entry.url, entry); + } + } + + public static class PartitionReducer extends MapReduceBase implements + Reducer<Text, SelectorEntry, Text, CrawlDatum> { + + public void reduce(Text key, Iterator<SelectorEntry> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + // if using HashComparator, we get only one input key in case of + // hash collision + // so use only URLs from values + while (values.hasNext()) { + SelectorEntry entry = values.next(); + output.collect(entry.url, entry.datum); + } 
+ } + + } + + /** Sort fetch lists by hash of URL. */ + public static class HashComparator extends WritableComparator { + public HashComparator() { + super(Text.class); + } + + @SuppressWarnings("rawtypes") + public int compare(WritableComparable a, WritableComparable b) { + Text url1 = (Text) a; + Text url2 = (Text) b; + int hash1 = hash(url1.getBytes(), 0, url1.getLength()); + int hash2 = hash(url2.getBytes(), 0, url2.getLength()); + return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1)); + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int hash1 = hash(b1, s1, l1); + int hash2 = hash(b2, s2, l2); + return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1)); + } + + private static int hash(byte[] bytes, int start, int length) { + int hash = 1; + // make later bytes more significant in hash code, so that sorting + // by + // hashcode correlates less with by-host ordering. + for (int i = length - 1; i >= 0; i--) + hash = (31 * hash) + (int) bytes[start + i]; + return hash; + } + } + + /** + * Update the CrawlDB so that the next generate won't include the same URLs. 
+ */ + public static class CrawlDbUpdater extends MapReduceBase implements + Mapper<Text, CrawlDatum, Text, CrawlDatum>, + Reducer<Text, CrawlDatum, Text, CrawlDatum> { + long generateTime; + + public void configure(JobConf job) { + generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L); + } + + public void map(Text key, CrawlDatum value, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + output.collect(key, value); + } + + private CrawlDatum orig = new CrawlDatum(); + private LongWritable genTime = new LongWritable(0L); + + public void reduce(Text key, Iterator<CrawlDatum> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + genTime.set(0L); + while (values.hasNext()) { + CrawlDatum val = values.next(); + if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) { + LongWritable gt = (LongWritable) val.getMetaData().get( + Nutch.WRITABLE_GENERATE_TIME_KEY); + genTime.set(gt.get()); + if (genTime.get() != generateTime) { + orig.set(val); + genTime.set(0L); + continue; + } + } else { + orig.set(val); + } + } + if (genTime.get() != 0L) { + orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime); + } + output.collect(key, orig); + } + } + + public Generator() { + } + + public Generator(Configuration conf) { + setConf(conf); + } + + public Path[] generate(Path dbDir, Path segments, int numLists, long topN, + long curTime) throws IOException { + + JobConf job = new NutchJob(getConf()); + boolean filter = job.getBoolean(GENERATOR_FILTER, true); + boolean normalise = job.getBoolean(GENERATOR_NORMALISE, true); + return generate(dbDir, segments, numLists, topN, curTime, filter, + normalise, false, 1, null); + } + + /** + * old signature used for compatibility - does not specify whether or not to + * normalise and set the number of segments to 1 + **/ + public Path[] generate(Path dbDir, Path segments, int numLists, long topN, + long curTime, boolean filter, boolean force) 
throws IOException { + return generate(dbDir, segments, numLists, topN, curTime, filter, true, + force, 1, null); + } + + /** + * Generate fetchlists in one or more segments. Whether to filter URLs or not + * is read from the crawl.generate.filter property in the configuration files. + * If the property is not found, the URLs are filtered. Same for the + * normalisation. + * + * @param dbDir + * Crawl database directory + * @param segments + * Segments directory + * @param numLists + * Number of reduce tasks + * @param topN + * Number of top URLs to be selected + * @param curTime + * Current time in milliseconds + * + * @return Path to generated segment or null if no entries were selected + * + * @throws IOException + * When an I/O error occurs + */ + public Path[] generate(Path dbDir, Path segments, int numLists, long topN, + long curTime, boolean filter, boolean norm, boolean force, + int maxNumSegments, String expr) throws IOException { + + Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + + "/generate-temp-" + java.util.UUID.randomUUID().toString()); + + Path lock = new Path(dbDir, CrawlDb.LOCK_NAME); + FileSystem fs = FileSystem.get(getConf()); + LockUtil.createLockFile(fs, lock, force); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("Generator: starting at " + sdf.format(start)); + LOG.info("Generator: Selecting best-scoring urls due for fetch."); + LOG.info("Generator: filtering: " + filter); + LOG.info("Generator: normalizing: " + norm); + if (topN != Long.MAX_VALUE) { + LOG.info("Generator: topN: " + topN); + } + if (expr != null) { + LOG.info("Generator: expr: " + expr); + } + + // map to inverted subset due for fetch, sort by score + JobConf job = new NutchJob(getConf()); + job.setJobName("generate: select from " + dbDir); + + if (numLists == -1) { // for politeness make + numLists = job.getNumMapTasks(); // a partition per fetch task + } + if 
("local".equals(job.get("mapreduce.framework.name")) && numLists != 1) { + // override + LOG.info("Generator: running in local mode, generating exactly one partition."); + numLists = 1; + } + job.setLong(GENERATOR_CUR_TIME, curTime); + // record real generation time + long generateTime = System.currentTimeMillis(); + job.setLong(Nutch.GENERATE_TIME_KEY, generateTime); + job.setLong(GENERATOR_TOP_N, topN); + job.setBoolean(GENERATOR_FILTER, filter); + job.setBoolean(GENERATOR_NORMALISE, norm); + job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments); + if (expr != null) { + job.set(GENERATOR_EXPR, expr); + } + FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(Selector.class); + job.setPartitionerClass(Selector.class); + job.setReducerClass(Selector.class); + + FileOutputFormat.setOutputPath(job, tempDir); + job.setOutputFormat(SequenceFileOutputFormat.class); + job.setOutputKeyClass(FloatWritable.class); + job.setOutputKeyComparatorClass(DecreasingFloatComparator.class); + job.setOutputValueClass(SelectorEntry.class); + job.setOutputFormat(GeneratorOutputFormat.class); + + try { + JobClient.runJob(job); + } catch (IOException e) { + LockUtil.removeLockFile(fs, lock); + fs.delete(tempDir, true); + throw e; + } + + // read the subdirectories generated in the temp + // output and turn them into segments + List<Path> generatedSegments = new ArrayList<Path>(); + + FileStatus[] status = fs.listStatus(tempDir); + try { + for (FileStatus stat : status) { + Path subfetchlist = stat.getPath(); + if (!subfetchlist.getName().startsWith("fetchlist-")) + continue; + // start a new partition job for this segment + Path newSeg = partitionSegment(fs, segments, subfetchlist, numLists); + generatedSegments.add(newSeg); + } + } catch (Exception e) { + LOG.warn("Generator: exception while partitioning segments, exiting ..."); + fs.delete(tempDir, true); + return null; + } + + if 
(generatedSegments.size() == 0) { + LOG.warn("Generator: 0 records selected for fetching, exiting ..."); + LockUtil.removeLockFile(fs, lock); + fs.delete(tempDir, true); + return null; + } + + if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) { + // update the db from tempDir + Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + + "/generate-temp-" + java.util.UUID.randomUUID().toString()); + + job = new NutchJob(getConf()); + job.setJobName("generate: updatedb " + dbDir); + job.setLong(Nutch.GENERATE_TIME_KEY, generateTime); + for (Path segmpaths : generatedSegments) { + Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME); + FileInputFormat.addInputPath(job, subGenDir); + } + FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); + job.setInputFormat(SequenceFileInputFormat.class); + job.setMapperClass(CrawlDbUpdater.class); + job.setReducerClass(CrawlDbUpdater.class); + job.setOutputFormat(MapFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + FileOutputFormat.setOutputPath(job, tempDir2); + try { + JobClient.runJob(job); + CrawlDb.install(job, dbDir); + } catch (IOException e) { + LockUtil.removeLockFile(fs, lock); + fs.delete(tempDir, true); + fs.delete(tempDir2, true); + throw e; + } + fs.delete(tempDir2, true); + } + + LockUtil.removeLockFile(fs, lock); + fs.delete(tempDir, true); + + long end = System.currentTimeMillis(); + LOG.info("Generator: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + + Path[] patharray = new Path[generatedSegments.size()]; + return generatedSegments.toArray(patharray); + } + + private Path partitionSegment(FileSystem fs, Path segmentsDir, Path inputDir, + int numLists) throws IOException { + // invert again, partition by host/domain/IP, sort by url hash + if (LOG.isInfoEnabled()) { + LOG.info("Generator: Partitioning selected urls for politeness."); + } + Path segment = new 
Path(segmentsDir, generateSegmentName()); + Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME); + + LOG.info("Generator: segment: " + segment); + + NutchJob job = new NutchJob(getConf()); + job.setJobName("generate: partition " + segment); + + job.setInt("partition.url.seed", new Random().nextInt()); + + FileInputFormat.addInputPath(job, inputDir); + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(SelectorInverseMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(SelectorEntry.class); + job.setPartitionerClass(URLPartitioner.class); + job.setReducerClass(PartitionReducer.class); + job.setNumReduceTasks(numLists); + + FileOutputFormat.setOutputPath(job, output); + job.setOutputFormat(SequenceFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + job.setOutputKeyComparatorClass(HashComparator.class); + JobClient.runJob(job); + return segment; + } + + private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); + + public static synchronized String generateSegmentName() { + try { + Thread.sleep(1000); + } catch (Throwable t) { + } + ; + return sdf.format(new Date(System.currentTimeMillis())); + } + + /** + * Generate a fetchlist from the crawldb. 
+ */ + public static void main(String args[]) throws Exception { + int res = ToolRunner + .run(NutchConfiguration.create(), new Generator(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.out + .println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-expr <expr>] [-adddays <numDays>] [-noFilter] [-noNorm] [-maxNumSegments <num>]"); + return -1; + } + + Path dbDir = new Path(args[0]); + Path segmentsDir = new Path(args[1]); + long curTime = System.currentTimeMillis(); + long topN = Long.MAX_VALUE; + int numFetchers = -1; + boolean filter = true; + boolean norm = true; + boolean force = false; + String expr = null; + int maxNumSegments = 1; + + for (int i = 2; i < args.length; i++) { + if ("-topN".equals(args[i])) { + topN = Long.parseLong(args[i + 1]); + i++; + } else if ("-numFetchers".equals(args[i])) { + numFetchers = Integer.parseInt(args[i + 1]); + i++; + } else if ("-adddays".equals(args[i])) { + long numDays = Integer.parseInt(args[i + 1]); + curTime += numDays * 1000L * 60 * 60 * 24; + } else if ("-noFilter".equals(args[i])) { + filter = false; + } else if ("-noNorm".equals(args[i])) { + norm = false; + } else if ("-force".equals(args[i])) { + force = true; + } else if ("-maxNumSegments".equals(args[i])) { + maxNumSegments = Integer.parseInt(args[i + 1]); + } else if ("-expr".equals(args[i])) { + expr = args[i + 1]; + } + + } + + try { + Path[] segs = generate(dbDir, segmentsDir, numFetchers, topN, curTime, + filter, norm, force, maxNumSegments, expr); + if (segs == null) + return 1; + } catch (Exception e) { + LOG.error("Generator: " + StringUtils.stringifyException(e)); + return -1; + } + return 0; + } + + @Override + public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception { + + Map<String, Object> results = new HashMap<String, Object>(); + + long curTime = System.currentTimeMillis(); + long topN = 
Long.MAX_VALUE; + int numFetchers = -1; + boolean filter = true; + boolean norm = true; + boolean force = false; + int maxNumSegments = 1; + String expr = null; + + Path crawlDb; + if(args.containsKey(Nutch.ARG_CRAWLDB)) { + Object crawldbPath = args.get(Nutch.ARG_CRAWLDB); + if(crawldbPath instanceof Path) { + crawlDb = (Path) crawldbPath; + } + else { + crawlDb = new Path(crawldbPath.toString()); + } + } + else { + crawlDb = new Path(crawlId+"/crawldb"); + } + + Path segmentsDir; + if(args.containsKey(Nutch.ARG_SEGMENTDIR)) { + Object segDir = args.get(Nutch.ARG_SEGMENTDIR); + if(segDir instanceof Path) { + segmentsDir = (Path) segDir; + } + else { + segmentsDir = new Path(segDir.toString()); + } + } + else { + segmentsDir = new Path(crawlId+"/segments"); + } + + if (args.containsKey("expr")) { + expr = (String)args.get("expr"); + } + if (args.containsKey("topN")) { + topN = Long.parseLong((String)args.get("topN")); + } + if (args.containsKey("numFetchers")) { + numFetchers = Integer.parseInt((String)args.get("numFetchers")); + } + if (args.containsKey("adddays")) { + long numDays = Integer.parseInt((String)args.get("adddays")); + curTime += numDays * 1000L * 60 * 60 * 24; + } + if (args.containsKey("noFilter")) { + filter = false; + } + if (args.containsKey("noNorm")) { + norm = false; + } + if (args.containsKey("force")) { + force = true; + } + if (args.containsKey("maxNumSegments")) { + maxNumSegments = Integer.parseInt((String)args.get("maxNumSegments")); + } + + try { + Path[] segs = generate(crawlDb, segmentsDir, numFetchers, topN, curTime, + filter, norm, force, maxNumSegments, expr); + if (segs == null){ + results.put(Nutch.VAL_RESULT, Integer.toString(1)); + return results; + } + + } catch (Exception e) { + LOG.error("Generator: " + StringUtils.stringifyException(e)); + results.put(Nutch.VAL_RESULT, Integer.toString(-1)); + return results; + } + results.put(Nutch.VAL_RESULT, Integer.toString(0)); + return results; + } +} 
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java b/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java new file mode 100644 index 0000000..383aaf1 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java @@ -0,0 +1,510 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; +import org.apache.nutch.util.LockUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchTool; +import org.apache.nutch.util.TimingUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** + * Injector takes a flat file of URLs and merges ("injects") these URLs into the + * CrawlDb. Useful for bootstrapping a Nutch crawl. The URL files contain one + * URL per line, optionally followed by custom metadata separated by tabs with + * the metadata key separated from the corresponding value by '='. 
+ * </p> + * <p> + * Note, that some metadata keys are reserved: + * <dl> + * <dt>nutch.score</dt> + * <dd>allows to set a custom score for a specific URL</dd> + * <dt>nutch.fetchInterval</dt> + * <dd>allows to set a custom fetch interval for a specific URL</dd> + * <dt>nutch.fetchInterval.fixed</dt> + * <dd>allows to set a custom fetch interval for a specific URL that is not + * changed by AdaptiveFetchSchedule</dd> + * </dl> + * </p> + * <p> + * Example: + * + * <pre> + * http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source + * </pre> + * </p> + **/ +public class Injector extends NutchTool implements Tool { + public static final Logger LOG = LoggerFactory.getLogger(Injector.class); + + /** metadata key reserved for setting a custom score for a specific URL */ + public static String nutchScoreMDName = "nutch.score"; + + /** + * metadata key reserved for setting a custom fetchInterval for a specific URL + */ + public static String nutchFetchIntervalMDName = "nutch.fetchInterval"; + + /** + * metadata key reserved for setting a fixed custom fetchInterval for a + * specific URL + */ + public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed"; + + public static class InjectMapper + extends Mapper<Text, Writable, Text, CrawlDatum> { + public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope"; + public static final String TAB_CHARACTER = "\t"; + public static final String EQUAL_CHARACTER = "="; + + private URLNormalizers urlNormalizers; + private int interval; + private float scoreInjected; + private URLFilters filters; + private ScoringFilters scfilters; + private long curTime; + private boolean url404Purging; + private String scope; + + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_INJECT); + urlNormalizers = new URLNormalizers(conf, scope); + interval = 
conf.getInt("db.fetch.interval.default", 2592000); + filters = new URLFilters(conf); + scfilters = new ScoringFilters(conf); + scoreInjected = conf.getFloat("db.score.injected", 1.0f); + curTime = conf.getLong("injector.current.time", + System.currentTimeMillis()); + url404Purging = conf.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false); + } + + /* Filter and normalize the input url */ + private String filterNormalize(String url) { + if (url != null) { + try { + url = urlNormalizers.normalize(url, scope); // normalize the url + url = filters.filter(url); // filter the url + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + return url; + } + + /** + * Extract metadata that could be passed along with url in a seeds file. + * Metadata must be key-value pair(s) and separated by a TAB_CHARACTER + */ + private void processMetaData(String metadata, CrawlDatum datum, + String url) { + String[] splits = metadata.split(TAB_CHARACTER); + + for (String split : splits) { + // find separation between name and value + int indexEquals = split.indexOf(EQUAL_CHARACTER); + if (indexEquals == -1) // skip anything without a EQUAL_CHARACTER + continue; + + String metaname = split.substring(0, indexEquals); + String metavalue = split.substring(indexEquals + 1); + + try { + if (metaname.equals(nutchScoreMDName)) { + datum.setScore(Float.parseFloat(metavalue)); + } else if (metaname.equals(nutchFetchIntervalMDName)) { + datum.setFetchInterval(Integer.parseInt(metavalue)); + } else if (metaname.equals(nutchFixedFetchIntervalMDName)) { + int fixedInterval = Integer.parseInt(metavalue); + if (fixedInterval > -1) { + // Set writable using float. 
Float is used by + // AdaptiveFetchSchedule + datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, + new FloatWritable(fixedInterval)); + datum.setFetchInterval(fixedInterval); + } + } else { + datum.getMetaData().put(new Text(metaname), new Text(metavalue)); + } + } catch (NumberFormatException nfe) { + LOG.error("Invalid number '" + metavalue + "' in metadata '" + + metaname + "' for url " + url); + } + } + } + + public void map(Text key, Writable value, Context context) + throws IOException, InterruptedException { + if (value instanceof Text) { + // if its a url from the seed list + String url = key.toString().trim(); + + // remove empty string or string starting with '#' + if (url.length() == 0 || url.startsWith("#")) + return; + + url = filterNormalize(url); + if (url == null) { + context.getCounter("injector", "urls_filtered").increment(1); + } else { + CrawlDatum datum = new CrawlDatum(); + datum.setStatus(CrawlDatum.STATUS_INJECTED); + datum.setFetchTime(curTime); + datum.setScore(scoreInjected); + datum.setFetchInterval(interval); + + String metadata = value.toString().trim(); + if (metadata.length() > 0) + processMetaData(metadata, datum, url); + + try { + key.set(url); + scfilters.injectedScore(key, datum); + } catch (ScoringFilterException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Cannot filter injected score for url " + url + + ", using default (" + e.getMessage() + ")"); + } + } + context.getCounter("injector", "urls_injected").increment(1); + context.write(key, datum); + } + } else if (value instanceof CrawlDatum) { + // if its a crawlDatum from the input crawldb, emulate CrawlDbFilter's + // map() + CrawlDatum datum = (CrawlDatum) value; + + // remove 404 urls + if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus()) + return; + + String url = filterNormalize(key.toString()); + if (url != null) { + key.set(url); + context.write(key, datum); + } + } + } + } + + /** Combine multiple new entries for a url. 
*/ + public static class InjectReducer + extends Reducer<Text, CrawlDatum, Text, CrawlDatum> { + private int interval; + private float scoreInjected; + private boolean overwrite = false; + private boolean update = false; + private CrawlDatum old = new CrawlDatum(); + private CrawlDatum injected = new CrawlDatum(); + + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + interval = conf.getInt("db.fetch.interval.default", 2592000); + scoreInjected = conf.getFloat("db.score.injected", 1.0f); + overwrite = conf.getBoolean("db.injector.overwrite", false); + update = conf.getBoolean("db.injector.update", false); + LOG.info("Injector: overwrite: " + overwrite); + LOG.info("Injector: update: " + update); + } + + /** + * Merge the input records as per rules below : + * + * <pre> + * 1. If there is ONLY new injected record ==> emit injected record + * 2. If there is ONLY old record ==> emit existing record + * 3. If BOTH new and old records are present: + * (a) If 'overwrite' is true ==> emit injected record + * (b) If 'overwrite' is false : + * (i) If 'update' is false ==> emit existing record + * (ii) If 'update' is true ==> update existing record and emit it + * </pre> + * + * For more details @see NUTCH-1405 + */ + public void reduce(Text key, Iterable<CrawlDatum> values, Context context) + throws IOException, InterruptedException { + + boolean oldSet = false; + boolean injectedSet = false; + + // If we encounter a datum with status as STATUS_INJECTED, then its a + // newly injected record. All other statuses correspond to an old record. 
+ for (CrawlDatum val : values) { + if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { + injected.set(val); + injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); + injectedSet = true; + } else { + old.set(val); + oldSet = true; + } + } + + CrawlDatum result; + if (injectedSet && (!oldSet || overwrite)) { + // corresponds to rules (1) and (3.a) in the method description + result = injected; + } else { + // corresponds to rules (2) and (3.b) in the method description + result = old; + + if (injectedSet && update) { + // corresponds to rule (3.b.ii) in the method description + old.putAllMetaData(injected); + old.setScore(injected.getScore() != scoreInjected + ? injected.getScore() : old.getScore()); + old.setFetchInterval(injected.getFetchInterval() != interval + ? injected.getFetchInterval() : old.getFetchInterval()); + } + } + if (injectedSet && oldSet) { + context.getCounter("injector", "urls_merged").increment(1); + } + context.write(key, result); + } + } + + public Injector() { + } + + public Injector(Configuration conf) { + setConf(conf); + } + + public void inject(Path crawlDb, Path urlDir) + throws IOException, ClassNotFoundException, InterruptedException { + inject(crawlDb, urlDir, false, false); + } + + public void inject(Path crawlDb, Path urlDir, boolean overwrite, + boolean update) throws IOException, ClassNotFoundException, InterruptedException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + + if (LOG.isInfoEnabled()) { + LOG.info("Injector: starting at " + sdf.format(start)); + LOG.info("Injector: crawlDb: " + crawlDb); + LOG.info("Injector: urlDir: " + urlDir); + LOG.info("Injector: Converting injected urls to crawl db entries."); + } + + // set configuration + Configuration conf = getConf(); + conf.setLong("injector.current.time", System.currentTimeMillis()); + conf.setBoolean("db.injector.overwrite", overwrite); + conf.setBoolean("db.injector.update", update); + 
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + + // create all the required paths + FileSystem fs = FileSystem.get(conf); + Path current = new Path(crawlDb, CrawlDb.CURRENT_NAME); + if (!fs.exists(current)) + fs.mkdirs(current); + + Path tempCrawlDb = new Path(crawlDb, + "crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + // lock an existing crawldb to prevent multiple simultaneous updates + Path lock = new Path(crawlDb, CrawlDb.LOCK_NAME); + LockUtil.createLockFile(fs, lock, false); + + // configure job + Job job = Job.getInstance(conf, "inject " + urlDir); + job.setJarByClass(Injector.class); + job.setMapperClass(InjectMapper.class); + job.setReducerClass(InjectReducer.class); + job.setOutputFormatClass(MapFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + job.setSpeculativeExecution(false); + + // set input and output paths of the job + MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class); + MultipleInputs.addInputPath(job, urlDir, KeyValueTextInputFormat.class); + FileOutputFormat.setOutputPath(job, tempCrawlDb); + + try { + // run the job + job.waitForCompletion(true); + + // save output and perform cleanup + CrawlDb.install(job, crawlDb); + + if (LOG.isInfoEnabled()) { + long urlsInjected = job.getCounters() + .findCounter("injector", "urls_injected").getValue(); + long urlsFiltered = job.getCounters() + .findCounter("injector", "urls_filtered").getValue(); + long urlsMerged = job.getCounters() + .findCounter("injector", "urls_merged").getValue(); + LOG.info("Injector: Total urls rejected by filters: " + urlsFiltered); + LOG.info( + "Injector: Total urls injected after normalization and filtering: " + + urlsInjected); + LOG.info("Injector: Total urls injected but already in CrawlDb: " + + urlsMerged); + LOG.info("Injector: Total new urls injected: " + + (urlsInjected - urlsMerged)); + + long end = System.currentTimeMillis(); + 
LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + } catch (IOException e) { + if (fs.exists(tempCrawlDb)) { + fs.delete(tempCrawlDb, true); + } + LockUtil.removeLockFile(fs, lock); + throw e; + } + } + + public void usage() { + System.err.println( + "Usage: Injector <crawldb> <url_dir> [-overwrite] [-update]\n"); + System.err.println( + " <crawldb>\tPath to a crawldb directory. If not present, a new one would be created."); + System.err.println( + " <url_dir>\tPath to directory with URL file(s) containing urls to be injected. A URL file"); + System.err.println( + " \tshould have one URL per line, optionally followed by custom metadata."); + System.err.println( + " \tBlank lines or lines starting with a '#' would be ignored. Custom metadata must"); + System.err + .println(" \tbe of form 'key=value' and separated by tabs."); + System.err.println(" \tBelow are reserved metadata keys:\n"); + System.err.println(" \t\tnutch.score: A custom score for a url"); + System.err.println( + " \t\tnutch.fetchInterval: A custom fetch interval for a url"); + System.err.println( + " \t\tnutch.fetchInterval.fixed: A custom fetch interval for a url that is not " + + "changed by AdaptiveFetchSchedule\n"); + System.err.println(" \tExample:"); + System.err.println(" \t http://www.apache.org/"); + System.err.println( + " \t http://www.nutch.org/ \\t nutch.score=10 \\t nutch.fetchInterval=2592000 \\t userType=open_source\n"); + System.err.println( + " -overwrite\tOverwite existing crawldb records by the injected records. Has precedence over 'update'"); + System.err.println( + " -update \tUpdate existing crawldb records with the injected records. 
Old metadata is preserved"); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + usage(); + return -1; + } + + boolean overwrite = false; + boolean update = false; + + for (int i = 2; i < args.length; i++) { + if (args[i].equals("-overwrite")) { + overwrite = true; + } else if (args[i].equals("-update")) { + update = true; + } else { + LOG.info("Injector: Found invalid argument \"" + args[i] + "\"\n"); + usage(); + return -1; + } + } + + try { + inject(new Path(args[0]), new Path(args[1]), overwrite, update); + return 0; + } catch (Exception e) { + LOG.error("Injector: " + StringUtils.stringifyException(e)); + return -1; + } + } + + /** + * Used by the Nutch REST service + */ + public Map<String, Object> run(Map<String, Object> args, String crawlId) + throws Exception { + if (args.size() < 1) { + throw new IllegalArgumentException("Required arguments <url_dir>"); + } + Map<String, Object> results = new HashMap<String, Object>(); + + Path crawlDb; + if (args.containsKey(Nutch.ARG_CRAWLDB)) { + Object crawldbPath = args.get(Nutch.ARG_CRAWLDB); + if (crawldbPath instanceof Path) { + crawlDb = (Path) crawldbPath; + } else { + crawlDb = new Path(crawldbPath.toString()); + } + } else { + crawlDb = new Path(crawlId + "/crawldb"); + } + + Path input; + Object path = args.get(Nutch.ARG_SEEDDIR); + if (path instanceof Path) { + input = (Path) path; + } else { + input = new Path(path.toString()); + } + + inject(crawlDb, input); + results.put(Nutch.VAL_RESULT, Integer.toString(0)); + return results; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Inlink.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Inlink.java 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nutch.crawl;

import java.io.*;
import org.apache.hadoop.io.*;

/* An incoming link to a page. */
public class Inlink implements Writable {

  private String fromUrl; // url of the linking page
  private String anchor;  // anchor text of the link

  /** No-arg constructor required by the Writable contract; fields stay null
   * until {@link #readFields(DataInput)} is called. */
  public Inlink() {
  }

  public Inlink(String fromUrl, String anchor) {
    this.fromUrl = fromUrl;
    this.anchor = anchor;
  }

  public void readFields(DataInput in) throws IOException {
    fromUrl = Text.readString(in);
    anchor = Text.readString(in);
  }

  /** Skips over one Inlink in the input. */
  public static void skip(DataInput in) throws IOException {
    Text.skip(in); // skip fromUrl
    Text.skip(in); // skip anchor
  }

  public void write(DataOutput out) throws IOException {
    Text.writeString(out, fromUrl);
    Text.writeString(out, anchor);
  }

  /** Reads and returns a new Inlink from the input. */
  public static Inlink read(DataInput in) throws IOException {
    Inlink inlink = new Inlink();
    inlink.readFields(in);
    return inlink;
  }

  public String getFromUrl() {
    return fromUrl;
  }

  public String getAnchor() {
    return anchor;
  }

  public boolean equals(Object o) {
    if (this == o)
      return true;
    if (!(o instanceof Inlink))
      return false;
    Inlink other = (Inlink) o;
    // FIX: null-safe comparison — the no-arg constructor (required by
    // Writable) leaves both fields null, and the original implementation
    // threw NullPointerException for such instances
    return (fromUrl == null ? other.fromUrl == null
        : fromUrl.equals(other.fromUrl))
        && (anchor == null ? other.anchor == null
            : anchor.equals(other.anchor));
  }

  public int hashCode() {
    // FIX: null-safe, consistent with the null-safe equals above
    return (fromUrl == null ? 0 : fromUrl.hashCode())
        ^ (anchor == null ? 0 : anchor.hashCode());
  }

  public String toString() {
    return "fromUrl: " + fromUrl + " anchor: " + anchor;
  }

}
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nutch.crawl;

import java.io.*;
import java.net.*;
import java.util.*;

import org.apache.hadoop.io.*;

/** A list of {@link Inlink}s. */
public class Inlinks implements Writable {
  // HashSet deduplicates identical (fromUrl, anchor) pairs; initial
  // capacity 1 because most pages have few inlinks
  private HashSet<Inlink> inlinks = new HashSet<Inlink>(1);

  public void add(Inlink inlink) {
    inlinks.add(inlink);
  }

  public void add(Inlinks inlinks) {
    this.inlinks.addAll(inlinks.inlinks);
  }

  public Iterator<Inlink> iterator() {
    return this.inlinks.iterator();
  }

  public int size() {
    return inlinks.size();
  }

  public void clear() {
    inlinks.clear();
  }

  public void readFields(DataInput in) throws IOException {
    int length = in.readInt();
    inlinks.clear();
    for (int i = 0; i < length; i++) {
      add(Inlink.read(in));
    }
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(inlinks.size());
    for (Inlink inlink : inlinks) {
      inlink.write(out);
    }
  }

  public String toString() {
    // FIX: StringBuilder instead of StringBuffer — no synchronization needed
    // for a method-local buffer
    StringBuilder buffer = new StringBuilder();
    buffer.append("Inlinks:\n");
    for (Inlink inlink : inlinks) {
      buffer.append(" ");
      buffer.append(inlink);
      buffer.append("\n");
    }
    return buffer.toString();
  }

  /**
   * Return the set of anchor texts. Only a single anchor with a given text is
   * permitted from a given domain.
   */
  public String[] getAnchors() {
    HashMap<String, Set<String>> domainToAnchors = new HashMap<String, Set<String>>();
    ArrayList<String> results = new ArrayList<String>();
    for (Inlink inlink : inlinks) {
      String anchor = inlink.getAnchor();

      if (anchor.length() == 0) // skip empty anchors
        continue;
      String domain = null; // extract domain name
      try {
        domain = new URL(inlink.getFromUrl()).getHost();
      } catch (MalformedURLException e) {
        // unparsable source url: its anchors are grouped under the null key
      }
      Set<String> domainAnchors = domainToAnchors.get(domain);
      if (domainAnchors == null) {
        domainAnchors = new HashSet<String>();
        domainToAnchors.put(domain, domainAnchors);
      }
      if (domainAnchors.add(anchor)) { // new anchor from domain
        results.add(anchor); // collect it
      }
    }

    return results.toArray(new String[results.size()]);
  }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.*; +import java.net.*; + +// Commons Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.io.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.parse.*; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.LockUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.NutchTool; +import org.apache.nutch.util.TimingUtil; + +/** Maintains an inverted link map, listing incoming links for each url. 
*/ +public class LinkDb extends NutchTool implements Tool, + Mapper<Text, ParseData, Text, Inlinks> { + + public static final Logger LOG = LoggerFactory.getLogger(LinkDb.class); + + public static final String IGNORE_INTERNAL_LINKS = "linkdb.ignore.internal.links"; + public static final String IGNORE_EXTERNAL_LINKS = "linkdb.ignore.external.links"; + + public static final String CURRENT_NAME = "current"; + public static final String LOCK_NAME = ".locked"; + + private int maxAnchorLength; + private boolean ignoreInternalLinks; + private boolean ignoreExternalLinks; + private URLFilters urlFilters; + private URLNormalizers urlNormalizers; + + public LinkDb() { + } + + public LinkDb(Configuration conf) { + setConf(conf); + } + + public void configure(JobConf job) { + maxAnchorLength = job.getInt("linkdb.max.anchor.length", 100); + ignoreInternalLinks = job.getBoolean(IGNORE_INTERNAL_LINKS, true); + ignoreExternalLinks = job.getBoolean(IGNORE_EXTERNAL_LINKS, false); + + if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) { + urlFilters = new URLFilters(job); + } + if (job.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) { + urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_LINKDB); + } + } + + public void close() { + } + + public void map(Text key, ParseData parseData, + OutputCollector<Text, Inlinks> output, Reporter reporter) + throws IOException { + String fromUrl = key.toString(); + String fromHost = getHost(fromUrl); + if (urlNormalizers != null) { + try { + fromUrl = urlNormalizers + .normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // normalize the + // url + } catch (Exception e) { + LOG.warn("Skipping " + fromUrl + ":" + e); + fromUrl = null; + } + } + if (fromUrl != null && urlFilters != null) { + try { + fromUrl = urlFilters.filter(fromUrl); // filter the url + } catch (Exception e) { + LOG.warn("Skipping " + fromUrl + ":" + e); + fromUrl = null; + } + } + if (fromUrl == null) + return; // discard all outlinks + Outlink[] outlinks = 
parseData.getOutlinks(); + Inlinks inlinks = new Inlinks(); + for (int i = 0; i < outlinks.length; i++) { + Outlink outlink = outlinks[i]; + String toUrl = outlink.getToUrl(); + + if (ignoreInternalLinks) { + String toHost = getHost(toUrl); + if (toHost == null || toHost.equals(fromHost)) { // internal link + continue; // skip it + } + } else if (ignoreExternalLinks) { + String toHost = getHost(toUrl); + if (toHost == null || !toHost.equals(fromHost)) { // external link + continue; // skip it + } + } + if (urlNormalizers != null) { + try { + toUrl = urlNormalizers.normalize(toUrl, URLNormalizers.SCOPE_LINKDB); // normalize + // the + // url + } catch (Exception e) { + LOG.warn("Skipping " + toUrl + ":" + e); + toUrl = null; + } + } + if (toUrl != null && urlFilters != null) { + try { + toUrl = urlFilters.filter(toUrl); // filter the url + } catch (Exception e) { + LOG.warn("Skipping " + toUrl + ":" + e); + toUrl = null; + } + } + if (toUrl == null) + continue; + inlinks.clear(); + String anchor = outlink.getAnchor(); // truncate long anchors + if (anchor.length() > maxAnchorLength) { + anchor = anchor.substring(0, maxAnchorLength); + } + inlinks.add(new Inlink(fromUrl, anchor)); // collect inverted link + output.collect(new Text(toUrl), inlinks); + } + } + + private String getHost(String url) { + try { + return new URL(url).getHost().toLowerCase(); + } catch (MalformedURLException e) { + return null; + } + } + + public void invert(Path linkDb, final Path segmentsDir, boolean normalize, + boolean filter, boolean force) throws IOException { + final FileSystem fs = FileSystem.get(getConf()); + FileStatus[] files = fs.listStatus(segmentsDir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + invert(linkDb, HadoopFSUtil.getPaths(files), normalize, filter, force); + } + + public void invert(Path linkDb, Path[] segments, boolean normalize, + boolean filter, boolean force) throws IOException { + JobConf job = LinkDb.createJob(getConf(), linkDb, normalize, filter); + Path lock 
= new Path(linkDb, LOCK_NAME); + FileSystem fs = FileSystem.get(getConf()); + LockUtil.createLockFile(fs, lock, force); + Path currentLinkDb = new Path(linkDb, CURRENT_NAME); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + if (LOG.isInfoEnabled()) { + LOG.info("LinkDb: starting at " + sdf.format(start)); + LOG.info("LinkDb: linkdb: " + linkDb); + LOG.info("LinkDb: URL normalize: " + normalize); + LOG.info("LinkDb: URL filter: " + filter); + if (job.getBoolean(IGNORE_INTERNAL_LINKS, true)) { + LOG.info("LinkDb: internal links will be ignored."); + } + if (job.getBoolean(IGNORE_EXTERNAL_LINKS, false)) { + LOG.info("LinkDb: external links will be ignored."); + } + } + if (job.getBoolean(IGNORE_INTERNAL_LINKS, true) + && job.getBoolean(IGNORE_EXTERNAL_LINKS, false)) { + LOG.warn("LinkDb: internal and external links are ignored! " + + "Nothing to do, actually. Exiting."); + LockUtil.removeLockFile(fs, lock); + return; + } + + for (int i = 0; i < segments.length; i++) { + if (LOG.isInfoEnabled()) { + LOG.info("LinkDb: adding segment: " + segments[i]); + } + FileInputFormat.addInputPath(job, new Path(segments[i], + ParseData.DIR_NAME)); + } + try { + JobClient.runJob(job); + } catch (IOException e) { + LockUtil.removeLockFile(fs, lock); + throw e; + } + if (fs.exists(currentLinkDb)) { + if (LOG.isInfoEnabled()) { + LOG.info("LinkDb: merging with existing linkdb: " + linkDb); + } + // try to merge + Path newLinkDb = FileOutputFormat.getOutputPath(job); + job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter); + FileInputFormat.addInputPath(job, currentLinkDb); + FileInputFormat.addInputPath(job, newLinkDb); + try { + JobClient.runJob(job); + } catch (IOException e) { + LockUtil.removeLockFile(fs, lock); + fs.delete(newLinkDb, true); + throw e; + } + fs.delete(newLinkDb, true); + } + LinkDb.install(job, linkDb); + + long end = System.currentTimeMillis(); + LOG.info("LinkDb: finished at " + 
sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + private static JobConf createJob(Configuration config, Path linkDb, + boolean normalize, boolean filter) { + Path newLinkDb = new Path("linkdb-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(config); + job.setJobName("linkdb " + linkDb); + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(LinkDb.class); + job.setCombinerClass(LinkDbMerger.class); + // if we don't run the mergeJob, perform normalization/filtering now + if (normalize || filter) { + try { + FileSystem fs = FileSystem.get(config); + if (!fs.exists(linkDb)) { + job.setBoolean(LinkDbFilter.URL_FILTERING, filter); + job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize); + } + } catch (Exception e) { + LOG.warn("LinkDb createJob: " + e); + } + } + job.setReducerClass(LinkDbMerger.class); + + FileOutputFormat.setOutputPath(job, newLinkDb); + job.setOutputFormat(MapFileOutputFormat.class); + job.setBoolean("mapred.output.compress", true); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Inlinks.class); + + return job; + } + + public static void install(JobConf job, Path linkDb) throws IOException { + Path newLinkDb = FileOutputFormat.getOutputPath(job); + FileSystem fs = new JobClient(job).getFs(); + Path old = new Path(linkDb, "old"); + Path current = new Path(linkDb, CURRENT_NAME); + if (fs.exists(current)) { + if (fs.exists(old)) + fs.delete(old, true); + fs.rename(current, old); + } + fs.mkdirs(linkDb); + fs.rename(newLinkDb, current); + if (fs.exists(old)) + fs.delete(old, true); + LockUtil.removeLockFile(fs, new Path(linkDb, LOCK_NAME)); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new LinkDb(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err + .println("Usage: LinkDb <linkdb> (-dir 
<segmentsDir> | <seg1> <seg2> ...) [-force] [-noNormalize] [-noFilter]"); + System.err.println("\tlinkdb\toutput LinkDb to create or update"); + System.err + .println("\t-dir segmentsDir\tparent directory of several segments, OR"); + System.err.println("\tseg1 seg2 ...\t list of segment directories"); + System.err + .println("\t-force\tforce update even if LinkDb appears to be locked (CAUTION advised)"); + System.err.println("\t-noNormalize\tdon't normalize link URLs"); + System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs"); + return -1; + } + final FileSystem fs = FileSystem.get(getConf()); + Path db = new Path(args[0]); + ArrayList<Path> segs = new ArrayList<Path>(); + boolean filter = true; + boolean normalize = true; + boolean force = false; + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-dir")) { + FileStatus[] paths = fs.listStatus(new Path(args[++i]), + HadoopFSUtil.getPassDirectoriesFilter(fs)); + segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths))); + } else if (args[i].equalsIgnoreCase("-noNormalize")) { + normalize = false; + } else if (args[i].equalsIgnoreCase("-noFilter")) { + filter = false; + } else if (args[i].equalsIgnoreCase("-force")) { + force = true; + } else + segs.add(new Path(args[i])); + } + try { + invert(db, segs.toArray(new Path[segs.size()]), normalize, filter, force); + return 0; + } catch (Exception e) { + LOG.error("LinkDb: " + StringUtils.stringifyException(e)); + return -1; + } + } + + /* + * Used for Nutch REST service + */ + @Override + public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception { + + Map<String, Object> results = new HashMap<String, Object>(); + + Path linkdb; + if(args.containsKey(Nutch.ARG_LINKDB)) { + Object path = args.get(Nutch.ARG_LINKDB); + if(path instanceof Path) { + linkdb = (Path) path; + } + else { + linkdb = new Path(path.toString()); + } + } + else { + linkdb = new Path(crawlId+"/linkdb"); + } + + + ArrayList<Path> segs = new 
ArrayList<Path>(); + boolean filter = true; + boolean normalize = true; + boolean force = false; + if (args.containsKey("noNormalize")) { + normalize = false; + } + if (args.containsKey("noFilter")) { + filter = false; + } + if (args.containsKey("force")) { + force = true; + } + + Path segmentsDir; + final FileSystem fs = FileSystem.get(getConf()); + if(args.containsKey(Nutch.ARG_SEGMENTDIR)) { + Object segDir = args.get(Nutch.ARG_SEGMENTDIR); + if(segDir instanceof Path) { + segmentsDir = (Path) segDir; + } + else { + segmentsDir = new Path(segDir.toString()); + } + FileStatus[] paths = fs.listStatus(segmentsDir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths))); + } + else if(args.containsKey(Nutch.ARG_SEGMENT)) { + Object segments = args.get(Nutch.ARG_SEGMENT); + ArrayList<String> segmentList = new ArrayList<String>(); + if(segments instanceof ArrayList) { + segmentList = (ArrayList<String>)segments; + } + for(String segment: segmentList) { + segs.add(new Path(segment)); + } + } + else { + String segment_dir = crawlId+"/segments"; + File dir = new File(segment_dir); + File[] segmentsList = dir.listFiles(); + Arrays.sort(segmentsList, new Comparator<File>(){ + @Override + public int compare(File f1, File f2) { + if(f1.lastModified()>f2.lastModified()) + return -1; + else + return 0; + } + }); + segs.add(new Path(segmentsList[0].getPath())); + } + try { + invert(linkdb, segs.toArray(new Path[segs.size()]), normalize, filter, force); + results.put(Nutch.VAL_RESULT, Integer.toString(0)); + return results; + } catch (Exception e) { + LOG.error("LinkDb: " + StringUtils.stringifyException(e)); + results.put(Nutch.VAL_RESULT, Integer.toString(-1)); + return results; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java ---------------------------------------------------------------------- diff --git 
a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java new file mode 100644 index 0000000..1ff9b05 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.IOException; +import java.util.Iterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; + +/** + * This class provides a way to separate the URL normalization and filtering + * steps from the rest of LinkDb manipulation code. 
+ * + * @author Andrzej Bialecki + */ +public class LinkDbFilter implements Mapper<Text, Inlinks, Text, Inlinks> { + public static final String URL_FILTERING = "linkdb.url.filters"; + + public static final String URL_NORMALIZING = "linkdb.url.normalizer"; + + public static final String URL_NORMALIZING_SCOPE = "linkdb.url.normalizer.scope"; + + private boolean filter; + + private boolean normalize; + + private URLFilters filters; + + private URLNormalizers normalizers; + + private String scope; + + public static final Logger LOG = LoggerFactory.getLogger(LinkDbFilter.class); + + private Text newKey = new Text(); + + public void configure(JobConf job) { + filter = job.getBoolean(URL_FILTERING, false); + normalize = job.getBoolean(URL_NORMALIZING, false); + if (filter) { + filters = new URLFilters(job); + } + if (normalize) { + scope = job.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_LINKDB); + normalizers = new URLNormalizers(job, scope); + } + } + + public void close() { + } + + public void map(Text key, Inlinks value, + OutputCollector<Text, Inlinks> output, Reporter reporter) + throws IOException { + String url = key.toString(); + Inlinks result = new Inlinks(); + if (normalize) { + try { + url = normalizers.normalize(url, scope); // normalize the url + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + if (url != null && filter) { + try { + url = filters.filter(url); // filter the url + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + if (url == null) + return; // didn't pass the filters + Iterator<Inlink> it = value.iterator(); + String fromUrl = null; + while (it.hasNext()) { + Inlink inlink = it.next(); + fromUrl = inlink.getFromUrl(); + if (normalize) { + try { + fromUrl = normalizers.normalize(fromUrl, scope); // normalize the url + } catch (Exception e) { + LOG.warn("Skipping " + fromUrl + ":" + e); + fromUrl = null; + } + } + if (fromUrl != null && filter) { + try { + fromUrl 
= filters.filter(fromUrl); // filter the url + } catch (Exception e) { + LOG.warn("Skipping " + fromUrl + ":" + e); + fromUrl = null; + } + } + if (fromUrl != null) { + result.add(new Inlink(fromUrl, inlink.getAnchor())); + } + } + if (result.size() > 0) { // don't collect empty inlinks + newKey.set(url); + output.collect(newKey, result); + } + } +}
