http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java new file mode 100644 index 0000000..39923ac --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.crawl; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapFileOutputFormat; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; + +/** + * This tool merges several LinkDb-s into one, optionally filtering URLs through + * the current URLFilters, to skip prohibited URLs and links. + * + * <p> + * It's possible to use this tool just for filtering - in that case only one + * LinkDb should be specified in arguments. + * </p> + * <p> + * If more than one LinkDb contains information about the same URL, all inlinks + * are accumulated, but at most <code>linkdb.max.inlinks</code> inlinks will + * ever be added. + * </p> + * <p> + * If activated, URLFilters will be applied to both the target URLs and to any + * incoming link URL. If a target URL is prohibited, all inlinks to that target + * will be removed, including the target URL. If some of the incoming links are + * prohibited, only those links will be removed, and they won't count toward the + * above-mentioned maximum limit. + * </p>
+ * + * @author Andrzej Bialecki + */ +public class LinkDbMerger extends Configured implements Tool, + Reducer<Text, Inlinks, Text, Inlinks> { + private static final Logger LOG = LoggerFactory.getLogger(LinkDbMerger.class); + + private int maxInlinks; + + public LinkDbMerger() { + + } + + public LinkDbMerger(Configuration conf) { + setConf(conf); + } + + public void reduce(Text key, Iterator<Inlinks> values, + OutputCollector<Text, Inlinks> output, Reporter reporter) + throws IOException { + + Inlinks result = new Inlinks(); + + while (values.hasNext()) { + Inlinks inlinks = values.next(); + + int end = Math.min(maxInlinks - result.size(), inlinks.size()); + Iterator<Inlink> it = inlinks.iterator(); + int i = 0; + while (it.hasNext() && i++ < end) { + result.add(it.next()); + } + } + if (result.size() == 0) + return; + output.collect(key, result); + + } + + public void configure(JobConf job) { + maxInlinks = job.getInt("linkdb.max.inlinks", 10000); + } + + public void close() throws IOException { + } + + public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) + throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("LinkDb merge: starting at " + sdf.format(start)); + + JobConf job = createMergeJob(getConf(), output, normalize, filter); + for (int i = 0; i < dbs.length; i++) { + FileInputFormat.addInputPath(job, new Path(dbs[i], LinkDb.CURRENT_NAME)); + } + JobClient.runJob(job); + FileSystem fs = FileSystem.get(getConf()); + fs.mkdirs(output); + fs.rename(FileOutputFormat.getOutputPath(job), new Path(output, + LinkDb.CURRENT_NAME)); + + long end = System.currentTimeMillis(); + LOG.info("LinkDb merge: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + public static JobConf createMergeJob(Configuration config, Path linkDb, + boolean normalize, boolean filter) { + Path newLinkDb = new Path("linkdb-merge-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(config); + job.setJobName("linkdb merge " + linkDb); + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(LinkDbFilter.class); + job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize); + job.setBoolean(LinkDbFilter.URL_FILTERING, filter); + job.setReducerClass(LinkDbMerger.class); + + FileOutputFormat.setOutputPath(job, newLinkDb); + job.setOutputFormat(MapFileOutputFormat.class); + job.setBoolean("mapred.output.compress", true); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Inlinks.class); + + // https://issues.apache.org/jira/browse/NUTCH-1069 + job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + + return job; + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbMerger(), + args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err + .println("Usage: LinkDbMerger <output_linkdb> <linkdb1> [<linkdb2> <linkdb3> ...] 
[-normalize] [-filter]"); + System.err.println("\toutput_linkdb\toutput LinkDb"); + System.err + .println("\tlinkdb1 ...\tinput LinkDb-s (single input LinkDb is ok)"); + System.err + .println("\t-normalize\tuse URLNormalizer on both fromUrls and toUrls in linkdb(s) (usually not needed)"); + System.err + .println("\t-filter\tuse URLFilters on both fromUrls and toUrls in linkdb(s)"); + return -1; + } + Path output = new Path(args[0]); + ArrayList<Path> dbs = new ArrayList<Path>(); + boolean normalize = false; + boolean filter = false; + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-filter")) { + filter = true; + } else if (args[i].equals("-normalize")) { + normalize = true; + } else + dbs.add(new Path(args[i])); + } + try { + merge(output, dbs.toArray(new Path[dbs.size()]), normalize, filter); + return 0; + } catch (Exception e) { + LOG.error("LinkDbMerger: " + StringUtils.stringifyException(e)); + return -1; + } + } + +}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java new file mode 100644 index 0000000..2e50e9a --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.IOException; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +// SLF4J logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.lib.HashPartitioner; +import org.apache.hadoop.util.*; +import org.apache.hadoop.conf.Configuration; + +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; + +import java.text.SimpleDateFormat; +import java.util.Iterator; +import java.io.Closeable; + +/**
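+ * Read utility for the LinkDb: dumps the whole database to a text file + * (optionally restricted to URLs matching a regular expression), or prints + * the inlinks recorded for a single URL.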
*/ +public class LinkDbReader extends Configured implements Tool, Closeable { + public static final Logger LOG = LoggerFactory.getLogger(LinkDbReader.class); + + private static final Partitioner<WritableComparable, Writable> PARTITIONER = new HashPartitioner<WritableComparable, Writable>(); + + private FileSystem fs; + private Path directory; + private MapFile.Reader[] readers; + + public LinkDbReader() { + + } + + public LinkDbReader(Configuration conf, Path directory) throws Exception { + setConf(conf); + init(directory); + } + + public void init(Path directory) throws Exception { + this.fs = FileSystem.get(getConf()); + this.directory = directory; + } + + public String[] getAnchors(Text url) throws IOException { + Inlinks inlinks = getInlinks(url); + if (inlinks == null) + return null; + return inlinks.getAnchors(); + } + + public Inlinks getInlinks(Text url) throws IOException { + + if (readers == null) { + synchronized (this) { + readers = MapFileOutputFormat.getReaders(fs, new Path(directory, + LinkDb.CURRENT_NAME), getConf()); + } + } + + return (Inlinks) MapFileOutputFormat.getEntry(readers, PARTITIONER, url, + new Inlinks()); + } + + public void close() throws IOException { + if (readers != null) { + for (int i = 0; i < readers.length; i++) { + readers[i].close(); + } + } + } + + public static class LinkDBDumpMapper implements Mapper<Text, Inlinks, Text, Inlinks> { + Pattern pattern = null; + Matcher matcher = null; + + public void configure(JobConf job) { + if (job.get("linkdb.regex", null) != null) { + pattern = Pattern.compile(job.get("linkdb.regex")); + } + } + + public void close() {} + public void map(Text key, Inlinks value, OutputCollector<Text, Inlinks> output, Reporter reporter) + throws IOException { + + if (pattern != null) { + matcher = pattern.matcher(key.toString()); + if (!matcher.matches()) { + return; + } + } + + output.collect(key, value); + } + } + + public void processDumpJob(String linkdb, String output, String regex) throws IOException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + if (LOG.isInfoEnabled()) { + LOG.info("LinkDb dump: starting at " + sdf.format(start)); + LOG.info("LinkDb dump: db: " + linkdb); + } + Path outFolder = new Path(output); + + JobConf job = new NutchJob(getConf()); + job.setJobName("read " + linkdb); + + if (regex != null) { + job.set("linkdb.regex", regex); + job.setMapperClass(LinkDBDumpMapper.class); + } + + FileInputFormat.addInputPath(job, new Path(linkdb, LinkDb.CURRENT_NAME)); + job.setInputFormat(SequenceFileInputFormat.class); + + FileOutputFormat.setOutputPath(job, outFolder); + job.setOutputFormat(TextOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Inlinks.class); + + JobClient.runJob(job); + + long end = System.currentTimeMillis(); + LOG.info("LinkDb dump: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbReader(), + args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err + .println("Usage: LinkDbReader <linkdb> (-dump <out_dir> [-regex <regex>]) | -url <url>"); + System.err + .println("\t-dump <out_dir>\tdump whole link db to a text file in <out_dir>"); + System.err + .println("\t\t-regex <regex>\trestrict to URLs matching the expression"); + System.err + .println("\t-url <url>\tprint 
information about <url> to System.out"); + return -1; + } + try { + if (args[1].equals("-dump")) { + String regex = null; + for (int i = 2; i < args.length; i++) { + if (args[i].equals("-regex")) { + regex = args[++i]; + } + } + processDumpJob(args[0], args[2], regex); + return 0; + } else if (args[1].equals("-url")) { + init(new Path(args[0])); + Inlinks links = getInlinks(new Text(args[2])); + if (links == null) { + System.out.println(" - no link information."); + } else { + Iterator<Inlink> it = links.iterator(); + while (it.hasNext()) { + System.out.println(it.next().toString()); + } + } + return 0; + } else { + System.err.println("Error: wrong argument " + args[1]); + return -1; + } + } catch (Exception e) { + LOG.error("LinkDbReader: " + StringUtils.stringifyException(e)); + return -1; + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java new file mode 100644 index 0000000..f6ec8dd --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import org.apache.hadoop.io.MD5Hash; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.protocol.Content; + +/** + * Default implementation of a page signature. It calculates an MD5 hash of the + * raw binary content of a page. In case there is no content, it calculates a + * hash from the page's URL. + * + * @author Andrzej Bialecki <[email protected]> + */ +public class MD5Signature extends Signature { + + public byte[] calculate(Content content, Parse parse) { + byte[] data = content.getContent(); + if (data == null) + data = content.getUrl().getBytes(); + return MD5Hash.digest(data).getDigest(); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java b/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java new file mode 100644 index 0000000..4fe5cef --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.Reader; +import java.util.HashMap; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.metadata.HttpHeaders; +import org.apache.nutch.util.MimeUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extension of {@link AdaptiveFetchSchedule} that allows for more flexible + * configuration of DEC and INC factors for various MIME-types. + * + * This class can be typically used in cases where a recrawl consists of many + * different MIME-types. It's not very common for MIME-types other than + * text/html to change frequently. Using this class you can configure different + * factors per MIME-type so as to prefer frequently changing MIME-types over + * others. + * + * For it to work this class relies on the Content-Type MetaData key being + * present in the CrawlDB. This can either be done when injecting new URLs or + * by adding "Content-Type" to the db.parsemeta.to.crawldb configuration setting + * to force MIME-types of newly discovered URLs to be added to the CrawlDB. + * + * @author markus + */ +public class MimeAdaptiveFetchSchedule extends AdaptiveFetchSchedule { + // Logger + public static final Logger LOG = LoggerFactory + .getLogger(MimeAdaptiveFetchSchedule.class); + + // Conf directives + public static final String SCHEDULE_INC_RATE = "db.fetch.schedule.adaptive.inc_rate"; + public static final String SCHEDULE_DEC_RATE = "db.fetch.schedule.adaptive.dec_rate"; + public static final String SCHEDULE_MIME_FILE = "db.fetch.schedule.mime.file"; + + // Default values for DEC and INC rate + private float defaultIncRate; + private float defaultDecRate; + + // Structure to store inc and dec rates per MIME-type + private class AdaptiveRate { + public float inc; + public float dec; + + public AdaptiveRate(Float inc, Float dec) { + this.inc = inc; + this.dec = dec; + } + } + + // Here we store the MIME-types and their deltas + private HashMap<String, AdaptiveRate> mimeMap; + + public void setConf(Configuration conf) { + super.setConf(conf); + if (conf == null) + return; + + // Read and set the default INC and DEC rates in case we cannot set values + // based on MIME-type + defaultIncRate = conf.getFloat(SCHEDULE_INC_RATE, 0.2f); + defaultDecRate = conf.getFloat(SCHEDULE_DEC_RATE, 0.2f); + + // Where's the mime/factor file? 
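+ // The file is expected to hold one MIME-type per line together with its + // INC and DEC factors, the three fields separated by single tabs; blank + // lines and lines starting with '#' are skipped. Illustrative contents + // (values are examples only, not shipped defaults): + // text/html 0.4 0.4 + // application/pdf 0.1 0.1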
+ Reader mimeFile = conf.getConfResourceAsReader(conf.get(SCHEDULE_MIME_FILE, + "adaptive-mimetypes.txt")); + + try { + readMimeFile(mimeFile); + } catch (IOException e) { + LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + } + + @Override + public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum, + long prevFetchTime, long prevModifiedTime, long fetchTime, + long modifiedTime, int state) { + + // Set defaults + INC_RATE = defaultIncRate; + DEC_RATE = defaultDecRate; + + // Check if the Content-Type field is available in the CrawlDatum + if (datum.getMetaData().containsKey(HttpHeaders.WRITABLE_CONTENT_TYPE)) { + // Get the MIME-type of the current URL + String currentMime = MimeUtil.cleanMimeType(datum.getMetaData() + .get(HttpHeaders.WRITABLE_CONTENT_TYPE).toString()); + + // Check if this MIME-type exists in our map + if (mimeMap.containsKey(currentMime)) { + // Yes, set the INC and DEC rates for this MIME-type + INC_RATE = mimeMap.get(currentMime).inc; + DEC_RATE = mimeMap.get(currentMime).dec; + } + } + + return super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime, + fetchTime, modifiedTime, state); + } + + /** + * Reads the MIME-types and their associated INC/DEC factors into a HashMap. + * + * @param mimeFile + * Reader + */ + private void readMimeFile(Reader mimeFile) throws IOException { + // Instance of our mime/factor map + mimeMap = new HashMap<String, AdaptiveRate>(); + + // Open a reader + BufferedReader reader = new BufferedReader(mimeFile); + + String line = null; + String[] splits = null; + + // Read all lines + while ((line = reader.readLine()) != null) { + // Skip blank lines and comments + if (StringUtils.isNotBlank(line) && !line.startsWith("#")) { + // Split the line by TAB + splits = line.split("\t"); + + // Sanity check, we need exactly three fields + if (splits.length == 3) { + // Add a lower cased MIME-type and the factor to the map + mimeMap.put(StringUtils.lowerCase(splits[0]), new AdaptiveRate( + new Float(splits[1]), new Float(splits[2]))); + } else { + LOG.warn("Invalid configuration line: " + line); + } + } + } + } + + public static void main(String[] args) throws Exception { + FetchSchedule fs = new MimeAdaptiveFetchSchedule(); + fs.setConf(NutchConfiguration.create()); + // we start the time at 0, for simplicity + long curTime = 0; + long delta = 1000L * 3600L * 24L; // 1 day + // we trigger the update of the page every 30 days + long update = 1000L * 3600L * 24L * 30L; // 30 days + boolean changed = true; + long lastModified = 0; + int miss = 0; + int totalMiss = 0; + int maxMiss = 0; + int fetchCnt = 0; + int changeCnt = 0; + + // initial fetchInterval is 30 days + CrawlDatum p = new CrawlDatum(1, 3600 * 24 * 30, 1.0f); + + // Set a default MIME-type to test with + org.apache.hadoop.io.MapWritable x = new org.apache.hadoop.io.MapWritable(); + x.put(HttpHeaders.WRITABLE_CONTENT_TYPE, new Text( + "text/html; charset=utf-8")); + p.setMetaData(x); + + p.setFetchTime(0); + LOG.info(p.toString()); + + // let's move the timeline a couple of deltas + for (int i = 0; i < 10000; i++) { + if (lastModified + update < curTime) { + // System.out.println("i=" + i + ", lastModified=" + lastModified + + // ", update=" + update + ", curTime=" + curTime); + changed = true; + changeCnt++; + lastModified = curTime; + } + + LOG.info(i + ". 
" + changed + "\twill fetch at " + + (p.getFetchTime() / delta) + "\tinterval " + + (p.getFetchInterval() / SECONDS_PER_DAY) + " days" + "\t missed " + + miss); + + if (p.getFetchTime() <= curTime) { + fetchCnt++; + fs.setFetchSchedule(new Text("http://www.example.com"), p, p + .getFetchTime(), p.getModifiedTime(), curTime, lastModified, + changed ? FetchSchedule.STATUS_MODIFIED + : FetchSchedule.STATUS_NOTMODIFIED); + + LOG.info("\tfetched & adjusted: " + "\twill fetch at " + + (p.getFetchTime() / delta) + "\tinterval " + + (p.getFetchInterval() / SECONDS_PER_DAY) + " days"); + + if (!changed) + miss++; + if (miss > maxMiss) + maxMiss = miss; + changed = false; + totalMiss += miss; + miss = 0; + } + + if (changed) + miss++; + curTime += delta; + } + LOG.info("Total missed: " + totalMiss + ", max miss: " + maxMiss); + LOG.info("Page changed " + changeCnt + " times, fetched " + fetchCnt + + " times."); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java b/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java new file mode 100644 index 0000000..589b8b9 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.crawl; + +import org.apache.hadoop.io.Writable; +import org.apache.nutch.util.GenericWritableConfigurable; + +@SuppressWarnings("unchecked") +public class NutchWritable extends GenericWritableConfigurable { + + private static Class<? extends Writable>[] CLASSES = null; + + static { + CLASSES = (Class<? 
extends Writable>[]) new Class<?>[] { + org.apache.hadoop.io.NullWritable.class, + org.apache.hadoop.io.BooleanWritable.class, + org.apache.hadoop.io.LongWritable.class, + org.apache.hadoop.io.ByteWritable.class, + org.apache.hadoop.io.BytesWritable.class, + org.apache.hadoop.io.FloatWritable.class, + org.apache.hadoop.io.IntWritable.class, + org.apache.hadoop.io.MapWritable.class, + org.apache.hadoop.io.Text.class, org.apache.hadoop.io.MD5Hash.class, + org.apache.nutch.crawl.CrawlDatum.class, + org.apache.nutch.crawl.Inlink.class, + org.apache.nutch.crawl.Inlinks.class, + org.apache.nutch.indexer.NutchIndexAction.class, + org.apache.nutch.metadata.Metadata.class, + org.apache.nutch.parse.Outlink.class, + org.apache.nutch.parse.ParseText.class, + org.apache.nutch.parse.ParseData.class, + org.apache.nutch.parse.ParseImpl.class, + org.apache.nutch.parse.ParseStatus.class, + org.apache.nutch.protocol.Content.class, + org.apache.nutch.protocol.ProtocolStatus.class, + org.apache.nutch.scoring.webgraph.LinkDatum.class, + org.apache.nutch.hostdb.HostDatum.class }; + } + + public NutchWritable() { + } + + public NutchWritable(Writable instance) { + set(instance); + } + + @Override + protected Class<? extends Writable>[] getTypes() { + return CLASSES; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java new file mode 100644 index 0000000..21dfe07 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import org.apache.nutch.parse.Parse; +import org.apache.nutch.protocol.Content; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configurable; + +public abstract class Signature implements Configurable { + protected Configuration conf; + + public abstract byte[] calculate(Content content, Parse parse); + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java new file mode 100644 index 0000000..d217d93 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.util.Comparator; + +public class SignatureComparator implements Comparator<Object> { + public int compare(Object o1, Object o2) { + return _compare(o1, o2); + } + + public static int _compare(Object o1, Object o2) { + if (o1 == null && o2 == null) + return 0; + if (o1 == null) + return -1; + if (o2 == null) + return 1; + if (!(o1 instanceof byte[])) + return -1; + if (!(o2 instanceof byte[])) + return 1; + byte[] data1 = (byte[]) o1; + byte[] data2 = (byte[]) o2; + return _compare(data1, 0, data1.length, data2, 0, data2.length); + } + + public static int _compare(byte[] data1, int s1, int l1, byte[] data2, + int s2, int l2) { + if (l2 > l1) + return -1; + if (l2 < l1) + return 1; + int res = 0; + for (int i = 0; i < l1; i++) { + res = (data1[s1 + i] - data2[s2 + i]); + if (res != 0) + return res; + } + return 0; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java new file mode 100644 index 0000000..16d8cc0 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +// SLF4J logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// Hadoop imports +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.util.ObjectCache; + +/** + * Factory class, which instantiates a Signature implementation according to + * the current Configuration. The newly created instance is cached via the + * ObjectCache associated with the Configuration, so that it can be retrieved + * later. + * + * @author Andrzej Bialecki <[email protected]> + */ +public class SignatureFactory { + private static final Logger LOG = LoggerFactory + .getLogger(SignatureFactory.class); + + private SignatureFactory() { + } // no public ctor + + /** Return the configured Signature implementation (MD5Signature by default). */ + public synchronized static Signature getSignature(Configuration conf) { + String clazz = conf.get("db.signature.class", MD5Signature.class.getName()); + ObjectCache objectCache = ObjectCache.get(conf); + Signature impl = (Signature) objectCache.getObject(clazz); + if (impl == null) { + try { + if (LOG.isInfoEnabled()) { + LOG.info("Using Signature impl: " + clazz); + } + Class<?> implClass = Class.forName(clazz); + impl = (Signature) implClass.newInstance(); + impl.setConf(conf); + objectCache.setObject(clazz, impl); + } catch (Exception e) { + throw new RuntimeException("Couldn't create " + clazz, e); + } + } + return impl; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java new file mode 100644 index 0000000..b88cfa6 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import org.apache.hadoop.io.MD5Hash; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.protocol.Content; + +/** + * Implementation of a page signature. 
It calculates an MD5 hash of the textual + * content of a page. In case there is no textual content, it falls back to + * {@link MD5Signature}, i.e. a hash of the raw content or, failing that, of + * the page's URL. + */ +public class TextMD5Signature extends Signature { + + Signature fallback = new MD5Signature(); + + public byte[] calculate(Content content, Parse parse) { + String text = parse.getText(); + + if (text == null || text.length() == 0) { + return fallback.calculate(content, parse); + } + + return MD5Hash.digest(text).getDigest(); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java new file mode 100644 index 0000000..5d930f9 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; + +import org.apache.hadoop.io.MD5Hash; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseImpl; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.StringUtil; +import org.apache.nutch.util.NutchConfiguration; + +/** + * <p> + * An implementation of a page signature. It calculates an MD5 hash of a plain + * text "profile" of a page. In case there is no text, it calculates a hash + * using the {@link MD5Signature}. + * </p> + * <p> + * The algorithm to calculate a page "profile" takes the plain text version of a + * page and performs the following steps: + * <ul> + * <li>remove all characters except letters and digits, and bring all characters + * to lower case,</li> + * <li>split the text into tokens (runs of consecutive letters and digits),</li> + * <li>discard tokens equal or shorter than MIN_TOKEN_LEN (default 2 + * characters),</li> + * <li>sort the list of tokens by decreasing frequency,</li> + * <li>round down the counts of tokens to the nearest multiple of QUANT ( + * <code>QUANT = QUANT_RATE * maxFreq</code>, where <code>QUANT_RATE</code> is + * 0.01f by default, and <code>maxFreq</code> is the maximum token frequency). 
+ * If <code>maxFreq</code> is higher than 1, then QUANT is at least 2 + * (which means that tokens with frequency 1 are always discarded).</li> + * <li>tokens whose frequency after quantization falls below QUANT are + * discarded.</li> + * <li>create a list of tokens and their quantized frequency, separated by + * spaces, in the order of decreasing frequency.</li> + * </ul> + * This list is then submitted to an MD5 hash calculation. + * + * @author Andrzej Bialecki <[email protected]> + */ +public class TextProfileSignature extends Signature { + + Signature fallback = new MD5Signature(); + + public byte[] calculate(Content content, Parse parse) { + int MIN_TOKEN_LEN = getConf().getInt( + "db.signature.text_profile.min_token_len", 2); + float QUANT_RATE = getConf().getFloat( + "db.signature.text_profile.quant_rate", 0.01f); + HashMap<String, Token> tokens = new HashMap<String, Token>(); + String text = null; + if (parse != null) + text = parse.getText(); + if (text == null || text.length() == 0) + return fallback.calculate(content, parse); + StringBuffer curToken = new StringBuffer(); + int maxFreq = 0; + for (int i = 0; i < text.length(); i++) { + char c = text.charAt(i); + if (Character.isLetterOrDigit(c)) { + curToken.append(Character.toLowerCase(c)); + } else { + if (curToken.length() > 0) { + if (curToken.length() > MIN_TOKEN_LEN) { + // add it + String s = curToken.toString(); + Token tok = tokens.get(s); + if (tok == null) { + tok = new Token(0, s); + tokens.put(s, tok); + } + tok.cnt++; + if (tok.cnt > maxFreq) + maxFreq = tok.cnt; + } + curToken.setLength(0); + } + } + } + // check the last token + if (curToken.length() > MIN_TOKEN_LEN) { + // add it + String s = curToken.toString(); + Token tok = tokens.get(s); + if (tok == null) { + tok = new Token(0, s); + tokens.put(s, tok); + } + tok.cnt++; + if (tok.cnt > maxFreq) + maxFreq = tok.cnt; + } + Iterator<Token> it = tokens.values().iterator(); + ArrayList<Token> profile = new ArrayList<Token>(); + // calculate the QUANT value + int QUANT = Math.round(maxFreq * QUANT_RATE); + if (QUANT < 2) { + if (maxFreq > 1) + QUANT = 2; + else + QUANT = 1; + } + while (it.hasNext()) { + Token t = it.next(); + // round down to the nearest QUANT + t.cnt = (t.cnt / QUANT) * QUANT; + // discard the frequencies below the QUANT + if (t.cnt < QUANT) { + continue; + } + profile.add(t); + } + Collections.sort(profile, new TokenComparator()); + StringBuffer newText = new StringBuffer(); + it = profile.iterator(); + while (it.hasNext()) { + Token t = it.next(); + if (newText.length() > 0) + newText.append("\n"); + newText.append(t.toString()); + } + return MD5Hash.digest(newText.toString()).getDigest(); + } + + private static class Token { + public int cnt; + public String val; + + public Token(int cnt, String val) { + this.cnt = cnt; + this.val = val; + } + + public String toString() { + return val + " " + cnt; + } + } + + private static class TokenComparator implements Comparator<Token> { + public int compare(Token t1, Token t2) { + return t2.cnt - t1.cnt; + } + } + + public static void main(String[] args) throws Exception { + TextProfileSignature sig = new TextProfileSignature(); + sig.setConf(NutchConfiguration.create()); + HashMap<String, byte[]> res = new HashMap<String, byte[]>(); + File[] files = new File(args[0]).listFiles(); + for (int i = 0; i < files.length; i++) { + FileInputStream fis = new FileInputStream(files[i]); + BufferedReader br = new BufferedReader( + new InputStreamReader(fis, "UTF-8")); + StringBuffer text = new 
StringBuffer(); + String line = null; + while ((line = br.readLine()) != null) { + if (text.length() > 0) + text.append("\n"); + text.append(line); + } + br.close(); + byte[] signature = sig.calculate(null, new ParseImpl(text.toString(), + null)); + res.put(files[i].toString(), signature); + } + Iterator<String> it = res.keySet().iterator(); + while (it.hasNext()) { + String name = it.next(); + byte[] signature = res.get(name); + System.out.println(name + "\t" + StringUtil.toHexString(signature)); + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java b/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java new file mode 100644 index 0000000..4675f83 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.net.InetAddress; +import java.net.URL; +import java.net.MalformedURLException; +import java.net.UnknownHostException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.util.URLUtil; + +/** + * Partition urls by host, domain name or IP depending on the value of the + * parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP' + */ +public class URLPartitioner implements Partitioner<Text, Writable> { + private static final Logger LOG = LoggerFactory + .getLogger(URLPartitioner.class); + + public static final String PARTITION_MODE_KEY = "partition.url.mode"; + + public static final String PARTITION_MODE_HOST = "byHost"; + public static final String PARTITION_MODE_DOMAIN = "byDomain"; + public static final String PARTITION_MODE_IP = "byIP"; + + private int seed; + private URLNormalizers normalizers; + private String mode = PARTITION_MODE_HOST; + + public void configure(JobConf job) { + seed = job.getInt("partition.url.seed", 0); + mode = job.get(PARTITION_MODE_KEY, PARTITION_MODE_HOST); + // check that the mode is known + if (!mode.equals(PARTITION_MODE_IP) && !mode.equals(PARTITION_MODE_DOMAIN) + && !mode.equals(PARTITION_MODE_HOST)) { + LOG.error("Unknown partition mode : " + mode + " - forcing to byHost"); + mode = PARTITION_MODE_HOST; + } + normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_PARTITION); + } + + public void close() { + } + + /** Hash by domain name. 
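+ * More precisely, the key is hashed by host name (the default), by domain + * name, or by resolved IP address, depending on 'partition.url.mode'; the + * configured seed is XOR-ed into the hash so that hosts can land in + * different partitions on different runs.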
*/ + public int getPartition(Text key, Writable value, int numReduceTasks) { + String urlString = key.toString(); + URL url = null; + int hashCode = urlString.hashCode(); + try { + urlString = normalizers.normalize(urlString, + URLNormalizers.SCOPE_PARTITION); + url = new URL(urlString); + hashCode = url.getHost().hashCode(); + } catch (MalformedURLException e) { + LOG.warn("Malformed URL: '" + urlString + "'"); + } + + if (mode.equals(PARTITION_MODE_DOMAIN) && url != null) + hashCode = URLUtil.getDomainName(url).hashCode(); + else if (mode.equals(PARTITION_MODE_IP)) { + try { + InetAddress address = InetAddress.getByName(url.getHost()); + hashCode = address.getHostAddress().hashCode(); + } catch (UnknownHostException e) { + Generator.LOG.info("Couldn't find IP for host: " + url.getHost()); + } + } + + // make hosts wind up in different partitions on different runs + hashCode ^= seed; + + return (hashCode & Integer.MAX_VALUE) % numReduceTasks; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/package.html ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/package.html b/nutch-core/src/main/java/org/apache/nutch/crawl/package.html new file mode 100644 index 0000000..05eeb50 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/package.html @@ -0,0 +1,5 @@ +<html> +<body> +Crawl control code and tools to run the crawler. +</body> +</html> http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java new file mode 100644 index 0000000..3ad4970 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import java.net.InetAddress; +import java.net.URL; +import java.net.UnknownHostException; + +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.util.URLUtil; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +/** + * This class describes the item to be fetched. 
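+ * <p> + * A minimal creation sketch, assuming an existing CrawlDatum <code>datum</code>: + * <pre> + * FetchItem it = FetchItem.create(new Text("http://example.com/"), datum, + *     FetchItemQueues.QUEUE_MODE_HOST); + * // it.getQueueID() is "http://example.com" (protocol + lower-cased host) + * </pre>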
+ */ +public class FetchItem { + + private static final Logger LOG = LoggerFactory.getLogger(FetchItem.class); + + int outlinkDepth = 0; + String queueID; + Text url; + URL u; + CrawlDatum datum; + + public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) { + this(url, u, datum, queueID, 0); + } + + public FetchItem(Text url, URL u, CrawlDatum datum, String queueID, + int outlinkDepth) { + this.url = url; + this.u = u; + this.datum = datum; + this.queueID = queueID; + this.outlinkDepth = outlinkDepth; + } + + /** + * Create an item. Queue id will be created based on <code>queueMode</code> + * argument, either as a protocol + hostname pair, protocol + IP address + * pair or protocol+domain pair. + */ + public static FetchItem create(Text url, CrawlDatum datum, String queueMode) { + return create(url, datum, queueMode, 0); + } + + public static FetchItem create(Text url, CrawlDatum datum, + String queueMode, int outlinkDepth) { + String queueID; + URL u = null; + try { + u = new URL(url.toString()); + } catch (Exception e) { + LOG.warn("Cannot parse url: " + url, e); + return null; + } + final String proto = u.getProtocol().toLowerCase(); + String key; + if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) { + try { + final InetAddress addr = InetAddress.getByName(u.getHost()); + key = addr.getHostAddress(); + } catch (final UnknownHostException e) { + // unable to resolve it, so don't fall back to host name + LOG.warn("Unable to resolve: " + u.getHost() + ", skipping."); + return null; + } + } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) { + key = URLUtil.getDomainName(u); + if (key == null) { + LOG.warn("Unknown domain for url: " + url + + ", using URL string as key"); + key = u.toExternalForm(); + } + } else { + key = u.getHost(); + if (key == null) { + LOG.warn("Unknown host for url: " + url + ", using URL string as key"); + key = u.toExternalForm(); + } + } + queueID = proto + "://" + key.toLowerCase(); + return new FetchItem(url, u, datum, queueID, outlinkDepth); + } + + public CrawlDatum getDatum() { + return datum; + } + + public String getQueueID() { + return queueID; + } + + public Text getUrl() { + return url; + } + + public URL getURL2() { + return u; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java new file mode 100644 index 0000000..182c063 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class handles FetchItems which come from the same host ID (be it a + * proto/hostname or proto/IP pair). It also keeps track of requests in + * progress and elapsed time between requests. + */ +public class FetchItemQueue { + + private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class); + + List<FetchItem> queue = Collections + .synchronizedList(new LinkedList<FetchItem>()); + AtomicInteger inProgress = new AtomicInteger(); + AtomicLong nextFetchTime = new AtomicLong(); + AtomicInteger exceptionCounter = new AtomicInteger(); + long crawlDelay; + long minCrawlDelay; + int maxThreads; + Configuration conf; + + public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, + long minCrawlDelay) { + this.conf = conf; + this.maxThreads = maxThreads; + this.crawlDelay = crawlDelay; + this.minCrawlDelay = minCrawlDelay; + // ready to start + setEndTime(System.currentTimeMillis() - crawlDelay); + } + + public synchronized int emptyQueue() { + int presize = queue.size(); + queue.clear(); + return presize; + } + + public int getQueueSize() { + return queue.size(); + } + + public int getInProgressSize() { + return inProgress.get(); + } + + public int incrementExceptionCounter() { + return exceptionCounter.incrementAndGet(); + } + + public void finishFetchItem(FetchItem it, boolean asap) { + if (it != null) { + inProgress.decrementAndGet(); + setEndTime(System.currentTimeMillis(), asap); + } + } + + public void addFetchItem(FetchItem it) { + if (it == null) + return; + queue.add(it); + } + + public void addInProgressFetchItem(FetchItem it) { + if (it == null) + return; + inProgress.incrementAndGet(); + } + + public FetchItem getFetchItem() { + if (inProgress.get() >= maxThreads) + return null; + long now = System.currentTimeMillis(); + if (nextFetchTime.get() > now) + return null; + FetchItem it = null; + if (queue.size() == 0) + return null; + try { + it = queue.remove(0); + inProgress.incrementAndGet(); + } catch (Exception e) { + LOG.error( + "Cannot remove FetchItem from queue or cannot add it to inProgress queue", + e); + } + return it; + } + + public synchronized void dump() { + LOG.info(" maxThreads = " + maxThreads); + LOG.info(" inProgress = " + inProgress.get()); + LOG.info(" crawlDelay = " + crawlDelay); + LOG.info(" minCrawlDelay = " + minCrawlDelay); + LOG.info(" nextFetchTime = " + nextFetchTime.get()); + LOG.info(" now = " + System.currentTimeMillis()); + for (int i = 0; i < queue.size(); i++) { + FetchItem it = queue.get(i); + LOG.info(" " + i + ". " + it.url); + } + } + + private void setEndTime(long endTime) { + setEndTime(endTime, false); + } + + private void setEndTime(long endTime, boolean asap) { + if (!asap) + nextFetchTime.set(endTime + + (maxThreads > 1 ? 
minCrawlDelay : crawlDelay)); + else + nextFetchTime.set(endTime); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java new file mode 100644 index 0000000..4473ff0 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.CrawlDatum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Convenience class - a collection of queues that keeps track of the total + * number of items, and provides items eligible for fetching from any queue. 
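+ * <p> + * A minimal usage sketch (<code>conf</code>, <code>url</code> and + * <code>datum</code> assumed given): + * <pre> + * FetchItemQueues queues = new FetchItemQueues(conf); + * queues.addFetchItem(url, datum);      // queued under its host's queue + * FetchItem it = queues.getFetchItem(); // null if no queue is ready yet + * // ... fetch it ... + * queues.finishFetchItem(it);           // release the queue for the next item + * </pre>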
+ */
+public class FetchItemQueues {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+
+  public static final String DEFAULT_ID = "default";
+  Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+  AtomicInteger totalSize = new AtomicInteger(0);
+  int maxThreads;
+  long crawlDelay;
+  long minCrawlDelay;
+  long timelimit = -1;
+  int maxExceptionsPerQueue = -1;
+  Configuration conf;
+
+  public static final String QUEUE_MODE_HOST = "byHost";
+  public static final String QUEUE_MODE_DOMAIN = "byDomain";
+  public static final String QUEUE_MODE_IP = "byIP";
+
+  String queueMode;
+
+  public FetchItemQueues(Configuration conf) {
+    this.conf = conf;
+    this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
+    queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
+    // check that the mode is known
+    if (!queueMode.equals(QUEUE_MODE_IP)
+        && !queueMode.equals(QUEUE_MODE_DOMAIN)
+        && !queueMode.equals(QUEUE_MODE_HOST)) {
+      LOG.error("Unknown queue mode : " + queueMode
+          + " - forcing to byHost");
+      queueMode = QUEUE_MODE_HOST;
+    }
+    LOG.info("Using queue mode : " + queueMode);
+
+    this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+    this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
+        0.0f) * 1000);
+    this.timelimit = conf.getLong("fetcher.timelimit", -1);
+    this.maxExceptionsPerQueue = conf.getInt(
+        "fetcher.max.exceptions.per.queue", -1);
+  }
+
+  public int getTotalSize() {
+    return totalSize.get();
+  }
+
+  public int getQueueCount() {
+    return queues.size();
+  }
+
+  public void addFetchItem(Text url, CrawlDatum datum) {
+    FetchItem it = FetchItem.create(url, datum, queueMode);
+    if (it != null)
+      addFetchItem(it);
+  }
+
+  public synchronized void addFetchItem(FetchItem it) {
+    FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+    fiq.addFetchItem(it);
+    totalSize.incrementAndGet();
+  }
+
+  public void finishFetchItem(FetchItem it) {
+    finishFetchItem(it, false);
+  }
+
+  public void finishFetchItem(FetchItem it, boolean asap) {
+    FetchItemQueue fiq = queues.get(it.queueID);
+    if (fiq == null) {
+      LOG.warn("Attempting to finish item from unknown queue: " + it);
+      return;
+    }
+    fiq.finishFetchItem(it, asap);
+  }
+
+  public synchronized FetchItemQueue getFetchItemQueue(String id) {
+    FetchItemQueue fiq = queues.get(id);
+    if (fiq == null) {
+      // initialize queue
+      fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+      queues.put(id, fiq);
+    }
+    return fiq;
+  }
+
+  public synchronized FetchItem getFetchItem() {
+    Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
+        .iterator();
+    while (it.hasNext()) {
+      FetchItemQueue fiq = it.next().getValue();
+      // reap empty queues
+      if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+        it.remove();
+        continue;
+      }
+      FetchItem fit = fiq.getFetchItem();
+      if (fit != null) {
+        totalSize.decrementAndGet();
+        return fit;
+      }
+    }
+    return null;
+  }
+
+  // called only once the feeder has stopped
+  public synchronized int checkTimelimit() {
+    int count = 0;
+
+    if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
+      // emptying the queues
+      count = emptyQueues();
+
+      // there may also be a case where totalSize != 0 but the number of
+      // queues is 0, in which case we force totalSize to 0 to avoid blocking
+      if (totalSize.get() != 0 && queues.size() == 0)
+        totalSize.set(0);
+    }
+    return count;
+  }
+
+  // empties the queues (used by timebomb and throughput threshold)
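+  // only queued items are dropped; fetches already in progress are
+  // unaffected, and the number of removed items is returned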
+  public synchronized int emptyQueues() {
+    int count = 0;
+
+    for (String id : queues.keySet()) {
+      FetchItemQueue fiq = queues.get(id);
+      if (fiq.getQueueSize() == 0)
+        continue;
+      LOG.info("* queue: " + id + " >> dropping! ");
+      int deleted = fiq.emptyQueue();
+      for (int i = 0; i < deleted; i++) {
+        totalSize.decrementAndGet();
+      }
+      count += deleted;
+    }
+
+    return count;
+  }
+
+  /**
+   * Increment the exception counter of a queue, e.g. after a fetch timeout.
+   * When the counter exceeds the configured threshold
+   * (<code>fetcher.max.exceptions.per.queue</code>) the queue is emptied.
+   *
+   * @param queueid id of the queue whose exception counter is incremented
+   * @return number of purged items
+   */
+  public synchronized int checkExceptionThreshold(String queueid) {
+    FetchItemQueue fiq = queues.get(queueid);
+    if (fiq == null) {
+      return 0;
+    }
+    if (fiq.getQueueSize() == 0) {
+      return 0;
+    }
+    int excCount = fiq.incrementExceptionCounter();
+    if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
+      // too many exceptions for items in this queue - purge it
+      int deleted = fiq.emptyQueue();
+      LOG.info("* queue: " + queueid + " >> removed " + deleted
+          + " URLs from queue because " + excCount + " exceptions occurred");
+      for (int i = 0; i < deleted; i++) {
+        totalSize.decrementAndGet();
+      }
+      return deleted;
+    }
+    return 0;
+  }
+
+  public synchronized void dump() {
+    for (String id : queues.keySet()) {
+      FetchItemQueue fiq = queues.get(id);
+      if (fiq.getQueueSize() == 0)
+        continue;
+      LOG.info("* queue: " + id);
+      fiq.dump();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
new file mode 100644
index 0000000..892c90f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.parse.Outlink;
+
+public class FetchNode {
+  private Text url = null;
+  private Outlink[] outlinks;
+  private int status = 0;
+  private String title = null;
+  private long fetchTime = 0;
+
+  public Text getUrl() {
+    return url;
+  }
+  public void setUrl(Text url) {
+    this.url = url;
+  }
+  public Outlink[] getOutlinks() {
+    return outlinks;
+  }
+  public void setOutlinks(Outlink[] links) {
+    this.outlinks = links;
+  }
+  public int getStatus() {
+    return status;
+  }
+  public void setStatus(int status) {
+    this.status = status;
+  }
+  public String getTitle() {
+    return title;
+  }
+  public void setTitle(String title) {
+    this.title = title;
+  }
+  public long getFetchTime() {
+    return fetchTime;
+  }
+  public void setFetchTime(long fetchTime) {
+    this.fetchTime = fetchTime;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
new file mode 100644
index 0000000..2e69f31
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class FetchNodeDb {
+
+  private Map<Integer, FetchNode> fetchNodeDbMap;
+  private int index;
+  private static FetchNodeDb fetchNodeDbInstance = null;
+
+  public FetchNodeDb(){
+    fetchNodeDbMap = new ConcurrentHashMap<Integer, FetchNode>();
+    index = 1;
+  }
+
+  // synchronized so that concurrent fetcher threads cannot race to create
+  // two instances during lazy initialization
+  public static synchronized FetchNodeDb getInstance(){
+
+    if(fetchNodeDbInstance == null){
+      fetchNodeDbInstance = new FetchNodeDb();
+    }
+    return fetchNodeDbInstance;
+  }
+
+  // the url argument is currently unused - nodes are keyed by insertion
+  // order; synchronized because index++ is not atomic on its own
+  public synchronized void put(String url, FetchNode fetchNode){
+    System.out.println("FetchNodeDb : putting node - " + fetchNode.hashCode());
+    fetchNodeDbMap.put(index++, fetchNode);
+  }
+  public Map<Integer, FetchNode> getFetchNodeDb(){
+    return fetchNodeDbMap;
+  }
+}
\ No newline at end of file
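
FetchNodeDb is a process-wide singleton holding FetchNode records keyed by insertion order. The following sketch is not part of the commit; the URL, title and status value are made up, and it only illustrates the intended call pattern:

    package org.apache.nutch.fetcher;

    import java.util.Map;

    import org.apache.hadoop.io.Text;

    // Illustrative driver for FetchNodeDb; not part of the commit above.
    public class FetchNodeDbExample {
      public static void main(String[] args) {
        FetchNode node = new FetchNode();
        node.setUrl(new Text("http://example.com/")); // hypothetical URL
        node.setStatus(1);                            // hypothetical status code
        node.setTitle("Example Domain");
        node.setFetchTime(System.currentTimeMillis());

        // getInstance() returns the singleton; note that put() ignores its
        // String argument and keys entries by an incrementing index instead.
        FetchNodeDb db = FetchNodeDb.getInstance();
        db.put(node.getUrl().toString(), node);

        for (Map.Entry<Integer, FetchNode> e : db.getFetchNodeDb().entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue().getUrl());
        }
      }
    }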

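To see how FetchItemQueue and FetchItemQueues cooperate, here is a minimal, self-contained sketch of the feed/fetch cycle that fetcher threads drive. It is not part of the commit: the configuration values and URLs are illustrative, and the class sits in org.apache.nutch.fetcher only so it can read the package-visible FetchItem.url field that FetchItemQueue.dump() uses above.

    package org.apache.nutch.fetcher;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;

    // Illustrative driver for the queue classes; not part of the commit above.
    public class FetchQueueExample {
      public static void main(String[] args) throws InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fetcher.queue.mode", FetchItemQueues.QUEUE_MODE_HOST);
        conf.setFloat("fetcher.server.delay", 1.0f); // 1 s politeness delay
        conf.setInt("fetcher.threads.per.queue", 1);

        FetchItemQueues queues = new FetchItemQueues(conf);
        queues.addFetchItem(new Text("http://example.com/a"), new CrawlDatum());
        queues.addFetchItem(new Text("http://example.com/b"), new CrawlDatum());

        // Both URLs map to the same host queue, so the second item only becomes
        // eligible once crawlDelay has elapsed after the first one finished.
        while (queues.getTotalSize() > 0) {
          FetchItem it = queues.getFetchItem();
          if (it == null) {      // no queue is eligible yet - wait and retry
            Thread.sleep(100);
            continue;
          }
          System.out.println("fetching " + it.url); // a real caller fetches here
          queues.finishFetchItem(it); // records the end time starting the delay
        }
      }
    }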