NUTCH-1712 applied to current trunk; run first simple tests (inject + merge)


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/3c691eb2
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/3c691eb2
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/3c691eb2

Branch: refs/heads/master
Commit: 3c691eb2823cb85c9ffe95e9212ce7ac0e564709
Parents: 25e879a
Author: Sebastian Nagel <sna...@apache.org>
Authored: Mon Oct 19 21:48:05 2015 +0200
Committer: Sebastian Nagel <sna...@apache.org>
Committed: Thu Feb 25 21:26:30 2016 +0100

----------------------------------------------------------------------
 src/java/org/apache/nutch/crawl/CrawlDb.java  |  19 +
 src/java/org/apache/nutch/crawl/Injector.java | 599 ++++++++++++---------
 2 files changed, 360 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
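
In outline, the patch ports Injector from the deprecated
org.apache.hadoop.mapred API to org.apache.hadoop.mapreduce and collapses
the former two-job inject (convert the seed list, then merge into the
CrawlDb) into a single job reading both inputs at once. A minimal sketch
of that wiring, using placeholder names rather than code from the patch
itself:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;

    public class InjectWiringSketch {
      public static Job wire(Configuration conf, Path current, Path urlDir,
          Path tempCrawlDb) throws Exception {
        Job job = Job.getInstance(conf, "inject " + urlDir);
        // existing crawldb entries: SequenceFile of <Text, CrawlDatum>
        MultipleInputs.addInputPath(job, current,
            SequenceFileInputFormat.class);
        // seed files: plain text, one URL plus optional metadata per line
        MultipleInputs.addInputPath(job, urlDir,
            KeyValueTextInputFormat.class);
        job.setOutputFormatClass(MapFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, tempCrawlDb);
        return job;
      }
    }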


http://git-wip-us.apache.org/repos/asf/nutch/blob/3c691eb2/src/java/org/apache/nutch/crawl/CrawlDb.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java b/src/java/org/apache/nutch/crawl/CrawlDb.java
index 053e8fb..1537cdc 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDb.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDb.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.*;
 import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.FSUtils;
 import org.apache.nutch.util.HadoopFSUtil;
 import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
@@ -173,6 +175,23 @@ public class CrawlDb extends NutchTool implements Tool {
     LockUtil.removeLockFile(fs, lock);
   }
 
+  public static void install(Job job, Path crawlDb) throws IOException {
+    Configuration conf = job.getConfiguration();
+    boolean preserveBackup = conf.getBoolean("db.preserve.backup", true);
+    FileSystem fs = FileSystem.get(conf);
+    Path old = new Path(crawlDb, "old");
+    Path current = new Path(crawlDb, CURRENT_NAME);
+    Path tempCrawlDb = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+        .getOutputPath(job);
+    FSUtils.replace(fs, old, current, true);
+    FSUtils.replace(fs, current, tempCrawlDb, true);
+    Path lock = new Path(crawlDb, LOCK_NAME);
+    LockUtil.removeLockFile(fs, lock);
+    if (!preserveBackup && fs.exists(old)) {
+      fs.delete(old, true);
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args);
     System.exit(res);

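For reference, the new install(Job, Path) overload mirrors the existing
mapred-API version: the job's temporary output becomes the new "current"
CrawlDb, the previous "current" rotates to "old", the lock file is
removed, and "old" is deleted unless db.preserve.backup (default: true)
keeps it. A hedged usage sketch, assuming a driver that has already set
the job's output path to a temporary crawldb directory:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.nutch.crawl.CrawlDb;

    public class InstallSketch {
      // placeholder driver fragment, not part of the patch
      public static void runAndInstall(Job job, Path crawlDb)
          throws Exception {
        job.waitForCompletion(true);
        // current -> old, temporary output -> current, lock removed;
        // "old" survives only while db.preserve.backup is true
        CrawlDb.install(job, crawlDb);
      }
    }
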
http://git-wip-us.apache.org/repos/asf/nutch/blob/3c691eb2/src/java/org/apache/nutch/crawl/Injector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java
index dc1f1cf..0d01dc8 100644
--- a/src/java/org/apache/nutch/crawl/Injector.java
+++ b/src/java/org/apache/nutch/crawl/Injector.java
@@ -17,211 +17,267 @@
 
 package org.apache.nutch.crawl;
 
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
-// Commons Logging imports
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.nutch.net.*;
 import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TimingUtil;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
 /**
- * This class takes a flat file of URLs and adds them to the of pages to be
- * crawled. Useful for bootstrapping the system. The URL files contain one URL
- * per line, optionally followed by custom metadata separated by tabs with the
- * metadata key separated from the corresponding value by '='. <br>
- * Note that some metadata keys are reserved : <br>
- * - <i>nutch.score</i> : allows to set a custom score for a specific URL <br>
- * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a
- * specific URL <br>
- * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval
- * for a specific URL that is not changed by AdaptiveFetchSchedule <br>
- * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000
- * \t userType=open_source
+ * Injector takes a flat file of URLs and merges ("injects") these URLs into the
+ * CrawlDb. Useful for bootstrapping a Nutch crawl. The URL files contain one
+ * URL per line, optionally followed by custom metadata separated by tabs with
+ * the metadata key separated from the corresponding value by '='.
+ * </p>
+ * <p>
+ * Note that some metadata keys are reserved:
+ * <dl>
+ * <dt>nutch.score</dt>
+ * <dd>allows to set a custom score for a specific URL</dd>
+ * <dt>nutch.fetchInterval</dt>
+ * <dd>allows to set a custom fetch interval for a specific URL</dd>
+ * <dt>nutch.fetchInterval.fixed</dt>
+ * <dd>allows to set a custom fetch interval for a specific URL that is not
+ * changed by AdaptiveFetchSchedule</dd>
+ * </dl>
+ * </p>
+ * <p>
+ * Example:
+ * 
+ * <pre>
+ *  http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
+ * </pre>
+ * </p>
  **/
 public class Injector extends NutchTool implements Tool {
   public static final Logger LOG = LoggerFactory.getLogger(Injector.class);
 
   /** metadata key reserved for setting a custom score for a specific URL */
   public static String nutchScoreMDName = "nutch.score";
+
   /**
   * metadata key reserved for setting a custom fetchInterval for a specific URL
    */
   public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
+
   /**
    * metadata key reserved for setting a fixed custom fetchInterval for a
    * specific URL
    */
  public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed";
 
-  /** Normalize and filter injected urls. */
-  public static class InjectMapper implements
-      Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
+  public static class InjectMapper
+      extends Mapper<Text, Writable, Text, CrawlDatum> {
+    public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope";
+    public static final String TAB_CHARACTER = "\t";
+    public static final String EQUAL_CHARACTER = "=";
+
     private URLNormalizers urlNormalizers;
     private int interval;
     private float scoreInjected;
-    private JobConf jobConf;
     private URLFilters filters;
     private ScoringFilters scfilters;
     private long curTime;
-
-    public void configure(JobConf job) {
-      this.jobConf = job;
-      urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
-      interval = jobConf.getInt("db.fetch.interval.default", 2592000);
-      filters = new URLFilters(jobConf);
-      scfilters = new ScoringFilters(jobConf);
-      scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
-      curTime = job
-          .getLong("injector.current.time", System.currentTimeMillis());
+    private boolean url404Purging;
+    private String scope;
+
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_INJECT);
+      urlNormalizers = new URLNormalizers(conf, scope);
+      interval = conf.getInt("db.fetch.interval.default", 2592000);
+      filters = new URLFilters(conf);
+      scfilters = new ScoringFilters(conf);
+      scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+      curTime = conf.getLong("injector.current.time",
+          System.currentTimeMillis());
+      url404Purging = conf.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false);
     }
 
-    public void close() {
+    /* Filter and normalize the input url */
+    private String filterNormalize(String url) {
+      if (url != null) {
+        try {
+          url = urlNormalizers.normalize(url, scope); // normalize the url
+          url = filters.filter(url); // filter the url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + url + ":" + e);
+          url = null;
+        }
+      }
+      return url;
     }
 
-    public void map(WritableComparable<?> key, Text value,
-        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-        throws IOException {
-      String url = value.toString().trim(); // value is line of text
+    /**
+     * Extract metadata that could be passed along with url in a seeds file.
+     * Metadata must be key-value pair(s) and separated by a TAB_CHARACTER
+     */
+    private void processMetaData(String metadata, CrawlDatum datum,
+        String url) {
+      String[] splits = metadata.split(TAB_CHARACTER);
 
-      if (url != null && (url.length() == 0 || url.startsWith("#"))) {
-        /* Ignore line that start with # */
-        return;
-      }
+      for (String split : splits) {
+        // find separation between name and value
+        int indexEquals = split.indexOf(EQUAL_CHARACTER);
+        if (indexEquals == -1) // skip anything without an EQUAL_CHARACTER
+          continue;
 
-      // if tabs : metadata that could be stored
-      // must be name=value and separated by \t
-      float customScore = -1f;
-      int customInterval = interval;
-      int fixedInterval = -1;
-      Map<String, String> metadata = new TreeMap<String, String>();
-      if (url.indexOf("\t") != -1) {
-        String[] splits = url.split("\t");
-        url = splits[0];
-        for (int s = 1; s < splits.length; s++) {
-          // find separation between name and value
-          int indexEquals = splits[s].indexOf("=");
-          if (indexEquals == -1) {
-            // skip anything without a =
-            continue;
-          }
-          String metaname = splits[s].substring(0, indexEquals);
-          String metavalue = splits[s].substring(indexEquals + 1);
+        String metaname = split.substring(0, indexEquals);
+        String metavalue = split.substring(indexEquals + 1);
+
+        try {
           if (metaname.equals(nutchScoreMDName)) {
-            try {
-              customScore = Float.parseFloat(metavalue);
-            } catch (NumberFormatException nfe) {
-            }
+            datum.setScore(Float.parseFloat(metavalue));
           } else if (metaname.equals(nutchFetchIntervalMDName)) {
-            try {
-              customInterval = Integer.parseInt(metavalue);
-            } catch (NumberFormatException nfe) {
-            }
+            datum.setFetchInterval(Integer.parseInt(metavalue));
           } else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
-            try {
-              fixedInterval = Integer.parseInt(metavalue);
-            } catch (NumberFormatException nfe) {
+            int fixedInterval = Integer.parseInt(metavalue);
+            if (fixedInterval > -1) {
+              // Set writable using float. Float is used by
+              // AdaptiveFetchSchedule
+              datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
+                  new FloatWritable(fixedInterval));
+              datum.setFetchInterval(fixedInterval);
             }
-          } else
-            metadata.put(metaname, metavalue);
-        }
-      }
-      try {
-        url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
-        url = filters.filter(url); // filter the url
-      } catch (Exception e) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Skipping " + url + ":" + e);
+          } else {
+            datum.getMetaData().put(new Text(metaname), new Text(metavalue));
+          }
+        } catch (NumberFormatException nfe) {
+          LOG.error("Invalid number '" + metavalue + "' in metadata '"
+              + metaname + "' for url " + url);
         }
-        url = null;
       }
-      if (url == null) {
-        reporter.getCounter("injector", "urls_filtered").increment(1);
-      } else { // if it passes
-        value.set(url); // collect it
-        CrawlDatum datum = new CrawlDatum();
-        datum.setStatus(CrawlDatum.STATUS_INJECTED);
-
-        // Is interval custom? Then set as meta data
-        if (fixedInterval > -1) {
-          // Set writable using float. Flaot is used by
-          // AdaptiveFetchSchedule
-          datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
-              new FloatWritable(fixedInterval));
-          datum.setFetchInterval(fixedInterval);
-        } else {
-          datum.setFetchInterval(customInterval);
-        }
+    }
 
-        datum.setFetchTime(curTime);
-        // now add the metadata
-        Iterator<String> keysIter = metadata.keySet().iterator();
-        while (keysIter.hasNext()) {
-          String keymd = keysIter.next();
-          String valuemd = metadata.get(keymd);
-          datum.getMetaData().put(new Text(keymd), new Text(valuemd));
-        }
-        if (customScore != -1)
-          datum.setScore(customScore);
-        else
+    public void map(Text key, Writable value, Context context)
+        throws IOException, InterruptedException {
+      if (value instanceof Text) {
+        // if it's a url from the seed list
+        String url = key.toString().trim();
+
+        // remove empty string or string starting with '#'
+        if (url.length() == 0 || url.startsWith("#"))
+          return;
+
+        url = filterNormalize(url);
+        if (url == null) {
+          context.getCounter("injector", "urls_filtered").increment(1);
+        } else {
+          CrawlDatum datum = new CrawlDatum();
+          datum.setStatus(CrawlDatum.STATUS_INJECTED);
+          datum.setFetchTime(curTime);
           datum.setScore(scoreInjected);
-        try {
-          scfilters.injectedScore(value, datum);
-        } catch (ScoringFilterException e) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Cannot filter injected score for url " + url
-                + ", using default (" + e.getMessage() + ")");
+          datum.setFetchInterval(interval);
+
+          String metadata = value.toString().trim();
+          if (metadata.length() > 0)
+            processMetaData(metadata, datum, url);
+
+          try {
+            key.set(url);
+            scfilters.injectedScore(key, datum);
+          } catch (ScoringFilterException e) {
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("Cannot filter injected score for url " + url
+                  + ", using default (" + e.getMessage() + ")");
+            }
           }
+          context.getCounter("injector", "urls_injected").increment(1);
+          context.write(key, datum);
+        }
+      } else if (value instanceof CrawlDatum) {
+        // if it's a crawlDatum from the input crawldb, emulate CrawlDbFilter's
+        // map()
+        CrawlDatum datum = (CrawlDatum) value;
+
+        // remove 404 urls
+        if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus())
+          return;
+
+        String url = filterNormalize(key.toString());
+        if (url != null) {
+          key.set(url);
+          context.write(key, datum);
         }
-        reporter.getCounter("injector", "urls_injected").increment(1);
-        output.collect(value, datum);
       }
     }
   }
 
   /** Combine multiple new entries for a url. */
-  public static class InjectReducer implements
-      Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+  public static class InjectReducer
+      extends Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     private int interval;
     private float scoreInjected;
     private boolean overwrite = false;
     private boolean update = false;
+    private CrawlDatum old = new CrawlDatum();
+    private CrawlDatum injected = new CrawlDatum();
 
-    public void configure(JobConf job) {
-      interval = job.getInt("db.fetch.interval.default", 2592000);
-      scoreInjected = job.getFloat("db.score.injected", 1.0f);
-      overwrite = job.getBoolean("db.injector.overwrite", false);
-      update = job.getBoolean("db.injector.update", false);
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      interval = conf.getInt("db.fetch.interval.default", 2592000);
+      scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+      overwrite = conf.getBoolean("db.injector.overwrite", false);
+      update = conf.getBoolean("db.injector.update", false);
       LOG.info("Injector: overwrite: " + overwrite);
       LOG.info("Injector: update: " + update);
     }
 
-    public void close() {
-    }
-
-    private CrawlDatum old = new CrawlDatum();
-    private CrawlDatum injected = new CrawlDatum();
+    /**
+     * Merge the input records as per the rules below:
+     * 
+     * <pre>
+     * 1. If there is ONLY new injected record ==> emit injected record
+     * 2. If there is ONLY old record          ==> emit existing record
+     * 3. If BOTH new and old records are present:
+     *    (a) If 'overwrite' is true           ==> emit injected record
+     *    (b) If 'overwrite' is false :
+     *        (i)  If 'update' is false        ==> emit existing record
+     *        (ii) If 'update' is true         ==> update existing record and emit it
+     * </pre>
+     * 
+     * For more details @see NUTCH-1405
+     */
+    public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
+        throws IOException, InterruptedException {
 
-    public void reduce(Text key, Iterator<CrawlDatum> values,
-        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-        throws IOException {
       boolean oldSet = false;
       boolean injectedSet = false;
-      while (values.hasNext()) {
-        CrawlDatum val = values.next();
+
+      // If we encounter a datum with status STATUS_INJECTED, then it's a
+      // newly injected record. All other statuses correspond to an old record.
+      for (CrawlDatum val : values) {
         if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
           injected.set(val);
           injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
@@ -230,41 +286,29 @@ public class Injector extends NutchTool implements Tool {
           old.set(val);
           oldSet = true;
         }
-
       }
 
-      CrawlDatum res = null;
-
-      // Old default behaviour
-      if (injectedSet && !oldSet) {
-        res = injected;
+      CrawlDatum result;
+      if (injectedSet && (!oldSet || overwrite)) {
+        // corresponds to rules (1) and (3.a) in the method description
+        result = injected;
       } else {
-        res = old;
+        // corresponds to rules (2) and (3.b) in the method description
+        result = old;
+
+        if (injectedSet && update) {
+          // corresponds to rule (3.b.ii) in the method description
+          old.putAllMetaData(injected);
+          old.setScore(injected.getScore() != scoreInjected
+              ? injected.getScore() : old.getScore());
+          old.setFetchInterval(injected.getFetchInterval() != interval
+              ? injected.getFetchInterval() : old.getFetchInterval());
+        }
       }
       if (injectedSet && oldSet) {
-        reporter.getCounter("injector", "urls_merged").increment(1);
-      }
-      /**
-       * Whether to overwrite, ignore or update existing records
-       * 
-       * @see https://issues.apache.org/jira/browse/NUTCH-1405
-       */
-      // Injected record already exists and update but not overwrite
-      if (injectedSet && oldSet && update && !overwrite) {
-        res = old;
-        old.putAllMetaData(injected);
-        old.setScore(injected.getScore() != scoreInjected ? injected.getScore()
-            : old.getScore());
-        old.setFetchInterval(injected.getFetchInterval() != interval ? injected
-            .getFetchInterval() : old.getFetchInterval());
+        context.getCounter("injector", "urls_merged").increment(1);
       }
-
-      // Injected record already exists and overwrite
-      if (injectedSet && oldSet && overwrite) {
-        res = injected;
-      }
-
-      output.collect(key, res);
+      context.write(key, result);
     }
   }
 
@@ -275,94 +319,121 @@ public class Injector extends NutchTool implements Tool {
     setConf(conf);
   }
 
-  public void inject(Path crawlDb, Path urlDir) throws IOException {
+  public void inject(Path crawlDb, Path urlDir) throws Exception {
+    inject(crawlDb, urlDir, false, false);
+  }
+
+  public void inject(Path crawlDb, Path urlDir, boolean overwrite,
+      boolean update) throws Exception {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
+
     if (LOG.isInfoEnabled()) {
       LOG.info("Injector: starting at " + sdf.format(start));
       LOG.info("Injector: crawlDb: " + crawlDb);
       LOG.info("Injector: urlDir: " + urlDir);
-    }
-
-    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
-        + "/inject-temp-"
-        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
-    // map text input file to a <url,CrawlDatum> file
-    if (LOG.isInfoEnabled()) {
       LOG.info("Injector: Converting injected urls to crawl db entries.");
     }
 
-    FileSystem fs = FileSystem.get(getConf());
-    // determine if the crawldb already exists
-    boolean dbExists = fs.exists(crawlDb);
-
-    JobConf sortJob = new NutchJob(getConf());
-    sortJob.setJobName("inject " + urlDir);
-    FileInputFormat.addInputPath(sortJob, urlDir);
-    sortJob.setMapperClass(InjectMapper.class);
-
-    FileOutputFormat.setOutputPath(sortJob, tempDir);
-    if (dbExists) {
-      // Don't run merge injected urls, wait for merge with
-      // existing DB
-      sortJob.setOutputFormat(SequenceFileOutputFormat.class);
-      sortJob.setNumReduceTasks(0);
-    } else {
-      sortJob.setOutputFormat(MapFileOutputFormat.class);
-      sortJob.setReducerClass(InjectReducer.class);
-      sortJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
-          false);
-    }
-    sortJob.setOutputKeyClass(Text.class);
-    sortJob.setOutputValueClass(CrawlDatum.class);
-    sortJob.setLong("injector.current.time", System.currentTimeMillis());
+    // set configuration
+    Configuration conf = getConf();
+    conf.setLong("injector.current.time", System.currentTimeMillis());
+    conf.setBoolean("db.injector.overwrite", overwrite);
+    conf.setBoolean("db.injector.update", update);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    // create all the required paths
+    FileSystem fs = FileSystem.get(conf);
+    Path current = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+    if (!fs.exists(current))
+      fs.mkdirs(current);
+
+    Path tempCrawlDb = new Path(crawlDb,
+        "crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    // lock an existing crawldb to prevent multiple simultaneous updates
+    Path lock = new Path(crawlDb, CrawlDb.LOCK_NAME);
+    LockUtil.createLockFile(fs, lock, false);
+
+    // configure job
+    Job job = Job.getInstance(conf, "inject " + urlDir);
+    job.setJarByClass(Injector.class);
+    job.setMapperClass(InjectMapper.class);
+    job.setReducerClass(InjectReducer.class);
+    job.setOutputFormatClass(MapFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+    job.setSpeculativeExecution(false);
+
+    // set input and output paths of the job
+    MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+    MultipleInputs.addInputPath(job, urlDir, KeyValueTextInputFormat.class);
+    FileOutputFormat.setOutputPath(job, tempCrawlDb);
 
-    RunningJob mapJob = null;
     try {
-      mapJob = JobClient.runJob(sortJob);
-    } catch (IOException e) {
-      fs.delete(tempDir, true);
-      throw e;
-    }
-    long urlsInjected = mapJob.getCounters()
-        .findCounter("injector", "urls_injected").getValue();
-    long urlsFiltered = mapJob.getCounters()
-        .findCounter("injector", "urls_filtered").getValue();
-    LOG.info("Injector: Total number of urls rejected by filters: "
-        + urlsFiltered);
-    LOG.info("Injector: Total number of urls after normalization: "
-        + urlsInjected);
-    long urlsMerged = 0;
-    if (dbExists) {
-      // merge with existing crawl db
+      // run the job
+      job.waitForCompletion(true);
+
+      // save output and perform cleanup
+      CrawlDb.install(job, crawlDb);
+
       if (LOG.isInfoEnabled()) {
-        LOG.info("Injector: Merging injected urls into crawl db.");
+        long urlsInjected = job.getCounters()
+            .findCounter("injector", "urls_injected").getValue();
+        long urlsFiltered = job.getCounters()
+            .findCounter("injector", "urls_filtered").getValue();
+        long urlsMerged = job.getCounters()
+            .findCounter("injector", "urls_merged").getValue();
+        LOG.info("Injector: Total urls rejected by filters: " + urlsFiltered);
+        LOG.info(
+            "Injector: Total urls injected after normalization and filtering: "
+                + urlsInjected);
+        LOG.info("Injector: Total urls injected but already in CrawlDb: "
+            + urlsMerged);
+        LOG.info("Injector: Total new urls injected: "
+            + (urlsInjected - urlsMerged));
+
+        long end = System.currentTimeMillis();
+        LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
+            + TimingUtil.elapsedTime(start, end));
       }
-      JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
-      FileInputFormat.addInputPath(mergeJob, tempDir);
-      mergeJob.setReducerClass(InjectReducer.class);
-      try {
-        RunningJob merge = JobClient.runJob(mergeJob);
-        urlsMerged = merge.getCounters().findCounter("injector", "urls_merged")
-            .getValue();
-        LOG.info("Injector: URLs merged: " + urlsMerged);
-      } catch (IOException e) {
-        fs.delete(tempDir, true);
-        throw e;
+    } catch (Exception e) {
+      if (fs.exists(tempCrawlDb)) {
+        fs.delete(tempCrawlDb, true);
       }
-      CrawlDb.install(mergeJob, crawlDb);
-    } else {
-      CrawlDb.install(sortJob, crawlDb);
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
     }
+  }
 
-    // clean up
-    fs.delete(tempDir, true);
-    LOG.info("Injector: Total new urls injected: "
-        + (urlsInjected - urlsMerged));
-    long end = System.currentTimeMillis();
-    LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
-        + TimingUtil.elapsedTime(start, end));
+  public void usage() {
+    System.err.println(
+        "Usage: Injector <crawldb> <url_dir> [-overwrite] [-update]\n");
+    System.err.println(
+        "  <crawldb>\tPath to a crawldb directory. If not present, a new one would be created.");
+    System.err.println(
+        "  <url_dir>\tPath to directory with URL file(s) containing urls to be injected. A URL file");
+    System.err.println(
+        "           \tshould have one URL per line, optionally followed by custom metadata.");
+    System.err.println(
+        "           \tBlank lines or lines starting with a '#' would be ignored. Custom metadata must");
+    System.err
+        .println("           \tbe of form 'key=value' and separated by tabs.");
+    System.err.println("           \tBelow are reserved metadata keys:\n");
+    System.err.println("           \t\tnutch.score: A custom score for a url");
+    System.err.println(
+        "           \t\tnutch.fetchInterval: A custom fetch interval for a url");
+    System.err.println(
+        "           \t\tnutch.fetchInterval.fixed: A custom fetch interval for a url that is not "
+            + "changed by AdaptiveFetchSchedule\n");
+    System.err.println("           \tExample:");
+    System.err.println("           \t http://www.apache.org/");
+    System.err.println(
+        "           \t http://www.nutch.org/ \\t nutch.score=10 \\t nutch.fetchInterval=2592000 \\t userType=open_source\n");
+    System.err.println(
+        " -overwrite\tOverwrite existing crawldb records by the injected records. Has precedence over 'update'");
+    System.err.println(
+        " -update   \tUpdate existing crawldb records with the injected records. Old metadata is preserved");
   }
 
   public static void main(String[] args) throws Exception {
@@ -372,11 +443,27 @@ public class Injector extends NutchTool implements Tool {
 
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: Injector <crawldb> <url_dir>");
+      usage();
       return -1;
     }
+
+    boolean overwrite = false;
+    boolean update = false;
+
+    for (int i = 2; i < args.length; i++) {
+      if (args[i].equals("-overwrite")) {
+        overwrite = true;
+      } else if (args[i].equals("-update")) {
+        update = true;
+      } else {
+        LOG.info("Injector: Found invalid argument \"" + args[i] + "\"\n");
+        usage();
+        return -1;
+      }
+    }
+
     try {
-      inject(new Path(args[0]), new Path(args[1]));
+      inject(new Path(args[0]), new Path(args[1]), overwrite, update);
       return 0;
     } catch (Exception e) {
       LOG.error("Injector: " + StringUtils.stringifyException(e));
@@ -384,43 +471,39 @@ public class Injector extends NutchTool implements Tool {
     }
   }
 
-  @Override
   /**
    * Used by the Nutch REST service
    */
-  public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
-    if(args.size()<1){
+  public Map<String, Object> run(Map<String, Object> args, String crawlId)
+      throws Exception {
+    if (args.size() < 1) {
       throw new IllegalArgumentException("Required arguments <url_dir>");
     }
     Map<String, Object> results = new HashMap<String, Object>();
 
     Path crawlDb;
-    if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+    if (args.containsKey(Nutch.ARG_CRAWLDB)) {
       Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
-      if(crawldbPath instanceof Path) {
+      if (crawldbPath instanceof Path) {
         crawlDb = (Path) crawldbPath;
-      }
-      else {
+      } else {
         crawlDb = new Path(crawldbPath.toString());
       }
-    }
-    else {
-      crawlDb = new Path(crawlId+"/crawldb");
+    } else {
+      crawlDb = new Path(crawlId + "/crawldb");
     }
 
     Path input;
     Object path = args.get(Nutch.ARG_SEEDDIR);
-    if(path instanceof Path) {
+    if (path instanceof Path) {
       input = (Path) path;
-    }
-    else {
+    } else {
       input = new Path(path.toString());
     }
 
     inject(crawlDb, input);
     results.put(Nutch.VAL_RESULT, Integer.toString(0));
     return results;
-
   }
 
 }

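As a rough smoke test in the spirit of the commit message's "first simple
tests (inject + merge)", the new four-argument inject() drives both steps
in one call. The paths here are placeholders and the class is
illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.crawl.Injector;
    import org.apache.nutch.util.NutchConfiguration;

    public class InjectSmokeTest {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        // a crawldb (created if absent) and a directory of seed files,
        // one URL per line, optionally followed by tab-separated
        // metadata such as "nutch.score=10"
        new Injector(conf).inject(new Path("crawl/crawldb"),
            new Path("urls"), false /* overwrite */, false /* update */);
      }
    }

On the command line the equivalent run is "bin/nutch inject <crawldb>
<url_dir>", with -overwrite and/or -update appended as needed, matching
the usage() output above.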