NUTCH-1712 applied to current trunk; ran first simple tests (inject + merge)
Project: http://git-wip-us.apache.org/repos/asf/nutch/repo Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/3c691eb2 Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/3c691eb2 Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/3c691eb2 Branch: refs/heads/master Commit: 3c691eb2823cb85c9ffe95e9212ce7ac0e564709 Parents: 25e879a Author: Sebastian Nagel <sna...@apache.org> Authored: Mon Oct 19 21:48:05 2015 +0200 Committer: Sebastian Nagel <sna...@apache.org> Committed: Thu Feb 25 21:26:30 2016 +0100 ---------------------------------------------------------------------- src/java/org/apache/nutch/crawl/CrawlDb.java | 19 + src/java/org/apache/nutch/crawl/Injector.java | 599 ++++++++++++--------- 2 files changed, 360 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nutch/blob/3c691eb2/src/java/org/apache/nutch/crawl/CrawlDb.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java b/src/java/org/apache/nutch/crawl/CrawlDb.java index 053e8fb..1537cdc 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDb.java +++ b/src/java/org/apache/nutch/crawl/CrawlDb.java @@ -28,8 +28,10 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.*; import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.util.FSUtils; import org.apache.nutch.util.HadoopFSUtil; import org.apache.nutch.util.LockUtil; import org.apache.nutch.util.NutchConfiguration; @@ -173,6 +175,23 @@ public class CrawlDb extends NutchTool implements Tool { LockUtil.removeLockFile(fs, lock); } + /** + * Installs the output of a finished (new MapReduce API) job as the current + * CrawlDb: the existing "current" directory is rotated to "old" and the + * job's output path to "current" (presumably FSUtils.replace(fs, dst, src, + * true) moves src onto dst - confirm), then the CrawlDb lock file is + * removed. The "old" backup is deleted afterwards unless + * db.preserve.backup is true (the default). + * + * @param job finished job whose FileOutputFormat output path holds the new CrawlDb + * @param crawlDb root directory of the CrawlDb + */ + public static void install(Job job, Path crawlDb) throws IOException { + Configuration conf = job.getConfiguration(); + boolean preserveBackup = conf.getBoolean("db.preserve.backup", true);
FileSystem fs = FileSystem.get(conf); + Path old = new Path(crawlDb, "old"); + Path current = new Path(crawlDb, CURRENT_NAME); + // fully qualified: the wildcard import org.apache.hadoop.mapred.* also + // provides the old-API FileOutputFormat + Path tempCrawlDb = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat + .getOutputPath(job); + FSUtils.replace(fs, old, current, true); + FSUtils.replace(fs, current, tempCrawlDb, true); + Path lock = new Path(crawlDb, LOCK_NAME); + LockUtil.removeLockFile(fs, lock); + if (!preserveBackup && fs.exists(old)) { + fs.delete(old, true); + } + } + public static void main(String[] args) throws Exception { int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args); System.exit(res); http://git-wip-us.apache.org/repos/asf/nutch/blob/3c691eb2/src/java/org/apache/nutch/crawl/Injector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java index dc1f1cf..0d01dc8 100644 --- a/src/java/org/apache/nutch/crawl/Injector.java +++ b/src/java/org/apache/nutch/crawl/Injector.java @@ -17,211 +17,267 @@ package org.apache.nutch.crawl; -import java.io.*; -import java.text.SimpleDateFormat; -import java.util.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import
org.apache.hadoop.util.ToolRunner; -// Commons Logging imports -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.io.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; -import org.apache.nutch.net.*; import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; import org.apache.nutch.scoring.ScoringFilterException; import org.apache.nutch.scoring.ScoringFilters; +import org.apache.nutch.util.LockUtil; import org.apache.nutch.util.NutchConfiguration; -import org.apache.nutch.util.NutchJob; import org.apache.nutch.util.NutchTool; import org.apache.nutch.util.TimingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + /** - * This class takes a flat file of URLs and adds them to the of pages to be - * crawled. Useful for bootstrapping the system. The URL files contain one URL - * per line, optionally followed by custom metadata separated by tabs with the - * metadata key separated from the corresponding value by '='. <br> - * Note that some metadata keys are reserved : <br> - * - <i>nutch.score</i> : allows to set a custom score for a specific URL <br> - * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a - * specific URL <br> - * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval - * for a specific URL that is not changed by AdaptiveFetchSchedule <br> - * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 - * \t userType=open_source + * <p> + * Injector takes a flat file of URLs and merges ("injects") these URLs into the + * CrawlDb. Useful for bootstrapping a Nutch crawl.
The URL files contain one + * URL per line, optionally followed by custom metadata separated by tabs with + * the metadata key separated from the corresponding value by '='. + * </p> + * <p> + * Note that some metadata keys are reserved: + * </p> + * <dl> + * <dt>nutch.score</dt> + * <dd>allows setting a custom score for a specific URL</dd> + * <dt>nutch.fetchInterval</dt> + * <dd>allows setting a custom fetch interval for a specific URL</dd> + * <dt>nutch.fetchInterval.fixed</dt> + * <dd>allows setting a custom fetch interval for a specific URL that is not + * changed by AdaptiveFetchSchedule</dd> + * </dl> + * <p> + * Example: + * </p> + * + * <pre> + * http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source + * </pre> **/ public class Injector extends NutchTool implements Tool { public static final Logger LOG = LoggerFactory.getLogger(Injector.class); /** metadata key reserved for setting a custom score for a specific URL */ public static String nutchScoreMDName = "nutch.score"; + /** * metadata key reserved for setting a custom fetchInterval for a specific URL */ public static String nutchFetchIntervalMDName = "nutch.fetchInterval"; + /** * metadata key reserved for setting a fixed custom fetchInterval for a * specific URL */ public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed"; - /** Normalize and filter injected urls.
*/ - public static class InjectMapper implements - Mapper<WritableComparable<?>, Text, Text, CrawlDatum> { + /** + * Maps seed-list lines (Text values) and existing CrawlDb entries + * (CrawlDatum values) to &lt;normalized url, CrawlDatum&gt; pairs, dropping + * urls for which normalizing/filtering yields null. + */ + public static class InjectMapper + extends Mapper<Text, Writable, Text, CrawlDatum> { + public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope"; + public static final String TAB_CHARACTER = "\t"; + public static final String EQUAL_CHARACTER = "="; + private URLNormalizers urlNormalizers; private int interval; private float scoreInjected; - private JobConf jobConf; private URLFilters filters; private ScoringFilters scfilters; private long curTime; - - public void configure(JobConf job) { - this.jobConf = job; - urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT); - interval = jobConf.getInt("db.fetch.interval.default", 2592000); - filters = new URLFilters(jobConf); - scfilters = new ScoringFilters(jobConf); - scoreInjected = jobConf.getFloat("db.score.injected", 1.0f); - curTime = job - .getLong("injector.current.time", System.currentTimeMillis()); + private boolean url404Purging; + private String scope; + + /** + * Reads the normalizer scope (crawldb.url.normalizers.scope, default + * URLNormalizers.SCOPE_INJECT), the default fetch interval and injected + * score, the injection time and the CrawlDb.CRAWLDB_PURGE_404 flag from the + * job configuration. + */ + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_INJECT); + urlNormalizers = new URLNormalizers(conf, scope); + interval = conf.getInt("db.fetch.interval.default", 2592000); + filters = new URLFilters(conf); + scfilters = new ScoringFilters(conf); + scoreInjected = conf.getFloat("db.score.injected", 1.0f); + curTime = conf.getLong("injector.current.time", + System.currentTimeMillis()); + url404Purging = conf.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false); } - public void close() { + /** + * Normalizes and then filters the input url. Returns null if url is null, + * if a normalizer or filter throws, or if filters.filter() returns null + * (for seed urls the caller counts this as "urls_filtered"). + */ + private String filterNormalize(String url) { + if (url != null) { + try { + url = urlNormalizers.normalize(url, scope); // normalize the url + url = filters.filter(url); // filter the url + } catch (Exception e) { + LOG.warn("Skipping " + url + ":" + e); + url = null; + } + } + return url; }
- public void map(WritableComparable<?> key, Text value, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { - String url = value.toString().trim(); // value is line of text + /** + * Parses the metadata that may follow a url in a seed file and applies it + * to datum. Metadata is a TAB_CHARACTER-separated list of key=value pairs: + * nutch.score sets the score, nutch.fetchInterval the fetch interval and + * nutch.fetchInterval.fixed a fixed fetch interval (ignored if negative); + * any other key is stored as Text metadata. Pairs without EQUAL_CHARACTER + * are skipped; invalid numbers are logged and ignored. + * + * @param metadata the part of the seed line following the url + * @param datum the CrawlDatum to update + * @param url the url, used only in log messages + */ + private void processMetaData(String metadata, CrawlDatum datum, + String url) { + String[] splits = metadata.split(TAB_CHARACTER); - if (url != null && (url.length() == 0 || url.startsWith("#"))) { - /* Ignore line that start with # */ - return; - } + for (String split : splits) { + // find separation between name and value + int indexEquals = split.indexOf(EQUAL_CHARACTER); + if (indexEquals == -1) // skip anything without an EQUAL_CHARACTER + continue; - // if tabs : metadata that could be stored - // must be name=value and separated by \t - float customScore = -1f; - int customInterval = interval; - int fixedInterval = -1; - Map<String, String> metadata = new TreeMap<String, String>(); - if (url.indexOf("\t") != -1) { - String[] splits = url.split("\t"); - url = splits[0]; - for (int s = 1; s < splits.length; s++) { - // find separation between name and value - int indexEquals = splits[s].indexOf("="); - if (indexEquals == -1) { - // skip anything without a = - continue; - } - String metaname = splits[s].substring(0, indexEquals); - String metavalue = splits[s].substring(indexEquals + 1); + String metaname = split.substring(0, indexEquals); + String metavalue = split.substring(indexEquals + 1); + + try { if (metaname.equals(nutchScoreMDName)) { - try { - customScore = Float.parseFloat(metavalue); - } catch (NumberFormatException nfe) { - } + datum.setScore(Float.parseFloat(metavalue)); } else if (metaname.equals(nutchFetchIntervalMDName)) { - try { - customInterval = Integer.parseInt(metavalue); - } catch (NumberFormatException nfe) { - } + datum.setFetchInterval(Integer.parseInt(metavalue)); } else if
(metaname.equals(nutchFixedFetchIntervalMDName)) { - try { - fixedInterval = Integer.parseInt(metavalue); - } catch (NumberFormatException nfe) { + int fixedInterval = Integer.parseInt(metavalue); + if (fixedInterval > -1) { + // Set writable using float. Float is used by + // AdaptiveFetchSchedule + datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, + new FloatWritable(fixedInterval)); + datum.setFetchInterval(fixedInterval); } - } else - metadata.put(metaname, metavalue); - } - } - try { - url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); - url = filters.filter(url); // filter the url - } catch (Exception e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Skipping " + url + ":" + e); + } else { + datum.getMetaData().put(new Text(metaname), new Text(metavalue)); + } + } catch (NumberFormatException nfe) { + LOG.error("Invalid number '" + metavalue + "' in metadata '" + + metaname + "' for url " + url); } - url = null; } - if (url == null) { - reporter.getCounter("injector", "urls_filtered").increment(1); - } else { // if it passes - value.set(url); // collect it - CrawlDatum datum = new CrawlDatum(); - datum.setStatus(CrawlDatum.STATUS_INJECTED); - - // Is interval custom? Then set as meta data - if (fixedInterval > -1) { - // Set writable using float.
Flaot is used by - // AdaptiveFetchSchedule - datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, - new FloatWritable(fixedInterval)); - datum.setFetchInterval(fixedInterval); - } else { - datum.setFetchInterval(customInterval); - } + } - datum.setFetchTime(curTime); - // now add the metadata - Iterator<String> keysIter = metadata.keySet().iterator(); - while (keysIter.hasNext()) { - String keymd = keysIter.next(); - String valuemd = metadata.get(keymd); - datum.getMetaData().put(new Text(keymd), new Text(valuemd)); - } - if (customScore != -1) - datum.setScore(customScore); - else + /** + * Emits &lt;url, CrawlDatum&gt; either for a seed line (value is Text: the + * key holds the url, the value the optional metadata) or for an existing + * CrawlDb entry (value is CrawlDatum). Seed urls get status + * STATUS_INJECTED, which the reducer uses to tell them apart from existing + * entries. + */ + public void map(Text key, Writable value, Context context) + throws IOException, InterruptedException { + if (value instanceof Text) { + // if it's a url from the seed list + String url = key.toString().trim(); + + // skip empty lines and comment lines starting with '#' + if (url.length() == 0 || url.startsWith("#")) + return; + + url = filterNormalize(url); + if (url == null) { + context.getCounter("injector", "urls_filtered").increment(1); + } else { + CrawlDatum datum = new CrawlDatum(); + datum.setStatus(CrawlDatum.STATUS_INJECTED); + datum.setFetchTime(curTime); datum.setScore(scoreInjected); - try { - scfilters.injectedScore(value, datum); - } catch (ScoringFilterException e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Cannot filter injected score for url " + url - + ", using default (" + e.getMessage() + ")"); + datum.setFetchInterval(interval); + + String metadata = value.toString().trim(); + if (metadata.length() > 0) + processMetaData(metadata, datum, url); + + try { + key.set(url); + scfilters.injectedScore(key, datum); + } catch (ScoringFilterException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Cannot filter injected score for url " + url + + ", using default (" + e.getMessage() + ")"); + } } + context.getCounter("injector", "urls_injected").increment(1); + context.write(key, datum); + } + } else if (value instanceof CrawlDatum) { + // if it's a CrawlDatum from the input
crawldb, emulate CrawlDbFilter's + // map() + CrawlDatum datum = (CrawlDatum) value; + + // drop gone entries (STATUS_DB_GONE) if CrawlDb.CRAWLDB_PURGE_404 is set + if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus()) + return; + + // existing entries whose url now normalizes/filters to null are dropped + String url = filterNormalize(key.toString()); + if (url != null) { + key.set(url); + context.write(key, datum); } - reporter.getCounter("injector", "urls_injected").increment(1); - output.collect(value, datum); } } } /** Combine multiple new entries for a url. */ - public static class InjectReducer implements - Reducer<Text, CrawlDatum, Text, CrawlDatum> { + public static class InjectReducer + extends Reducer<Text, CrawlDatum, Text, CrawlDatum> { private int interval; private float scoreInjected; private boolean overwrite = false; private boolean update = false; + private CrawlDatum old = new CrawlDatum(); + private CrawlDatum injected = new CrawlDatum(); - public void configure(JobConf job) { - interval = job.getInt("db.fetch.interval.default", 2592000); - scoreInjected = job.getFloat("db.score.injected", 1.0f); - overwrite = job.getBoolean("db.injector.overwrite", false); - update = job.getBoolean("db.injector.update", false); + /** + * Reads db.fetch.interval.default, db.score.injected, db.injector.overwrite + * and db.injector.update from the job configuration. + */ + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + interval = conf.getInt("db.fetch.interval.default", 2592000); + scoreInjected = conf.getFloat("db.score.injected", 1.0f); + overwrite = conf.getBoolean("db.injector.overwrite", false); + update = conf.getBoolean("db.injector.update", false); LOG.info("Injector: overwrite: " + overwrite); LOG.info("Injector: update: " + update); } - public void close() { - } - - private CrawlDatum old = new CrawlDatum(); - private CrawlDatum injected = new CrawlDatum(); + /** + * Merge the input records according to the rules below: + * + * <pre> + * 1. If there is ONLY new injected record ==> emit injected record + * 2. If there is ONLY old record ==> emit existing record + * 3.
If BOTH new and old records are present: + * (a) If 'overwrite' is true ==> emit injected record + * (b) If 'overwrite' is false : + * (i) If 'update' is false ==> emit existing record + * (ii) If 'update' is true ==> update existing record and emit it + * </pre> + * + * For more details see + * <a href="https://issues.apache.org/jira/browse/NUTCH-1405">NUTCH-1405</a>. + */ + public void reduce(Text key, Iterable<CrawlDatum> values, Context context) + throws IOException, InterruptedException { - public void reduce(Text key, Iterator<CrawlDatum> values, - OutputCollector<Text, CrawlDatum> output, Reporter reporter) - throws IOException { boolean oldSet = false; boolean injectedSet = false; - while (values.hasNext()) { - CrawlDatum val = values.next(); + + // If we encounter a datum with status as STATUS_INJECTED, then it's a + // newly injected record. All other statuses correspond to an old record. + for (CrawlDatum val : values) { if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { injected.set(val); injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); @@ -230,41 +286,29 @@ public class Injector extends NutchTool implements Tool { old.set(val); oldSet = true; } - } - CrawlDatum res = null; - - // Old default behaviour - if (injectedSet && !oldSet) { - res = injected; + CrawlDatum result; + if (injectedSet && (!oldSet || overwrite)) { + // corresponds to rules (1) and (3.a) in the method description + result = injected; } else { - res = old; + // corresponds to rules (2) and (3.b) in the method description + result = old; + + if (injectedSet && update) { + // corresponds to rule (3.b.ii) in the method description + old.putAllMetaData(injected); + old.setScore(injected.getScore() != scoreInjected + ? injected.getScore() : old.getScore()); + old.setFetchInterval(injected.getFetchInterval() != interval + ?
injected.getFetchInterval() : old.getFetchInterval()); + } } if (injectedSet && oldSet) { - reporter.getCounter("injector", "urls_merged").increment(1); - } - /** - * Whether to overwrite, ignore or update existing records - * - * @see https://issues.apache.org/jira/browse/NUTCH-1405 - */ - // Injected record already exists and update but not overwrite - if (injectedSet && oldSet && update && !overwrite) { - res = old; - old.putAllMetaData(injected); - old.setScore(injected.getScore() != scoreInjected ? injected.getScore() - : old.getScore()); - old.setFetchInterval(injected.getFetchInterval() != interval ? injected - .getFetchInterval() : old.getFetchInterval()); + context.getCounter("injector", "urls_merged").increment(1); } - - // Injected record already exists and overwrite - if (injectedSet && oldSet && overwrite) { - res = injected; - } - - output.collect(key, res); + context.write(key, result); } } @@ -275,94 +319,121 @@ public class Injector extends NutchTool implements Tool { setConf(conf); } - public void inject(Path crawlDb, Path urlDir) throws IOException { + /** + * Injects the urls found in urlDir into crawlDb, leaving existing CrawlDb + * entries unchanged (neither overwrite nor update). + */ + public void inject(Path crawlDb, Path urlDir) throws Exception { + inject(crawlDb, urlDir, false, false); + } + + /** + * Injects the urls found in urlDir into crawlDb, creating the CrawlDb if it + * does not exist yet. The CrawlDb is locked while the job runs. + * + * @param crawlDb root directory of the CrawlDb + * @param urlDir directory with seed file(s), one url per line + * @param overwrite replace existing CrawlDb entries by the injected ones + * (takes precedence over update) + * @param update merge metadata and non-default score / fetch interval of + * injected urls into existing CrawlDb entries + * @throws Exception if the job fails or does not succeed; the temporary + * output and the lock file are removed before rethrowing + */ + public void inject(Path crawlDb, Path urlDir, boolean overwrite, + boolean update) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); + if (LOG.isInfoEnabled()) { LOG.info("Injector: starting at " + sdf.format(start)); LOG.info("Injector: crawlDb: " + crawlDb); LOG.info("Injector: urlDir: " + urlDir); - } - - Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") - + "/inject-temp-" - + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); - - // map text input file to a <url,CrawlDatum> file - if (LOG.isInfoEnabled()) { LOG.info("Injector: Converting injected urls to crawl db entries."); } - FileSystem fs = FileSystem.get(getConf()); - // determine if the crawldb already exists - boolean dbExists
= fs.exists(crawlDb); - - JobConf sortJob = new NutchJob(getConf()); - sortJob.setJobName("inject " + urlDir); - FileInputFormat.addInputPath(sortJob, urlDir); - sortJob.setMapperClass(InjectMapper.class); - - FileOutputFormat.setOutputPath(sortJob, tempDir); - if (dbExists) { - // Don't run merge injected urls, wait for merge with - // existing DB - sortJob.setOutputFormat(SequenceFileOutputFormat.class); - sortJob.setNumReduceTasks(0); - } else { - sortJob.setOutputFormat(MapFileOutputFormat.class); - sortJob.setReducerClass(InjectReducer.class); - sortJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", - false); - } - sortJob.setOutputKeyClass(Text.class); - sortJob.setOutputValueClass(CrawlDatum.class); - sortJob.setLong("injector.current.time", System.currentTimeMillis()); + // set configuration + Configuration conf = getConf(); + conf.setLong("injector.current.time", System.currentTimeMillis()); + conf.setBoolean("db.injector.overwrite", overwrite); + conf.setBoolean("db.injector.update", update); + conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + + // create all the required paths + FileSystem fs = FileSystem.get(conf); + Path current = new Path(crawlDb, CrawlDb.CURRENT_NAME); + if (!fs.exists(current)) + fs.mkdirs(current); + + Path tempCrawlDb = new Path(crawlDb, + "crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); + + // configure job + Job job = Job.getInstance(conf, "inject " + urlDir); + job.setJarByClass(Injector.class); + job.setMapperClass(InjectMapper.class); + job.setReducerClass(InjectReducer.class); + job.setOutputFormatClass(MapFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + job.setSpeculativeExecution(false); + + // set input and output paths of the job
+ MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class); + MultipleInputs.addInputPath(job, urlDir, KeyValueTextInputFormat.class); + FileOutputFormat.setOutputPath(job, tempCrawlDb); + + // lock the crawldb to prevent multiple simultaneous updates; acquired + // only after the job is configured so that a failure while setting up + // the job cannot leave a stale lock file behind + Path lock = new Path(crawlDb, CrawlDb.LOCK_NAME); + LockUtil.createLockFile(fs, lock, false); - RunningJob mapJob = null; try { - mapJob = JobClient.runJob(sortJob); - } catch (IOException e) { - fs.delete(tempDir, true); - throw e; - } - long urlsInjected = mapJob.getCounters() - .findCounter("injector", "urls_injected").getValue(); - long urlsFiltered = mapJob.getCounters() - .findCounter("injector", "urls_filtered").getValue(); - LOG.info("Injector: Total number of urls rejected by filters: " - + urlsFiltered); - LOG.info("Injector: Total number of urls after normalization: " - + urlsInjected); - long urlsMerged = 0; - if (dbExists) { - // merge with existing crawl db + // run the job; the output of an unsuccessful job must never be + // installed as the new CrawlDb + boolean success = job.waitForCompletion(true); + if (!success) { + String message = "Injector job did not succeed, job status: " + + job.getStatus().getState() + ", reason: " + + job.getStatus().getFailureInfo(); + LOG.error(message); + // the catch block below removes the temporary output and the lock + // file before rethrowing + throw new RuntimeException(message); + } + + // save output and perform cleanup + CrawlDb.install(job, crawlDb); + + if (LOG.isInfoEnabled()) { - LOG.info("Injector: Merging injected urls into crawl db."); + long urlsInjected = job.getCounters() + .findCounter("injector", "urls_injected").getValue(); + long urlsFiltered = job.getCounters() + .findCounter("injector", "urls_filtered").getValue(); + long urlsMerged = job.getCounters() + .findCounter("injector", "urls_merged").getValue(); + LOG.info("Injector: Total urls rejected by filters: " + urlsFiltered); + LOG.info( + "Injector: Total urls injected after normalization and filtering: " + + urlsInjected); + LOG.info("Injector: Total urls injected but already in CrawlDb: " + + urlsMerged); + LOG.info("Injector: Total new urls injected: " + + (urlsInjected - urlsMerged)); + + long end = System.currentTimeMillis(); + LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); } - JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); - FileInputFormat.addInputPath(mergeJob, tempDir); - mergeJob.setReducerClass(InjectReducer.class); - try { - RunningJob merge = JobClient.runJob(mergeJob);
- urlsMerged = merge.getCounters().findCounter("injector", "urls_merged") - .getValue(); - LOG.info("Injector: URLs merged: " + urlsMerged); - } catch (IOException e) { - fs.delete(tempDir, true); - throw e; + } catch (Exception e) { + if (fs.exists(tempCrawlDb)) { + fs.delete(tempCrawlDb, true); } - CrawlDb.install(mergeJob, crawlDb); - } else { - CrawlDb.install(sortJob, crawlDb); + LockUtil.removeLockFile(fs, lock); + throw e; } + } - // clean up - fs.delete(tempDir, true); - LOG.info("Injector: Total new urls injected: " - + (urlsInjected - urlsMerged)); - long end = System.currentTimeMillis(); - LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " - + TimingUtil.elapsedTime(start, end)); + /** Prints the command-line usage to stderr. */ + public void usage() { + System.err.println( + "Usage: Injector <crawldb> <url_dir> [-overwrite] [-update]\n"); + System.err.println( + " <crawldb>\tPath to a crawldb directory. If not present, a new one would be created."); + System.err.println( + " <url_dir>\tPath to directory with URL file(s) containing urls to be injected. A URL file"); + System.err.println( + " \tshould have one URL per line, optionally followed by custom metadata."); + System.err.println( + " \tBlank lines or lines starting with a '#' would be ignored.
Custom metadata must"); + System.err + .println(" \tbe of form 'key=value' and separated by tabs."); + System.err.println(" \tBelow are reserved metadata keys:\n"); + System.err.println(" \t\tnutch.score: A custom score for a url"); + System.err.println( + " \t\tnutch.fetchInterval: A custom fetch interval for a url"); + System.err.println( + " \t\tnutch.fetchInterval.fixed: A custom fetch interval for a url that is not " + + "changed by AdaptiveFetchSchedule\n"); + System.err.println(" \tExample:"); + System.err.println(" \t http://www.apache.org/"); + System.err.println( + " \t http://www.nutch.org/ \\t nutch.score=10 \\t nutch.fetchInterval=2592000 \\t userType=open_source\n"); + System.err.println( + " -overwrite\tOverwrite existing crawldb records by the injected records. Has precedence over 'update'"); + System.err.println( + " -update \tUpdate existing crawldb records with the injected records. Old metadata is preserved"); } public static void main(String[] args) throws Exception { @@ -372,11 +443,27 @@ public class Injector extends NutchTool implements Tool { public int run(String[] args) throws Exception { if (args.length < 2) { - System.err.println("Usage: Injector <crawldb> <url_dir>"); + usage(); return -1; } + + boolean overwrite = false; + boolean update = false; + + // optional flags following <crawldb> <url_dir> + for (int i = 2; i < args.length; i++) { + if (args[i].equals("-overwrite")) { + overwrite = true; + } else if (args[i].equals("-update")) { + update = true; + } else { + LOG.error("Injector: Found invalid argument \"" + args[i] + "\"\n"); + usage(); + return -1; + } + } + try { - inject(new Path(args[0]), new Path(args[1])); + inject(new Path(args[0]), new Path(args[1]), overwrite, update); return 0; } catch (Exception e) { LOG.error("Injector: " + StringUtils.stringifyException(e)); @@ -384,43 +471,39 @@ public class Injector extends NutchTool implements Tool { } } - @Override /** * Used by the Nutch REST service */ - public Map<String, Object> run(Map<String, Object> args, String crawlId)
throws Exception { - if(args.size()<1){ + @Override + public Map<String, Object> run(Map<String, Object> args, String crawlId) + throws Exception { + if (args.size() < 1) { throw new IllegalArgumentException("Required arguments <url_dir>"); } Map<String, Object> results = new HashMap<String, Object>(); Path crawlDb; - if(args.containsKey(Nutch.ARG_CRAWLDB)) { + if (args.containsKey(Nutch.ARG_CRAWLDB)) { Object crawldbPath = args.get(Nutch.ARG_CRAWLDB); - if(crawldbPath instanceof Path) { + if (crawldbPath instanceof Path) { crawlDb = (Path) crawldbPath; - } - else { + } else { crawlDb = new Path(crawldbPath.toString()); } - } - else { - crawlDb = new Path(crawlId+"/crawldb"); + } else { + crawlDb = new Path(crawlId + "/crawldb"); } Path input; Object path = args.get(Nutch.ARG_SEEDDIR); + // fail with the documented argument error instead of an NPE below + if (path == null) { + throw new IllegalArgumentException("Required arguments <url_dir>"); + } - if(path instanceof Path) { + if (path instanceof Path) { input = (Path) path; - } - else { + } else { input = new Path(path.toString()); } inject(crawlDb, input); results.put(Nutch.VAL_RESULT, Integer.toString(0)); return results; - } }