http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java new file mode 100644 index 0000000..1537cdc --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.*; + +// Commons Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.io.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.*; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.util.FSUtils; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.LockUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.NutchTool; +import org.apache.nutch.util.TimingUtil; + +/** + * This class takes the output of the fetcher and updates the crawldb + * accordingly. + */ +public class CrawlDb extends NutchTool implements Tool { + public static final Logger LOG = LoggerFactory.getLogger(CrawlDb.class); + + public static final String CRAWLDB_ADDITIONS_ALLOWED = "db.update.additions.allowed"; + + public static final String CRAWLDB_PURGE_404 = "db.update.purge.404"; + + public static final String CURRENT_NAME = "current"; + + public static final String LOCK_NAME = ".locked"; + + public CrawlDb() { + } + + public CrawlDb(Configuration conf) { + setConf(conf); + } + + public void update(Path crawlDb, Path[] segments, boolean normalize, + boolean filter) throws IOException { + boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, + true); + update(crawlDb, segments, normalize, filter, additionsAllowed, false); + } + + public void update(Path crawlDb, Path[] segments, boolean normalize, + boolean filter, boolean additionsAllowed, boolean force) + throws IOException { + FileSystem fs = FileSystem.get(getConf()); + Path lock = new Path(crawlDb, LOCK_NAME); + LockUtil.createLockFile(fs, lock, force); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = 
System.currentTimeMillis(); + + JobConf job = CrawlDb.createJob(getConf(), crawlDb); + job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed); + job.setBoolean(CrawlDbFilter.URL_FILTERING, filter); + job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize); + + boolean url404Purging = job.getBoolean(CRAWLDB_PURGE_404, false); + + if (LOG.isInfoEnabled()) { + LOG.info("CrawlDb update: starting at " + sdf.format(start)); + LOG.info("CrawlDb update: db: " + crawlDb); + LOG.info("CrawlDb update: segments: " + Arrays.asList(segments)); + LOG.info("CrawlDb update: additions allowed: " + additionsAllowed); + LOG.info("CrawlDb update: URL normalizing: " + normalize); + LOG.info("CrawlDb update: URL filtering: " + filter); + LOG.info("CrawlDb update: 404 purging: " + url404Purging); + } + + for (int i = 0; i < segments.length; i++) { + Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME); + Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME); + if (fs.exists(fetch) && fs.exists(parse)) { + FileInputFormat.addInputPath(job, fetch); + FileInputFormat.addInputPath(job, parse); + } else { + LOG.info(" - skipping invalid segment " + segments[i]); + } + } + + if (LOG.isInfoEnabled()) { + LOG.info("CrawlDb update: Merging segment data into db."); + } + try { + JobClient.runJob(job); + } catch (IOException e) { + LockUtil.removeLockFile(fs, lock); + Path outPath = FileOutputFormat.getOutputPath(job); + if (fs.exists(outPath)) + fs.delete(outPath, true); + throw e; + } + + CrawlDb.install(job, crawlDb); + long end = System.currentTimeMillis(); + LOG.info("CrawlDb update: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + /* + * Configure a new CrawlDb in a temp folder at crawlDb/<rand> + */ + public static JobConf createJob(Configuration config, Path crawlDb) + throws IOException { + Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random() + .nextInt(Integer.MAX_VALUE))); + + JobConf job = new 
NutchJob(config); + job.setJobName("crawldb " + crawlDb); + + Path current = new Path(crawlDb, CURRENT_NAME); + if (FileSystem.get(job).exists(current)) { + FileInputFormat.addInputPath(job, current); + } + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(CrawlDbFilter.class); + job.setReducerClass(CrawlDbReducer.class); + + FileOutputFormat.setOutputPath(job, newCrawlDb); + job.setOutputFormat(MapFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + + // https://issues.apache.org/jira/browse/NUTCH-1110 + job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + + return job; + } + + public static void install(JobConf job, Path crawlDb) throws IOException { + boolean preserveBackup = job.getBoolean("db.preserve.backup", true); + + Path newCrawlDb = FileOutputFormat.getOutputPath(job); + FileSystem fs = new JobClient(job).getFs(); + Path old = new Path(crawlDb, "old"); + Path current = new Path(crawlDb, CURRENT_NAME); + if (fs.exists(current)) { + if (fs.exists(old)) + fs.delete(old, true); + fs.rename(current, old); + } + fs.mkdirs(crawlDb); + fs.rename(newCrawlDb, current); + if (!preserveBackup && fs.exists(old)) + fs.delete(old, true); + Path lock = new Path(crawlDb, LOCK_NAME); + LockUtil.removeLockFile(fs, lock); + } + + public static void install(Job job, Path crawlDb) throws IOException { + Configuration conf = job.getConfiguration(); + boolean preserveBackup = conf.getBoolean("db.preserve.backup", true); + FileSystem fs = FileSystem.get(conf); + Path old = new Path(crawlDb, "old"); + Path current = new Path(crawlDb, CURRENT_NAME); + Path tempCrawlDb = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat + .getOutputPath(job); + FSUtils.replace(fs, old, current, true); + FSUtils.replace(fs, current, tempCrawlDb, true); + Path lock = new Path(crawlDb, LOCK_NAME); + LockUtil.removeLockFile(fs, lock); + if (!preserveBackup && fs.exists(old)) { + fs.delete(old, 
true); + } + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 1) { + System.err + .println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]"); + System.err.println("\tcrawldb\tCrawlDb to update"); + System.err + .println("\t-dir segments\tparent directory containing all segments to update from"); + System.err + .println("\tseg1 seg2 ...\tlist of segment names to update from"); + System.err + .println("\t-force\tforce update even if CrawlDb appears to be locked (CAUTION advised)"); + System.err + .println("\t-normalize\tuse URLNormalizer on urls in CrawlDb and segment (usually not needed)"); + System.err + .println("\t-filter\tuse URLFilters on urls in CrawlDb and segment"); + System.err + .println("\t-noAdditions\tonly update already existing URLs, don't add any newly discovered URLs"); + + return -1; + } + boolean normalize = getConf().getBoolean(CrawlDbFilter.URL_NORMALIZING, + false); + boolean filter = getConf().getBoolean(CrawlDbFilter.URL_FILTERING, false); + boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, + true); + boolean force = false; + final FileSystem fs = FileSystem.get(getConf()); + HashSet<Path> dirs = new HashSet<Path>(); + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-normalize")) { + normalize = true; + } else if (args[i].equals("-filter")) { + filter = true; + } else if (args[i].equals("-force")) { + force = true; + } else if (args[i].equals("-noAdditions")) { + additionsAllowed = false; + } else if (args[i].equals("-dir")) { + FileStatus[] paths = fs.listStatus(new Path(args[++i]), + HadoopFSUtil.getPassDirectoriesFilter(fs)); + dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths))); + } else { + dirs.add(new Path(args[i])); + } + } + try { + 
update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]), normalize, + filter, additionsAllowed, force); + return 0; + } catch (Exception e) { + LOG.error("CrawlDb update: " + StringUtils.stringifyException(e)); + return -1; + } + } + + /* + * Used for Nutch REST service + */ + @Override + public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception { + + Map<String, Object> results = new HashMap<String, Object>(); + + boolean normalize = getConf().getBoolean(CrawlDbFilter.URL_NORMALIZING, + false); + boolean filter = getConf().getBoolean(CrawlDbFilter.URL_FILTERING, false); + boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, + true); + boolean force = false; + HashSet<Path> dirs = new HashSet<Path>(); + + if (args.containsKey("normalize")) { + normalize = true; + } + if (args.containsKey("filter")) { + filter = true; + } + if (args.containsKey("force")) { + force = true; + } + if (args.containsKey("noAdditions")) { + additionsAllowed = false; + } + + Path crawlDb; + if(args.containsKey(Nutch.ARG_CRAWLDB)) { + Object crawldbPath = args.get(Nutch.ARG_CRAWLDB); + if(crawldbPath instanceof Path) { + crawlDb = (Path) crawldbPath; + } + else { + crawlDb = new Path(crawldbPath.toString()); + } + } + else { + crawlDb = new Path(crawlId+"/crawldb"); + } + + Path segmentsDir; + final FileSystem fs = FileSystem.get(getConf()); + if(args.containsKey(Nutch.ARG_SEGMENTDIR)) { + Object segDir = args.get(Nutch.ARG_SEGMENTDIR); + if(segDir instanceof Path) { + segmentsDir = (Path) segDir; + } + else { + segmentsDir = new Path(segDir.toString()); + } + FileStatus[] paths = fs.listStatus(segmentsDir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths))); + } + + else if(args.containsKey(Nutch.ARG_SEGMENT)) { + Object segments = args.get(Nutch.ARG_SEGMENT); + ArrayList<String> segmentList = new ArrayList<String>(); + if(segments instanceof ArrayList) { + segmentList = 
(ArrayList<String>)segments; + } + for(String segment: segmentList) { + dirs.add(new Path(segment)); + } + } + else { + String segment_dir = crawlId+"/segments"; + File dir = new File(segment_dir); + File[] segmentsList = dir.listFiles(); + Arrays.sort(segmentsList, new Comparator<File>(){ + @Override + public int compare(File f1, File f2) { + if(f1.lastModified()>f2.lastModified()) + return -1; + else + return 0; + } + }); + dirs.add(new Path(segmentsList[0].getPath())); + } + try { + update(crawlDb, dirs.toArray(new Path[dirs.size()]), normalize, + filter, additionsAllowed, force); + results.put(Nutch.VAL_RESULT, Integer.toString(0)); + return results; + } catch (Exception e) { + LOG.error("CrawlDb update: " + StringUtils.stringifyException(e)); + results.put(Nutch.VAL_RESULT, Integer.toString(-1)); + return results; + } + } +}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java new file mode 100644 index 0000000..de4c37b --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; + +/** + * This class provides a way to separate the URL normalization and filtering + * steps from the rest of CrawlDb manipulation code. 
+ * + * @author Andrzej Bialecki + */ +public class CrawlDbFilter implements + Mapper<Text, CrawlDatum, Text, CrawlDatum> { + public static final String URL_FILTERING = "crawldb.url.filters"; + + public static final String URL_NORMALIZING = "crawldb.url.normalizers"; + + public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope"; + + private boolean urlFiltering; + + private boolean urlNormalizers; + + private boolean url404Purging; + + private URLFilters filters; + + private URLNormalizers normalizers; + + private String scope; + + public static final Logger LOG = LoggerFactory.getLogger(CrawlDbFilter.class); + + public void configure(JobConf job) { + urlFiltering = job.getBoolean(URL_FILTERING, false); + urlNormalizers = job.getBoolean(URL_NORMALIZING, false); + url404Purging = job.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false); + + if (urlFiltering) { + filters = new URLFilters(job); + } + if (urlNormalizers) { + scope = job.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_CRAWLDB); + normalizers = new URLNormalizers(job, scope); + } + } + + public void close() { + } + + private Text newKey = new Text(); + + public void map(Text key, CrawlDatum value, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + + String url = key.toString(); + + // https://issues.apache.org/jira/browse/NUTCH-1101 check status first, + // cheaper than normalizing or filtering + if (url404Purging && CrawlDatum.STATUS_DB_GONE == value.getStatus()) { + url = null; + } + if (url != null && urlNormalizers) { + try { + url = normalizers.normalize(url, scope); // normalize the url + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + if (url != null && urlFiltering) { + try { + url = filters.filter(url); // filter the url + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + if (url != null) { // if it passes + newKey.set(url); // collect it + output.collect(newKey, 
value); + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java new file mode 100644 index 0000000..cd775d8 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.Map.Entry; + +// Commons Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; +import org.apache.hadoop.conf.*; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; + +/** + * This tool merges several CrawlDb-s into one, optionally filtering URLs + * through the current URLFilters, to skip prohibited pages. + * + * <p> + * It's possible to use this tool just for filtering - in that case only one + * CrawlDb should be specified in arguments. + * </p> + * <p> + * If more than one CrawlDb contains information about the same URL, only the + * most recent version is retained, as determined by the value of + * {@link org.apache.nutch.crawl.CrawlDatum#getFetchTime()}. However, all + * metadata information from all versions is accumulated, with newer values + * taking precedence over older values. 
+ * + * @author Andrzej Bialecki + */ +public class CrawlDbMerger extends Configured implements Tool { + private static final Logger LOG = LoggerFactory + .getLogger(CrawlDbMerger.class); + + public static class Merger extends MapReduceBase implements + Reducer<Text, CrawlDatum, Text, CrawlDatum> { + private org.apache.hadoop.io.MapWritable meta; + private CrawlDatum res = new CrawlDatum(); + private FetchSchedule schedule; + + public void close() throws IOException { + } + + public void configure(JobConf conf) { + schedule = FetchScheduleFactory.getFetchSchedule(conf); + } + + public void reduce(Text key, Iterator<CrawlDatum> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + long resTime = 0L; + boolean resSet = false; + meta = new org.apache.hadoop.io.MapWritable(); + while (values.hasNext()) { + CrawlDatum val = values.next(); + if (!resSet) { + res.set(val); + resSet = true; + resTime = schedule.calculateLastFetchTime(res); + for (Entry<Writable, Writable> e : res.getMetaData().entrySet()) { + meta.put(e.getKey(), e.getValue()); + } + continue; + } + // compute last fetch time, and pick the latest + long valTime = schedule.calculateLastFetchTime(val); + if (valTime > resTime) { + // collect all metadata, newer values override older values + for (Entry<Writable, Writable> e : val.getMetaData().entrySet()) { + meta.put(e.getKey(), e.getValue()); + } + res.set(val); + resTime = valTime; + } else { + // insert older metadata before newer + for (Entry<Writable, Writable> e : meta.entrySet()) { + val.getMetaData().put(e.getKey(), e.getValue()); + } + meta = val.getMetaData(); + } + } + res.setMetaData(meta); + output.collect(key, res); + } + } + + public CrawlDbMerger() { + + } + + public CrawlDbMerger(Configuration conf) { + setConf(conf); + } + + public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) + throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long 
start = System.currentTimeMillis(); + LOG.info("CrawlDb merge: starting at " + sdf.format(start)); + + JobConf job = createMergeJob(getConf(), output, normalize, filter); + for (int i = 0; i < dbs.length; i++) { + if (LOG.isInfoEnabled()) { + LOG.info("Adding " + dbs[i]); + } + FileInputFormat.addInputPath(job, new Path(dbs[i], CrawlDb.CURRENT_NAME)); + } + JobClient.runJob(job); + FileSystem fs = FileSystem.get(getConf()); + if (fs.exists(output)) + fs.delete(output, true); + fs.mkdirs(output); + fs.rename(FileOutputFormat.getOutputPath(job), new Path(output, + CrawlDb.CURRENT_NAME)); + long end = System.currentTimeMillis(); + LOG.info("CrawlDb merge: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + } + + public static JobConf createMergeJob(Configuration conf, Path output, + boolean normalize, boolean filter) { + Path newCrawlDb = new Path("crawldb-merge-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(conf); + job.setJobName("crawldb merge " + output); + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(CrawlDbFilter.class); + job.setBoolean(CrawlDbFilter.URL_FILTERING, filter); + job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize); + job.setReducerClass(Merger.class); + + FileOutputFormat.setOutputPath(job, newCrawlDb); + job.setOutputFormat(MapFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + + return job; + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDbMerger(), + args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err + .println("Usage: CrawlDbMerger <output_crawldb> <crawldb1> [<crawldb2> <crawldb3> ...] 
[-normalize] [-filter]"); + System.err.println("\toutput_crawldb\toutput CrawlDb"); + System.err + .println("\tcrawldb1 ...\tinput CrawlDb-s (single input CrawlDb is ok)"); + System.err + .println("\t-normalize\tuse URLNormalizer on urls in the crawldb(s) (usually not needed)"); + System.err.println("\t-filter\tuse URLFilters on urls in the crawldb(s)"); + return -1; + } + Path output = new Path(args[0]); + ArrayList<Path> dbs = new ArrayList<Path>(); + boolean filter = false; + boolean normalize = false; + FileSystem fs = FileSystem.get(getConf()); + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-filter")) { + filter = true; + continue; + } else if (args[i].equals("-normalize")) { + normalize = true; + continue; + } + final Path dbPath = new Path(args[i]); + if (fs.exists(dbPath)) + dbs.add(dbPath); + } + try { + merge(output, dbs.toArray(new Path[dbs.size()]), normalize, filter); + return 0; + } catch (Exception e) { + LOG.error("CrawlDb merge: " + StringUtils.stringifyException(e)); + return -1; + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java new file mode 100644 index 0000000..5db5f95 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java @@ -0,0 +1,887 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.Closeable; +import java.net.URL; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.TreeMap; + + +// Commons Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapFileOutputFormat; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; +import 
org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.JexlUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.StringUtil;
import org.apache.nutch.util.TimingUtil;
import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.lang.time.DateUtils;

/**
 * Read utility for the CrawlDB.
 *
 * <p>Supports four operations, each exposed both on the command line (see
 * {@link #run(String[])}) and through the REST-style {@link #query} entry
 * point: overall statistics ({@code -stats}), full dump ({@code -dump}),
 * top-N URLs by score ({@code -topN}) and single-URL lookup
 * ({@code -url}).</p>
 *
 * @author Andrzej Bialecki
 *
 */
public class CrawlDbReader extends Configured implements Closeable, Tool {

  public static final Logger LOG = LoggerFactory.getLogger(CrawlDbReader.class);

  // Lazily opened MapFile readers over <crawlDb>/current; shared by get()/readUrl().
  private MapFile.Reader[] readers = null;

  /**
   * Opens MapFile readers over the "current" CrawlDb directory.
   * No-op when the readers are already open.
   */
  private void openReaders(String crawlDb, JobConf config)
      throws IOException {
    if (readers != null)
      return;
    FileSystem fs = FileSystem.get(config);
    readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
        CrawlDb.CURRENT_NAME), config);
  }

  /**
   * Closes all open MapFile readers, ignoring individual close failures.
   * NOTE(review): {@code readers} is not reset to null here, so a later
   * openReaders() call will not reopen them — confirm close() is only
   * called at end of life.
   */
  private void closeReaders() {
    if (readers == null)
      return;
    for (int i = 0; i < readers.length; i++) {
      try {
        readers[i].close();
      } catch (Exception e) {
        // best-effort close; failures intentionally ignored
      }
    }
  }

  /**
   * OutputFormat that writes one CSV line per CrawlDatum, with a fixed
   * header row written when the record writer is created.
   */
  public static class CrawlDatumCsvOutputFormat extends
      FileOutputFormat<Text, CrawlDatum> {
    protected static class LineRecordWriter implements
        RecordWriter<Text, CrawlDatum> {
      private DataOutputStream out;

      public LineRecordWriter(DataOutputStream out) {
        this.out = out;
        try {
          // column header row
          out.writeBytes("Url,Status code,Status name,Fetch Time,Modified Time,Retries since fetch,Retry interval seconds,Retry interval days,Score,Signature,Metadata\n");
        } catch (IOException e) {
          // NOTE(review): header-write failure is silently swallowed here;
          // the dump would then lack its header row — consider surfacing.
        }
      }

      /** Writes one CSV record; string-ish fields are wrapped in double quotes. */
      public synchronized void write(Text key, CrawlDatum value)
          throws IOException {
        out.writeByte('"');
        out.writeBytes(key.toString());
        out.writeByte('"');
        out.writeByte(',');
        out.writeBytes(Integer.toString(value.getStatus()));
        out.writeByte(',');
        out.writeByte('"');
        out.writeBytes(CrawlDatum.getStatusName(value.getStatus()));
        out.writeByte('"');
        out.writeByte(',');
        out.writeBytes(new Date(value.getFetchTime()).toString());
        out.writeByte(',');
        out.writeBytes(new Date(value.getModifiedTime()).toString());
        out.writeByte(',');
        out.writeBytes(Integer.toString(value.getRetriesSinceFetch()));
        out.writeByte(',');
        out.writeBytes(Float.toString(value.getFetchInterval()));
        out.writeByte(',');
        // interval also reported in days for readability
        out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
        out.writeByte(',');
        out.writeBytes(Float.toString(value.getScore()));
        out.writeByte(',');
        out.writeByte('"');
        out.writeBytes(value.getSignature() != null ? StringUtil
            .toHexString(value.getSignature()) : "null");
        out.writeByte('"');
        out.writeByte(',');
        out.writeByte('"');
        if (value.getMetaData() != null) {
          // metadata entries serialized as key:value separated by "|||"
          for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) {
            out.writeBytes(e.getKey().toString());
            out.writeByte(':');
            out.writeBytes(e.getValue().toString());
            out.writeBytes("|||");
          }
        }
        out.writeByte('"');

        out.writeByte('\n');
      }

      public synchronized void close(Reporter reporter) throws IOException {
        out.close();
      }
    }

    public RecordWriter<Text, CrawlDatum> getRecordWriter(FileSystem fs,
        JobConf job, String name, Progressable progress) throws IOException {
      Path dir = FileOutputFormat.getOutputPath(job);
      DataOutputStream fileOut = fs.create(new Path(dir, name), progress);
      return new LineRecordWriter(fileOut);
    }
  }

  /**
   * Mapper for the -stats job. Emits per-datum counters keyed by short
   * codes: "T" (total), "status &lt;code&gt;", "retry &lt;n&gt;", "sc"
   * (score * 1000), "ft" (fetch time in minutes), "fi" (fetch interval in
   * seconds); with sorting enabled also "status &lt;code&gt; &lt;host&gt;".
   */
  public static class CrawlDbStatMapper implements
      Mapper<Text, CrawlDatum, Text, LongWritable> {
    LongWritable COUNT_1 = new LongWritable(1);
    private boolean sort = false;

    public void configure(JobConf job) {
      sort = job.getBoolean("db.reader.stats.sort", false);
    }

    public void close() {
    }

    public void map(Text key, CrawlDatum value,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
      output.collect(new Text("T"), COUNT_1);
      output.collect(new Text("status " + value.getStatus()), COUNT_1);
      output
          .collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
      // score scaled by 1000 so it can be summed as a long
      output.collect(new Text("sc"), new LongWritable(
          (long) (value.getScore() * 1000.0)));
      // fetch time (in minutes to prevent from overflows when summing up)
      output.collect(new Text("ft"),
          new LongWritable(value.getFetchTime() / (1000 * 60)));
      // fetch interval (in seconds)
      output.collect(new Text("fi"),
          new LongWritable(value.getFetchInterval()));
      if (sort) {
        URL u = new URL(key.toString());
        String host = u.getHost();
        output.collect(new Text("status " + value.getStatus() + " " + host),
            COUNT_1);
      }
    }
  }

  /**
   * Combiner for the -stats job. For the "sc"/"ft"/"fi" keys it pre-folds
   * the values into min ("*n"), max ("*x") and total ("*t") keys; all other
   * keys are simply summed.
   */
  public static class CrawlDbStatCombiner implements
      Reducer<Text, LongWritable, Text, LongWritable> {
    LongWritable val = new LongWritable();

    public CrawlDbStatCombiner() {
    }

    public void configure(JobConf job) {
    }

    public void close() {
    }

    /** Emits keyPrefix+"n"/"x"/"t" carrying min, max and total of the values. */
    private void reduceMinMaxTotal(String keyPrefix, Iterator<LongWritable> values,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
      long total = 0;
      long min = Long.MAX_VALUE;
      long max = Long.MIN_VALUE;
      while (values.hasNext()) {
        LongWritable cnt = values.next();
        if (cnt.get() < min)
          min = cnt.get();
        if (cnt.get() > max)
          max = cnt.get();
        total += cnt.get();
      }
      output.collect(new Text(keyPrefix + "n"), new LongWritable(min));
      output.collect(new Text(keyPrefix + "x"), new LongWritable(max));
      output.collect(new Text(keyPrefix + "t"), new LongWritable(total));
    }

    public void reduce(Text key, Iterator<LongWritable> values,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {
      val.set(0L);
      String k = key.toString();
      if (k.equals("sc") || k.equals("ft") || k.equals("fi")) {
        reduceMinMaxTotal(k, values, output, reporter);
      } else {
        // plain counter keys: sum up
        while (values.hasNext()) {
          LongWritable cnt = values.next();
          val.set(val.get() + cnt.get());
        }
        output.collect(key, val);
      }
    }
  }

  /**
   * Reducer for the -stats job. Merges combiner output across all map
   * tasks: maxes for "*x" keys, mins for "*n" keys, sums for totals and
   * plain counters.
   */
  public static class CrawlDbStatReducer implements
      Reducer<Text, LongWritable, Text, LongWritable> {
    public void configure(JobConf job) {
    }

    public void close() {
    }

    public void reduce(Text key, Iterator<LongWritable> values,
        OutputCollector<Text, LongWritable> output, Reporter reporter)
        throws IOException {

      String k = key.toString();
      if (k.equals("T")) {
        // sum all values for this key
        long sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        // output sum
        output.collect(key, new LongWritable(sum));
      } else if (k.startsWith("status") || k.startsWith("retry")) {
        LongWritable cnt = new LongWritable();
        while (values.hasNext()) {
          LongWritable val = values.next();
          cnt.set(cnt.get() + val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("scx") || k.equals("ftx") || k.equals("fix")) {
        // maximum across partial maxima
        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
        while (values.hasNext()) {
          LongWritable val = values.next();
          if (cnt.get() < val.get())
            cnt.set(val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("scn") || k.equals("ftn") || k.equals("fin")) {
        // minimum across partial minima
        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
        while (values.hasNext()) {
          LongWritable val = values.next();
          if (cnt.get() > val.get())
            cnt.set(val.get());
        }
        output.collect(key, cnt);
      } else if (k.equals("sct") || k.equals("ftt") || k.equals("fit")) {
        // totals: sum partial totals
        LongWritable cnt = new LongWritable();
        while (values.hasNext()) {
          LongWritable val = values.next();
          cnt.set(cnt.get() + val.get());
        }
        output.collect(key, cnt);
      }
    }
  }

  /**
   * Mapper for the -topN job: drops records scoring below the configured
   * minimum and inverts the mapping to (negated score -&gt; url) so the
   * shuffle sorts highest scores first.
   */
  public static class CrawlDbTopNMapper implements
      Mapper<Text, CrawlDatum, FloatWritable, Text> {
    private static final FloatWritable fw = new FloatWritable();
    private float min = 0.0f;

    public void configure(JobConf job) {
      min = job.getFloat("db.reader.topn.min", 0.0f);
    }

    public void close() {
    }

    public void map(Text key, CrawlDatum value,
        OutputCollector<FloatWritable, Text> output, Reporter reporter)
        throws IOException {
      if (value.getScore() < min)
        return; // don't collect low-scoring records
      fw.set(-value.getScore()); // reverse sorting order
      output.collect(fw, key); // invert mapping: score -> url
    }
  }

  /**
   * Reducer for the -topN job: passes through at most topN records,
   * restoring the original (positive) score on the way out.
   */
  public static class CrawlDbTopNReducer implements
      Reducer<FloatWritable, Text, FloatWritable, Text> {
    private long topN;
    private long count = 0L;

    public void reduce(FloatWritable key, Iterator<Text> values,
        OutputCollector<FloatWritable, Text> output, Reporter reporter)
        throws IOException {
      while (values.hasNext() && count < topN) {
        key.set(-key.get()); // undo the negation applied by the mapper
        output.collect(key, values.next());
        count++;
      }
    }

    public void configure(JobConf job) {
      // per-reducer quota; the -topN job runs with a single reducer
      topN = job.getLong("db.reader.topn", 100) / job.getNumReduceTasks();
    }

    public void close() {
    }
  }

  public void close() {
    closeReaders();
  }

  /**
   * Runs the statistics MapReduce job over the CrawlDb and folds the
   * per-part outputs into a single sorted map of stat key to value.
   * The temporary job output directory is deleted before returning.
   */
  private TreeMap<String, LongWritable> processStatJobHelper(String crawlDb, Configuration config, boolean sort) throws IOException {
    Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());

    JobConf job = new NutchJob(config);
    job.setJobName("stats " + crawlDb);
    job.setBoolean("db.reader.stats.sort", sort);

    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(CrawlDbStatMapper.class);
    job.setCombinerClass(CrawlDbStatCombiner.class);
    job.setReducerClass(CrawlDbStatReducer.class);

    FileOutputFormat.setOutputPath(job, tmpFolder);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    // https://issues.apache.org/jira/browse/NUTCH-1029
    job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

    JobClient.runJob(job);

    // reading the result
    FileSystem fileSystem = FileSystem.get(config);
    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config,
        tmpFolder);

    Text key = new Text();
    LongWritable value = new LongWritable();

    TreeMap<String, LongWritable> stats = new TreeMap<String, LongWritable>();
    for (int i = 0; i < readers.length; i++) {
      SequenceFile.Reader reader = readers[i];
      while (reader.next(key, value)) {
        String k = key.toString();
        LongWritable val = stats.get(k);
        if (val == null) {
          // seed mins/maxes with the appropriate identity element
          val = new LongWritable();
          if (k.equals("scx") || k.equals("ftx") || k.equals("fix"))
            val.set(Long.MIN_VALUE);
          if (k.equals("scn") || k.equals("ftn") || k.equals("fin"))
            val.set(Long.MAX_VALUE);
          stats.put(k, val);
        }
        if (k.equals("scx") || k.equals("ftx") || k.equals("fix")) {
          if (val.get() < value.get())
            val.set(value.get());
        } else if (k.equals("scn") || k.equals("ftn") || k.equals("fin")) {
          if (val.get() > value.get())
            val.set(value.get());
        } else {
          val.set(val.get() + value.get());
        }
      }
      reader.close();
    }
    // removing the tmp folder
    fileSystem.delete(tmpFolder, true);
    return stats;
  }

  /**
   * Runs the statistics job and logs a human-readable report (totals,
   * score min/max/avg, fetch time and interval ranges, per-status counts).
   *
   * @param crawlDb path to the CrawlDb directory
   * @param config job configuration
   * @param sort whether to additionally break status counts down by host
   */
  public void processStatJob(String crawlDb, Configuration config, boolean sort)
      throws IOException {

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb statistics start: " + crawlDb);
    }
    TreeMap<String, LongWritable> stats = processStatJobHelper(crawlDb, config, sort);

    if (LOG.isInfoEnabled()) {
      LOG.info("Statistics for CrawlDb: " + crawlDb);
      LongWritable totalCnt = stats.get("T");
      stats.remove("T");
      LOG.info("TOTAL urls:\t" + totalCnt.get());
      for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
        String k = entry.getKey();
        LongWritable val = entry.getValue();
        if (k.equals("scn")) {
          // scores were scaled by 1000 in the mapper
          LOG.info("min score:\t" + (val.get() / 1000.0f));
        } else if (k.equals("scx")) {
          LOG.info("max score:\t" + (val.get() / 1000.0f));
        } else if (k.equals("sct")) {
          LOG.info("avg score:\t"
              + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
        } else if (k.equals("ftn")) {
          // fetch times were stored in minutes; convert back to millis
          LOG.info("earliest fetch time:\t" + new Date(1000 * 60 * val.get()));
        } else if (k.equals("ftx")) {
          LOG.info("latest fetch time:\t" + new Date(1000 * 60 * val.get()));
        } else if (k.equals("ftt")) {
          LOG.info("avg of fetch times:\t"
              + new Date(1000 * 60 * (val.get() / totalCnt.get())));
        } else if (k.equals("fin")) {
          LOG.info("shortest fetch interval:\t{}",
              TimingUtil.secondsToDaysHMS(val.get()));
        } else if (k.equals("fix")) {
          LOG.info("longest fetch interval:\t{}",
              TimingUtil.secondsToDaysHMS(val.get()));
        } else if (k.equals("fit")) {
          LOG.info("avg fetch interval:\t{}",
              TimingUtil.secondsToDaysHMS(val.get() / totalCnt.get()));
        } else if (k.startsWith("status")) {
          String[] st = k.split(" ");
          int code = Integer.parseInt(st[1]);
          if (st.length > 2)
            LOG.info(" " + st[2] + " :\t" + val);
          else
            LOG.info(st[0] + " " + code + " ("
                + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
        } else
          LOG.info(k + ":\t" + val);
      }
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb statistics: done");
    }

  }

  /**
   * Looks up a single URL in the CrawlDb.
   *
   * @return the stored CrawlDatum, or null when the URL is not present
   */
  public CrawlDatum get(String crawlDb, String url, JobConf config)
      throws IOException {
    Text key = new Text(url);
    CrawlDatum val = new CrawlDatum();
    openReaders(crawlDb, config);
    CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
        new HashPartitioner<Text, CrawlDatum>(), key, val);
    return res;
  }

  /** Prints the CrawlDatum for a single URL to System.out. */
  public void readUrl(String crawlDb, String url, JobConf config)
      throws IOException {
    CrawlDatum res = get(crawlDb, url, config);
    System.out.println("URL: " + url);
    if (res != null) {
      System.out.println(res);
    } else {
      System.out.println("not found");
    }
  }

  /**
   * Dumps the CrawlDb to the given output directory.
   *
   * @param format "csv", "crawldb", or anything else for plain text
   * @param regex optional URL filter regex (applied in the mapper)
   * @param status optional CrawlDatum status name filter
   * @param retry optional minimum retry count filter
   * @param expr optional Jexl expression filter
   */
  public void processDumpJob(String crawlDb, String output,
      JobConf config, String format, String regex, String status,
      Integer retry, String expr) throws IOException {
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb dump: starting");
      LOG.info("CrawlDb db: " + crawlDb);
    }

    Path outFolder = new Path(output);

    JobConf job = new NutchJob(config);
    job.setJobName("dump " + crawlDb);

    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    FileOutputFormat.setOutputPath(job, outFolder);

    if (format.equals("csv")) {
      job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
    } else if (format.equals("crawldb")) {
      job.setOutputFormat(MapFileOutputFormat.class);
    } else {
      job.setOutputFormat(TextOutputFormat.class);
    }

    // pass the filters to the mapper through the job conf
    if (status != null)
      job.set("status", status);
    if (regex != null)
      job.set("regex", regex);
    if (retry != null)
      job.setInt("retry", retry);
    if (expr != null) {
      job.set("expr", expr);
      LOG.info("CrawlDb db: expr: " + expr);
    }

    job.setMapperClass(CrawlDbDumpMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);

    JobClient.runJob(job);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb dump: done");
    }
  }

  /**
   * Mapper for the -dump job: drops records failing any of the configured
   * retry/status/regex/Jexl filters and passes the rest through unchanged.
   */
  public static class CrawlDbDumpMapper implements
      Mapper<Text, CrawlDatum, Text, CrawlDatum> {
    Pattern pattern = null;
    Matcher matcher = null;
    String status = null;
    Integer retry = null;
    Expression expr = null;

    public void configure(JobConf job) {
      if (job.get("regex", null) != null) {
        pattern = Pattern.compile(job.get("regex"));
      }
      status = job.get("status", null);
      retry = job.getInt("retry", -1);

      if (job.get("expr", null) != null) {
        expr = JexlUtil.parseExpression(job.get("expr", null));
      }
    }

    public void close() {
    }

    public void map(Text key, CrawlDatum value,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {

      // check retry
      if (retry != -1) {
        if (value.getRetriesSinceFetch() < retry) {
          return;
        }
      }

      // check status
      if (status != null
          && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value
              .getStatus())))
        return;

      // check regex
      if (pattern != null) {
        matcher = pattern.matcher(key.toString());
        if (!matcher.matches()) {
          return;
        }
      }

      // check expr
      if (expr != null) {
        if (!value.evaluate(expr)) {
          return;
        }
      }

      output.collect(key, value);
    }
  }

  /**
   * Dumps the top N URLs by score to the output directory. Runs two jobs:
   * a prepare job that inverts to (score, url) pairs in a temp dir, and a
   * single-reducer collect job that keeps the first topN records.
   *
   * @param min minimum score; lower-scoring records are skipped early
   */
  public void processTopNJob(String crawlDb, long topN, float min,
      String output, JobConf config) throws IOException {

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
      LOG.info("CrawlDb db: " + crawlDb);
    }

    Path outFolder = new Path(output);
    Path tempDir = new Path(config.get("mapred.temp.dir", ".")
        + "/readdb-topN-temp-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf job = new NutchJob(config);
    job.setJobName("topN prepare " + crawlDb);
    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(CrawlDbTopNMapper.class);
    job.setReducerClass(IdentityReducer.class);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(Text.class);

    job.setFloat("db.reader.topn.min", min);
    JobClient.runJob(job);

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: collecting topN scores.");
    }
    job = new NutchJob(config);
    job.setJobName("topN collect " + crawlDb);
    job.setLong("db.reader.topn", topN);

    FileInputFormat.addInputPath(job, tempDir);
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(CrawlDbTopNReducer.class);

    FileOutputFormat.setOutputPath(job, outFolder);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(Text.class);

    job.setNumReduceTasks(1); // create a single file.

    JobClient.runJob(job);
    FileSystem fs = FileSystem.get(config);
    fs.delete(tempDir, true);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: done");
    }

  }

  /**
   * Command-line entry point; parses args and dispatches to the
   * -stats / -dump / -url / -topN operations.
   *
   * @return 0 on success, -1 on usage error
   */
  public int run(String[] args) throws IOException {
    @SuppressWarnings("resource")
    CrawlDbReader dbr = new CrawlDbReader();

    if (args.length < 2) {
      System.err
          .println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
      System.err
          .println("\t<crawldb>\tdirectory name where crawldb is located");
      System.err
          .println("\t-stats [-sort] \tprint overall statistics to System.out");
      System.err.println("\t\t[-sort]\tlist status sorted by host");
      System.err
          .println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>");
      System.err.println("\t\t[-format csv]\tdump in Csv format");
      System.err
          .println("\t\t[-format normal]\tdump in standard format (default option)");
      System.err.println("\t\t[-format crawldb]\tdump as CrawlDB");
      System.err.println("\t\t[-regex <expr>]\tfilter records with expression");
      System.err.println("\t\t[-retry <num>]\tminimum retry count");
      System.err
          .println("\t\t[-status <status>]\tfilter records by CrawlDatum status");
      System.err.println("\t\t[-expr <expr>]\tJexl expression to evaluate for this record");
      System.err
          .println("\t-url <url>\tprint information on <url> to System.out");
      System.err
          .println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
      System.err
          .println("\t\t[<min>]\tskip records with scores below this value.");
      System.err.println("\t\t\tThis can significantly improve performance.");
      return -1;
    }
    String param = null;
    String crawlDb = args[0];
    JobConf job = new NutchJob(getConf());
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-stats")) {
        boolean toSort = false;
        if (i < args.length - 1 && "-sort".equals(args[i + 1])) {
          toSort = true;
          i++;
        }
        dbr.processStatJob(crawlDb, job, toSort);
      } else if (args[i].equals("-dump")) {
        param = args[++i];
        String format = "normal";
        String regex = null;
        Integer retry = null;
        String status = null;
        String expr = null;
        // scan the remaining args for -dump sub-options, advancing i past
        // each consumed option pair
        for (int j = i + 1; j < args.length; j++) {
          if (args[j].equals("-format")) {
            format = args[++j];
            i = i + 2;
          }
          if (args[j].equals("-regex")) {
            regex = args[++j];
            i = i + 2;
          }
          if (args[j].equals("-retry")) {
            retry = Integer.parseInt(args[++j]);
            i = i + 2;
          }
          if (args[j].equals("-status")) {
            status = args[++j];
            i = i + 2;
          }
          if (args[j].equals("-expr")) {
            expr = args[++j];
            i = i + 2;
          }
        }
        dbr.processDumpJob(crawlDb, param, job, format, regex, status, retry, expr);
      } else if (args[i].equals("-url")) {
        param = args[++i];
        dbr.readUrl(crawlDb, param, job);
      } else if (args[i].equals("-topN")) {
        param = args[++i];
        long topN = Long.parseLong(param);
        param = args[++i];
        float min = 0.0f;
        if (i < args.length - 1) {
          min = Float.parseFloat(args[++i]);
        }
        dbr.processTopNJob(crawlDb, topN, min, param, job);
      } else {
        System.err.println("\nError: wrong argument " + args[i]);
        return -1;
      }
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(NutchConfiguration.create(),
        new CrawlDbReader(), args);
    System.exit(result);
  }

  /**
   * Programmatic (REST-style) entry point mirroring the CLI operations.
   *
   * @param args operation-specific string parameters (e.g. "out_dir",
   *        "format", "url", "nnn", "min", "sort")
   * @param type one of "stats", "dump", "topN", "url" (case-insensitive)
   * @param crawlId crawl directory; the CrawlDb is expected at
   *        {@code crawlId + "/crawldb"}
   * @return a result map for "stats"/"url", a File handle to the dump part
   *         file for "dump"/"topN"
   */
  public Object query(Map<String, String> args, Configuration conf, String type, String crawlId) throws Exception {


    Map<String, Object> results = new HashMap<String, Object>();
    String crawlDb = crawlId + "/crawldb";

    if (type.equalsIgnoreCase("stats")) {
      boolean sort = false;
      if (args.containsKey("sort")) {
        if (args.get("sort").equalsIgnoreCase("true"))
          sort = true;
      }
      TreeMap<String, LongWritable> stats = processStatJobHelper(crawlDb, NutchConfiguration.create(), sort);
      LongWritable totalCnt = stats.get("T");
      stats.remove("T");
      results.put("totalUrls", String.valueOf(totalCnt.get()));
      Map<String, Object> statusMap = new HashMap<String, Object>();

      for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
        String k = entry.getKey();
        LongWritable val = entry.getValue();
        if (k.equals("scn")) {
          results.put("minScore", String.valueOf((val.get() / 1000.0f)));
        } else if (k.equals("scx")) {
          results.put("maxScore", String.valueOf((val.get() / 1000.0f)));
        } else if (k.equals("sct")) {
          results.put("avgScore", String.valueOf((float) ((((double) val.get()) / totalCnt.get()) / 1000.0)));
        } else if (k.startsWith("status")) {
          String[] st = k.split(" ");
          int code = Integer.parseInt(st[1]);
          if (st.length > 2) {
            // NOTE(review): if a host-sorted key ("status <code> <host>")
            // is seen before its plain "status <code>" key has populated
            // statusMap, this get() returns null and the next line NPEs —
            // TODO confirm TreeMap iteration order always yields the plain
            // key first.
            @SuppressWarnings("unchecked")
            Map<String, Object> individualStatusInfo = (Map<String, Object>) statusMap.get(String.valueOf(code));
            Map<String, String> hostValues;
            if (individualStatusInfo.containsKey("hostValues")) {
              hostValues = (Map<String, String>) individualStatusInfo.get("hostValues");
            }
            else {
              hostValues = new HashMap<String, String>();
              individualStatusInfo.put("hostValues", hostValues);
            }
            hostValues.put(st[2], String.valueOf(val));
          }
          else {
            Map<String, Object> individualStatusInfo = new HashMap<String, Object>();

            individualStatusInfo.put("statusValue", CrawlDatum.getStatusName((byte) code));
            individualStatusInfo.put("count", String.valueOf(val));

            statusMap.put(String.valueOf(code), individualStatusInfo);
          }
        } else
          results.put(k, String.valueOf(val));
      }
      results.put("status", statusMap);
      return results;
    }
    if (type.equalsIgnoreCase("dump")) {
      String output = args.get("out_dir");
      String format = "normal";
      String regex = null;
      Integer retry = null;
      String status = null;
      String expr = null;
      if (args.containsKey("format")) {
        format = args.get("format");
      }
      if (args.containsKey("regex")) {
        regex = args.get("regex");
      }
      if (args.containsKey("retry")) {
        retry = Integer.parseInt(args.get("retry"));
      }
      if (args.containsKey("status")) {
        status = args.get("status");
      }
      if (args.containsKey("expr")) {
        expr = args.get("expr");
      }
      processDumpJob(crawlDb, output, new NutchJob(conf), format, regex, status, retry, expr);
      File dumpFile = new File(output + "/part-00000");
      return dumpFile;
    }
    if (type.equalsIgnoreCase("topN")) {
      String output = args.get("out_dir");
      long topN = Long.parseLong(args.get("nnn"));
      float min = 0.0f;
      if (args.containsKey("min")) {
        min = Float.parseFloat(args.get("min"));
      }
      processTopNJob(crawlDb, topN, min, output, new NutchJob(conf));
      File dumpFile = new File(output + "/part-00000");
      return dumpFile;
    }

    if (type.equalsIgnoreCase("url")) {
      String url = args.get("url");
      CrawlDatum res = get(crawlDb, url, new NutchJob(conf));
      results.put("status", res.getStatus());
      results.put("fetchTime", new Date(res.getFetchTime()));
      results.put("modifiedTime", new Date(res.getModifiedTime()));
      results.put("retriesSinceFetch", res.getRetriesSinceFetch());
      results.put("retryInterval", res.getFetchInterval());
      results.put("score", res.getScore());
      results.put("signature", StringUtil.toHexString(res.getSignature()));
      Map<String, String> metadata = new HashMap<String, String>();
      if (res.getMetaData() != null) {
        for (Entry<Writable, Writable> e : res.getMetaData().entrySet()) {
          metadata.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
        }
      }
      results.put("metadata", metadata);

      return results;
    }
    return results;
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java new file mode 100644 index 0000000..1ae73b8 --- /dev/null +++
b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.crawl; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.io.IOException; + +// Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.PriorityQueue; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; + +/** Merge new page entries with existing entries. 
 */
public class CrawlDbReducer implements
    Reducer<Text, CrawlDatum, Text, CrawlDatum> {
  public static final Logger LOG = LoggerFactory
      .getLogger(CrawlDbReducer.class);

  private int retryMax;                        // db.fetch.retry.max
  private CrawlDatum result = new CrawlDatum(); // reused output datum
  private InlinkPriorityQueue linked = null;    // top-scoring inlinks, bounded
  private ScoringFilters scfilters = null;
  private boolean additionsAllowed;             // db.update.additions.allowed
  private int maxInterval;                      // db.fetch.interval.max
  private FetchSchedule schedule;

  public void configure(JobConf job) {
    retryMax = job.getInt("db.fetch.retry.max", 3);
    scfilters = new ScoringFilters(job);
    additionsAllowed = job.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true);
    maxInterval = job.getInt("db.fetch.interval.max", 0);
    schedule = FetchScheduleFactory.getFetchSchedule(job);
    int maxLinks = job.getInt("db.update.max.inlinks", 10000);
    linked = new InlinkPriorityQueue(maxLinks);
  }

  public void close() {
  }

  /**
   * Merges all values for one URL (existing db entry, fetch results,
   * inlinks, signature, parse metadata) into a single updated CrawlDatum,
   * applying the configured FetchSchedule and scoring filters.
   */
  public void reduce(Text key, Iterator<CrawlDatum> values,
      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {

    CrawlDatum fetch = new CrawlDatum();
    CrawlDatum old = new CrawlDatum();

    boolean fetchSet = false;
    boolean oldSet = false;
    byte[] signature = null;
    boolean multiple = false; // avoid deep copy when only single value exists
    linked.clear();
    org.apache.hadoop.io.MapWritable metaFromParse = null;

    // First pass: classify each incoming datum as the existing db entry
    // ("old"), a fetch result ("fetch"), or auxiliary info (link,
    // signature, parse metadata).
    while (values.hasNext()) {
      CrawlDatum datum = values.next();
      if (!multiple && values.hasNext())
        multiple = true;
      if (CrawlDatum.hasDbStatus(datum)) {
        if (!oldSet) {
          if (multiple) {
            old.set(datum);
          } else {
            // no need for a deep copy - this is the only value
            old = datum;
          }
          oldSet = true;
        } else {
          // always take the latest version
          if (old.getFetchTime() < datum.getFetchTime())
            old.set(datum);
        }
        continue;
      }

      if (CrawlDatum.hasFetchStatus(datum)) {
        if (!fetchSet) {
          if (multiple) {
            fetch.set(datum);
          } else {
            fetch = datum;
          }
          fetchSet = true;
        } else {
          // always take the latest version
          if (fetch.getFetchTime() < datum.getFetchTime())
            fetch.set(datum);
        }
        continue;
      }

      switch (datum.getStatus()) { // collect other info
      case CrawlDatum.STATUS_LINKED:
        CrawlDatum link;
        if (multiple) {
          link = new CrawlDatum();
          link.set(datum);
        } else {
          link = datum;
        }
        linked.insert(link);
        break;
      case CrawlDatum.STATUS_SIGNATURE:
        signature = datum.getSignature();
        break;
      case CrawlDatum.STATUS_PARSE_META:
        metaFromParse = datum.getMetaData();
        break;
      default:
        LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
      }
    }

    // copy the content of the queue into a List
    // in reversed order
    int numLinks = linked.size();
    List<CrawlDatum> linkList = new ArrayList<CrawlDatum>(numLinks);
    for (int i = numLinks - 1; i >= 0; i--) {
      linkList.add(linked.pop());
    }

    // if it doesn't already exist, skip it
    if (!oldSet && !additionsAllowed)
      return;

    // if there is no fetched datum, perhaps there is a link
    if (!fetchSet && linkList.size() > 0) {
      fetch = linkList.get(0);
      fetchSet = true;
    }

    // still no new data - record only unchanged old data, if exists, and return
    if (!fetchSet) {
      if (oldSet) {// at this point at least "old" should be present
        output.collect(key, old);
        reporter.getCounter("CrawlDB status",
            CrawlDatum.getStatusName(old.getStatus())).increment(1);
      } else {
        LOG.warn("Missing fetch and old value, signature=" + signature);
      }
      return;
    }

    if (signature == null)
      signature = fetch.getSignature();
    long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L;
    long prevFetchTime = oldSet ? old.getFetchTime() : 0L;

    // initialize with the latest version, be it fetch or link
    result.set(fetch);
    if (oldSet) {
      // copy metadata from old, if exists
      if (old.getMetaData().size() > 0) {
        result.putAllMetaData(old);
        // overlay with new, if any
        if (fetch.getMetaData().size() > 0)
          result.putAllMetaData(fetch);
      }
      // set the most recent valid value of modifiedTime
      if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) {
        result.setModifiedTime(old.getModifiedTime());
      }
    }

    switch (fetch.getStatus()) { // determine new status

    case CrawlDatum.STATUS_LINKED: // it was link
      if (oldSet) { // if old exists
        result.set(old); // use it
      } else {
        // newly discovered URL: initialize schedule and score
        result = schedule.initializeSchedule(key, result);
        result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
        try {
          scfilters.initialScore(key, result);
        } catch (ScoringFilterException e) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Cannot filter init score for url " + key
                + ", using default: " + e.getMessage());
          }
          result.setScore(0.0f);
        }
      }
      break;

    case CrawlDatum.STATUS_FETCH_SUCCESS: // successful fetch
    case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected
    case CrawlDatum.STATUS_FETCH_REDIR_PERM:
    case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified
      // https://issues.apache.org/jira/browse/NUTCH-1656
      if (metaFromParse != null) {
        for (Entry<Writable, Writable> e : metaFromParse.entrySet()) {
          result.getMetaData().put(e.getKey(), e.getValue());
        }
      }

      // determine the modification status
      int modified = FetchSchedule.STATUS_UNKNOWN;
      if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
        modified = FetchSchedule.STATUS_NOTMODIFIED;
      } else if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS) {
        // only successful fetches (but not redirects, NUTCH-1422)
        // are detected as "not modified" by signature comparison
        if (oldSet && old.getSignature() != null && signature != null) {
          if (SignatureComparator._compare(old.getSignature(), signature) != 0) {
            modified = FetchSchedule.STATUS_MODIFIED;
          } else {
            modified = FetchSchedule.STATUS_NOTMODIFIED;
          }
        }
      }
      // set the schedule
      result = schedule.setFetchSchedule(key, result, prevFetchTime,
          prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(),
          modified);
      // set the result status and signature
      if (modified == FetchSchedule.STATUS_NOTMODIFIED) {
        result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED);

        // NUTCH-1341 The page is not modified according to its signature, let's
        // reset lastModified as well
        result.setModifiedTime(prevModifiedTime);

        if (oldSet)
          result.setSignature(old.getSignature());
      } else {
        switch (fetch.getStatus()) {
        case CrawlDatum.STATUS_FETCH_SUCCESS:
          result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
          break;
        case CrawlDatum.STATUS_FETCH_REDIR_PERM:
          result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM);
          break;
        case CrawlDatum.STATUS_FETCH_REDIR_TEMP:
          result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);
          break;
        default:
          LOG.warn("Unexpected status: " + fetch.getStatus()
              + " resetting to old status.");
          if (oldSet)
            result.setStatus(old.getStatus());
          else
            result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
        }
        result.setSignature(signature);
      }

      // if fetchInterval is larger than the system-wide maximum, trigger
      // an unconditional recrawl. This prevents the page to be stuck at
      // NOTMODIFIED state, when the old fetched copy was already removed with
      // old segments.
      if (maxInterval < result.getFetchInterval())
        result = schedule.forceRefetch(key, result, false);
      break;
    case CrawlDatum.STATUS_SIGNATURE:
      if (LOG.isWarnEnabled()) {
        LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
      }
      return;
    case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure
      if (oldSet) {
        result.setSignature(old.getSignature()); // use old signature
      }
      result = schedule.setPageRetrySchedule(key, result, prevFetchTime,
          prevModifiedTime, fetch.getFetchTime());
      if (result.getRetriesSinceFetch() < retryMax) {
        result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
      } else {
        // retry budget exhausted: give up on this URL
        result.setStatus(CrawlDatum.STATUS_DB_GONE);
        result = schedule.setPageGoneSchedule(key, result, prevFetchTime,
            prevModifiedTime, fetch.getFetchTime());
      }
      break;

    case CrawlDatum.STATUS_FETCH_GONE: // permanent failure
      if (oldSet)
        result.setSignature(old.getSignature()); // use old signature
      result.setStatus(CrawlDatum.STATUS_DB_GONE);
      result = schedule.setPageGoneSchedule(key, result, prevFetchTime,
          prevModifiedTime, fetch.getFetchTime());
      break;

    default:
      throw new RuntimeException("Unknown status: " + fetch.getStatus() + " "
          + key);
    }

    try {
      scfilters.updateDbScore(key, oldSet ? old : null, result, linkList);
    } catch (Exception e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Couldn't update score, key=" + key + ": " + e);
      }
    }
    // remove generation time, if any
    result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
    output.collect(key, result);
    reporter.getCounter("CrawlDB status",
        CrawlDatum.getStatusName(result.getStatus())).increment(1);
  }

}

/**
 * Bounded priority queue of inlink CrawlDatums, keeping the entries with
 * the highest scores (lowest-scoring entry is evicted first).
 */
class InlinkPriorityQueue extends PriorityQueue<CrawlDatum> {

  public InlinkPriorityQueue(int maxSize) {
    initialize(maxSize);
  }

  /** Determines the ordering of objects in this priority queue. **/
  protected boolean lessThan(Object arg0, Object arg1) {
    CrawlDatum candidate = (CrawlDatum) arg0;
    CrawlDatum least = (CrawlDatum) arg1;
    // inverted comparison: higher score == "less than" so that pop()
    // yields entries in descending-score order last
    return candidate.getScore() > least.getScore();
  }

}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java b/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java new file mode 100644 index 0000000..c439570 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nutch.crawl; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; +import java.util.Arrays; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters.Group; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.NutchTool; +import org.apache.nutch.util.TimingUtil; +import org.apache.nutch.util.URLUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generic deduplicator which groups fetched URLs with the same digest and marks + * all of them as duplicate except the one with the highest score (based on the + * score in the crawldb, which is not necessarily the same as the score + * indexed). If two (or more) documents have the same score, then the document + * with the latest timestamp is kept. 
If the documents have the same timestamp + * then the one with the shortest URL is kept. The documents marked as duplicate + * can then be deleted with the command CleaningJob. + ***/ +public class DeduplicationJob extends NutchTool implements Tool { + + public static final Logger LOG = LoggerFactory + .getLogger(DeduplicationJob.class); + + private final static Text urlKey = new Text("_URLTEMPKEY_"); + private final static String DEDUPLICATION_GROUP_MODE = "deduplication.group.mode"; + private final static String DEDUPLICATION_COMPARE_ORDER = "deduplication.compare.order"; + + public static class DBFilter implements + Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> { + + private String groupMode; + + @Override + public void configure(JobConf arg0) { + groupMode = arg0.get(DEDUPLICATION_GROUP_MODE); + } + + @Override + public void close() throws IOException { + } + + @Override + public void map(Text key, CrawlDatum value, + OutputCollector<BytesWritable, CrawlDatum> output, Reporter reporter) + throws IOException { + + if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED + || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { + // || value.getStatus() ==CrawlDatum.STATUS_DB_GONE){ + byte[] signature = value.getSignature(); + if (signature == null) + return; + String url = key.toString(); + BytesWritable sig = null; + byte[] data; + switch (groupMode) { + case "none": + sig = new BytesWritable(signature); + break; + case "host": + byte[] host = URLUtil.getHost(url).getBytes(); + data = new byte[signature.length + host.length]; + System.arraycopy(signature, 0, data, 0, signature.length); + System.arraycopy(host, 0, data, signature.length, host.length); + sig = new BytesWritable(data); + break; + case "domain": + byte[] domain = URLUtil.getDomainName(url).getBytes(); + data = new byte[signature.length + domain.length]; + System.arraycopy(signature, 0, data, 0, signature.length); + System.arraycopy(domain, 0, data, signature.length, domain.length); + sig = 
new BytesWritable(data); + break; + } + // add the URL as a temporary MD + value.getMetaData().put(urlKey, key); + // reduce on the signature optionall grouped on host or domain or not at all + output.collect(sig, value); + } + } + } + + public static class DedupReducer implements + Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> { + + private String[] compareOrder; + + @Override + public void configure(JobConf arg0) { + compareOrder = arg0.get(DEDUPLICATION_COMPARE_ORDER).split(","); + } + + private void writeOutAsDuplicate(CrawlDatum datum, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE); + Text key = (Text) datum.getMetaData().remove(urlKey); + reporter.incrCounter("DeduplicationJobStatus", + "Documents marked as duplicate", 1); + output.collect(key, datum); + } + + @Override + public void reduce(BytesWritable key, Iterator<CrawlDatum> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + CrawlDatum existingDoc = null; + + outerloop: + while (values.hasNext()) { + if (existingDoc == null) { + existingDoc = new CrawlDatum(); + existingDoc.set(values.next()); + continue; + } + CrawlDatum newDoc = values.next(); + + for (int i = 0; i < compareOrder.length; i++) { + switch (compareOrder[i]) { + case "score": + // compare based on score + if (existingDoc.getScore() < newDoc.getScore()) { + writeOutAsDuplicate(existingDoc, output, reporter); + existingDoc = new CrawlDatum(); + existingDoc.set(newDoc); + continue outerloop; + } else if (existingDoc.getScore() > newDoc.getScore()) { + // mark new one as duplicate + writeOutAsDuplicate(newDoc, output, reporter); + continue outerloop; + } + break; + case "fetchTime": + // same score? 
delete the one which is oldest + if (existingDoc.getFetchTime() > newDoc.getFetchTime()) { + // mark new one as duplicate + writeOutAsDuplicate(newDoc, output, reporter); + continue outerloop; + } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) { + // mark existing one as duplicate + writeOutAsDuplicate(existingDoc, output, reporter); + existingDoc = new CrawlDatum(); + existingDoc.set(newDoc); + continue outerloop; + } + break; + case "urlLength": + // same time? keep the one which has the shortest URL + String urlExisting; + String urlnewDoc; + try { + urlExisting = URLDecoder.decode(existingDoc.getMetaData().get(urlKey).toString(), "UTF8"); + urlnewDoc = URLDecoder.decode(newDoc.getMetaData().get(urlKey).toString(), "UTF8"); + } catch (UnsupportedEncodingException e) { + LOG.error("Error decoding: " + urlKey); + throw new IOException("UnsupportedEncodingException for " + urlKey); + } + if (urlExisting.length() < urlnewDoc.length()) { + // mark new one as duplicate + writeOutAsDuplicate(newDoc, output, reporter); + continue outerloop; + } else if (urlExisting.length() > urlnewDoc.length()) { + // mark existing one as duplicate + writeOutAsDuplicate(existingDoc, output, reporter); + existingDoc = new CrawlDatum(); + existingDoc.set(newDoc); + continue outerloop; + } + break; + } + } + + } + } + + @Override + public void close() throws IOException { + + } + } + + /** Combine multiple new entries for a url. 
*/ + public static class StatusUpdateReducer implements + Reducer<Text, CrawlDatum, Text, CrawlDatum> { + + public void configure(JobConf job) { + } + + public void close() { + } + + private CrawlDatum old = new CrawlDatum(); + private CrawlDatum duplicate = new CrawlDatum(); + + public void reduce(Text key, Iterator<CrawlDatum> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + boolean duplicateSet = false; + + while (values.hasNext()) { + CrawlDatum val = values.next(); + if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) { + duplicate.set(val); + duplicateSet = true; + } else { + old.set(val); + } + } + + // keep the duplicate if there is one + if (duplicateSet) { + output.collect(key, duplicate); + return; + } + + // no duplicate? keep old one then + output.collect(key, old); + } + } + + public int run(String[] args) throws IOException { + if (args.length < 1) { + System.err.println("Usage: DeduplicationJob <crawldb> [-group <none|host|domain>] [-compareOrder <score>,<fetchTime>,<urlLength>]"); + return 1; + } + + String group = "none"; + String crawldb = args[0]; + String compareOrder = "score,fetchTime,urlLength"; + + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-group")) + group = args[++i]; + if (args[i].equals("-compareOrder")) { + compareOrder = args[++i]; + + if (compareOrder.indexOf("score") == -1 || + compareOrder.indexOf("fetchTime") == -1 || + compareOrder.indexOf("urlLength") == -1) { + System.err.println("DeduplicationJob: compareOrder must contain score, fetchTime and urlLength."); + return 1; + } + } + } + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("DeduplicationJob: starting at " + sdf.format(start)); + + Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + + "/dedup-temp-" + + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + JobConf job = new NutchJob(getConf()); + + 
job.setJobName("Deduplication on " + crawldb); + job.set(DEDUPLICATION_GROUP_MODE, group); + job.set(DEDUPLICATION_COMPARE_ORDER, compareOrder); + + FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME)); + job.setInputFormat(SequenceFileInputFormat.class); + + FileOutputFormat.setOutputPath(job, tempDir); + job.setOutputFormat(SequenceFileOutputFormat.class); + + job.setMapOutputKeyClass(BytesWritable.class); + job.setMapOutputValueClass(CrawlDatum.class); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + + job.setMapperClass(DBFilter.class); + job.setReducerClass(DedupReducer.class); + + try { + RunningJob rj = JobClient.runJob(job); + Group g = rj.getCounters().getGroup("DeduplicationJobStatus"); + if (g != null) { + long dups = g.getCounter("Documents marked as duplicate"); + LOG.info("Deduplication: " + (int) dups + + " documents marked as duplicates"); + } + } catch (final Exception e) { + LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e)); + return -1; + } + + // merge with existing crawl db + if (LOG.isInfoEnabled()) { + LOG.info("Deduplication: Updating status of duplicate urls into crawl db."); + } + + Path dbPath = new Path(crawldb); + JobConf mergeJob = CrawlDb.createJob(getConf(), dbPath); + FileInputFormat.addInputPath(mergeJob, tempDir); + mergeJob.setReducerClass(StatusUpdateReducer.class); + + try { + JobClient.runJob(mergeJob); + } catch (final Exception e) { + LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e)); + return -1; + } + + CrawlDb.install(mergeJob, dbPath); + + // clean up + FileSystem fs = FileSystem.get(getConf()); + fs.delete(tempDir, true); + + long end = System.currentTimeMillis(); + LOG.info("Deduplication finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + + return 0; + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(NutchConfiguration.create(), + new 
DeduplicationJob(), args); + System.exit(result); + } + + @Override + public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception { + Map<String, Object> results = new HashMap<String, Object>(); + String[] arg = new String[1]; + String crawldb; + if(args.containsKey(Nutch.ARG_CRAWLDB)) { + crawldb = (String)args.get(Nutch.ARG_CRAWLDB); + } + else { + crawldb = crawlId+"/crawldb"; + } + arg[0] = crawldb; + int res = run(arg); + results.put(Nutch.VAL_RESULT, Integer.toString(res)); + return results; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java b/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java new file mode 100755 index 0000000..4a60a1c --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.crawl; + +import org.apache.hadoop.io.Text; + +/** + * This class implements the default re-fetch schedule. That is, no matter if + * the page was changed or not, the <code>fetchInterval</code> remains + * unchanged, and the updated page fetchTime will always be set to + * <code>fetchTime + fetchInterval * 1000</code>. + * + * @author Andrzej Bialecki + */ +public class DefaultFetchSchedule extends AbstractFetchSchedule { + + @Override + public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum, + long prevFetchTime, long prevModifiedTime, long fetchTime, + long modifiedTime, int state) { + datum = super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime, + fetchTime, modifiedTime, state); + if (datum.getFetchInterval() == 0) { + datum.setFetchInterval(defaultInterval); + } + datum.setFetchTime(fetchTime + (long) datum.getFetchInterval() * 1000); + datum.setModifiedTime(modifiedTime); + return datum; + } +}
