http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java
new file mode 100755
index 0000000..10ee185
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchSchedule.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Text;
+
/**
 * This interface defines the contract for implementations that manipulate
 * fetch times and re-fetch intervals.
 *
 * @author Andrzej Bialecki
 */
public interface FetchSchedule extends Configurable {

  /** It is unknown whether page was changed since our last visit. */
  public static final int STATUS_UNKNOWN = 0;
  /** Page is known to have been modified since our last visit. */
  public static final int STATUS_MODIFIED = 1;
  /** Page is known to remain unmodified since our last visit. */
  public static final int STATUS_NOTMODIFIED = 2;

  /** Number of seconds in one day, for converting day-based intervals. */
  public static final int SECONDS_PER_DAY = 3600 * 24;

  /**
   * Initialize fetch schedule related data. Implementations should at least
   * set the <code>fetchTime</code> and <code>fetchInterval</code>. The default
   * implementation set the <code>fetchTime</code> to now, using the default
   * <code>fetchInterval</code>.
   * 
   * @param url
   *          URL of the page.
   * 
   * @param datum
   *          datum instance to be initialized.
   * 
   * @return adjusted page information, including all original information.
   *         NOTE: this may be a different instance than {@link CrawlDatum}, but
   *         implementations should make sure that it contains at least all
   *         information from {@link CrawlDatum}.
   */
  public CrawlDatum initializeSchedule(Text url, CrawlDatum datum);

  /**
   * Sets the <code>fetchInterval</code> and <code>fetchTime</code> on a
   * successfully fetched page. Implementations may use supplied arguments to
   * support different re-fetching schedules.
   * 
   * @param url
   *          url of the page
   * 
   * @param datum
   *          page description to be adjusted. NOTE: this instance, passed by
   *          reference, may be modified inside the method.
   * 
   * @param prevFetchTime
   *          previous value of fetch time, or 0 if not available.
   * 
   * @param prevModifiedTime
   *          previous value of modifiedTime, or 0 if not available.
   * 
   * @param fetchTime
   *          the latest time, when the page was recently re-fetched. Most
   *          FetchSchedule implementations should update the value in
   *          {@link CrawlDatum} to something greater than this value.
   * 
   * @param modifiedTime
   *          last time the content was modified. This information comes from
   *          the protocol implementations, or is set to &lt; 0 if not
   *          available. Most FetchSchedule implementations should update the
   *          value in {@link CrawlDatum} to this value.
   * 
   * @param state
   *          if {@link #STATUS_MODIFIED}, then the content is considered to be
   *          "changed" before the <code>fetchTime</code>, if
   *          {@link #STATUS_NOTMODIFIED} then the content is known to be
   *          unchanged. This information may be obtained by comparing page
   *          signatures before and after fetching. If this is set to
   *          {@link #STATUS_UNKNOWN}, then it is unknown whether the page was
   *          changed; implementations are free to follow a sensible default
   *          behavior.
   * 
   * @return adjusted page information, including all original information.
   *         NOTE: this may be a different instance than {@link CrawlDatum}, but
   *         implementations should make sure that it contains at least all
   *         information from {@link CrawlDatum}.
   */
  public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
      long prevFetchTime, long prevModifiedTime, long fetchTime,
      long modifiedTime, int state);

  /**
   * This method specifies how to schedule refetching of pages marked as GONE.
   * Default implementation increases fetchInterval by 50%, and if it exceeds
   * the <code>maxInterval</code> it calls
   * {@link #forceRefetch(Text, CrawlDatum, boolean)}.
   * 
   * @param url
   *          URL of the page
   * 
   * @param datum
   *          datum instance to be adjusted.
   * 
   * @param prevFetchTime
   *          previous value of fetch time, or 0 if not available.
   * 
   * @param prevModifiedTime
   *          previous value of modifiedTime, or 0 if not available.
   * 
   * @param fetchTime
   *          current fetch time.
   * 
   * @return adjusted page information, including all original information.
   *         NOTE: this may be a different instance than {@link CrawlDatum}, but
   *         implementations should make sure that it contains at least all
   *         information from {@link CrawlDatum}.
   */
  public CrawlDatum setPageGoneSchedule(Text url, CrawlDatum datum,
      long prevFetchTime, long prevModifiedTime, long fetchTime);

  /**
   * This method adjusts the fetch schedule if fetching needs to be re-tried
   * due to transient errors. The default implementation sets the next fetch
   * time 1 day in the future and increases the retry counter.
   * 
   * @param url
   *          URL of the page.
   * 
   * @param datum
   *          page information.
   * 
   * @param prevFetchTime
   *          previous fetch time.
   * 
   * @param prevModifiedTime
   *          previous modified time.
   * 
   * @param fetchTime
   *          current fetch time.
   * 
   * @return adjusted page information, including all original information.
   *         NOTE: this may be a different instance than {@link CrawlDatum}, but
   *         implementations should make sure that it contains at least all
   *         information from {@link CrawlDatum}.
   */
  public CrawlDatum setPageRetrySchedule(Text url, CrawlDatum datum,
      long prevFetchTime, long prevModifiedTime, long fetchTime);

  /**
   * Calculates last fetch time of the given CrawlDatum.
   * 
   * @param datum
   *          datum instance to read the fetch time and interval from.
   * 
   * @return the date as a long.
   */
  public long calculateLastFetchTime(CrawlDatum datum);

  /**
   * This method provides information whether the page is suitable for
   * selection in the current fetchlist. NOTE: a true return value does not
   * guarantee that the page will be fetched, it just allows it to be included
   * in the further selection process based on scores. The default
   * implementation checks <code>fetchTime</code>, if it is higher than the
   * curTime it returns false, and true otherwise. It will also check that
   * fetchTime is not too remote (more than <code>maxInterval</code>), in which
   * case it lowers the interval and returns true.
   * 
   * @param url
   *          URL of the page.
   * 
   * @param datum
   *          datum instance.
   * 
   * @param curTime
   *          reference time (usually set to the time when the fetchlist
   *          generation process was started).
   * 
   * @return true, if the page should be considered for inclusion in the
   *         current fetchlist, otherwise false.
   */
  public boolean shouldFetch(Text url, CrawlDatum datum, long curTime);

  /**
   * This method resets fetchTime, fetchInterval, modifiedTime and page
   * signature, so that it forces refetching.
   * 
   * @param url
   *          URL of the page.
   * 
   * @param datum
   *          datum instance.
   * 
   * @param asap
   *          if true, force refetch as soon as possible - this sets the
   *          fetchTime to now. If false, force refetch whenever the next fetch
   *          time is set.
   * 
   * @return adjusted page information, including all original information.
   *         NOTE: this may be a different instance than {@link CrawlDatum}, but
   *         implementations should make sure that it contains at least all
   *         information from {@link CrawlDatum}.
   */
  public CrawlDatum forceRefetch(Text url, CrawlDatum datum, boolean asap);
}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java
----------------------------------------------------------------------
diff --git 
a/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java
new file mode 100755
index 0000000..7a84524
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/FetchScheduleFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.ObjectCache;
+
+/** Creates and caches a {@link FetchSchedule} implementation. */
+public class FetchScheduleFactory {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(FetchScheduleFactory.class);
+
+  private FetchScheduleFactory() {
+  } // no public ctor
+
+  /** Return the FetchSchedule implementation. */
+  public synchronized static FetchSchedule getFetchSchedule(Configuration 
conf) {
+    String clazz = conf.get("db.fetch.schedule.class",
+        DefaultFetchSchedule.class.getName());
+    ObjectCache objectCache = ObjectCache.get(conf);
+    FetchSchedule impl = (FetchSchedule) objectCache.getObject(clazz);
+    if (impl == null) {
+      try {
+        LOG.info("Using FetchSchedule impl: " + clazz);
+        Class<?> implClass = Class.forName(clazz);
+        impl = (FetchSchedule) implClass.newInstance();
+        impl.setConf(conf);
+        objectCache.setObject(clazz, impl);
+      } catch (Exception e) {
+        throw new RuntimeException("Couldn't create " + clazz, e);
+      }
+    }
+    return impl;
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java
new file mode 100644
index 0000000..9a82089
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Generator.java
@@ -0,0 +1,859 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.text.*;
+
+// Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.jexl2.Expression;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.JexlUtil;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+
+/**
+ * Generates a subset of a crawl db to fetch. This version allows to generate
+ * fetchlists for several segments in one go. Unlike in the initial version
+ * (OldGenerator), the IP resolution is done ONLY on the entries which have 
been
+ * selected for fetching. The URLs are partitioned by IP, domain or host within
+ * a segment. We can chose separately how to count the URLS i.e. by domain or
+ * host to limit the entries.
+ **/
+public class Generator extends NutchTool implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(Generator.class);
+
+  public static final String GENERATE_UPDATE_CRAWLDB = 
"generate.update.crawldb";
+  public static final String GENERATOR_MIN_SCORE = "generate.min.score";
+  public static final String GENERATOR_MIN_INTERVAL = "generate.min.interval";
+  public static final String GENERATOR_RESTRICT_STATUS = 
"generate.restrict.status";
+  public static final String GENERATOR_FILTER = "generate.filter";
+  public static final String GENERATOR_NORMALISE = "generate.normalise";
+  public static final String GENERATOR_MAX_COUNT = "generate.max.count";
+  public static final String GENERATOR_COUNT_MODE = "generate.count.mode";
+  public static final String GENERATOR_COUNT_VALUE_DOMAIN = "domain";
+  public static final String GENERATOR_COUNT_VALUE_HOST = "host";
+  public static final String GENERATOR_TOP_N = "generate.topN";
+  public static final String GENERATOR_CUR_TIME = "generate.curTime";
+  public static final String GENERATOR_DELAY = "crawl.gen.delay";
+  public static final String GENERATOR_MAX_NUM_SEGMENTS = 
"generate.max.num.segments";
+  public static final String GENERATOR_EXPR = "generate.expr";
+
+  public static class SelectorEntry implements Writable {
+    public Text url;
+    public CrawlDatum datum;
+    public IntWritable segnum;
+
+    public SelectorEntry() {
+      url = new Text();
+      datum = new CrawlDatum();
+      segnum = new IntWritable(0);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      url.readFields(in);
+      datum.readFields(in);
+      segnum.readFields(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      url.write(out);
+      datum.write(out);
+      segnum.write(out);
+    }
+
+    public String toString() {
+      return "url=" + url.toString() + ", datum=" + datum.toString()
+          + ", segnum=" + segnum.toString();
+    }
+  }
+
+  /** Selects entries due for fetch. */
+  public static class Selector implements
+      Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>,
+      Partitioner<FloatWritable, Writable>,
+      Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
+    private LongWritable genTime = new 
LongWritable(System.currentTimeMillis());
+    private long curTime;
+    private long limit;
+    private long count;
+    private HashMap<String, int[]> hostCounts = new HashMap<String, int[]>();
+    private int segCounts[];
+    private int maxCount;
+    private boolean byDomain = false;
+    private Partitioner<Text, Writable> partitioner = new URLPartitioner();
+    private URLFilters filters;
+    private URLNormalizers normalizers;
+    private ScoringFilters scfilters;
+    private SelectorEntry entry = new SelectorEntry();
+    private FloatWritable sortValue = new FloatWritable();
+    private boolean filter;
+    private boolean normalise;
+    private long genDelay;
+    private FetchSchedule schedule;
+    private float scoreThreshold = 0f;
+    private int intervalThreshold = -1;
+    private String restrictStatus = null;
+    private int maxNumSegments = 1;
+    private Expression expr = null;
+    private int currentsegmentnum = 1;
+
+    public void configure(JobConf job) {
+      curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
+      limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE)
+          / job.getNumReduceTasks();
+      maxCount = job.getInt(GENERATOR_MAX_COUNT, -1);
+      if (maxCount == -1) {
+        byDomain = false;
+      }
+      if (GENERATOR_COUNT_VALUE_DOMAIN.equals(job.get(GENERATOR_COUNT_MODE)))
+        byDomain = true;
+      filters = new URLFilters(job);
+      normalise = job.getBoolean(GENERATOR_NORMALISE, true);
+      if (normalise)
+        normalizers = new URLNormalizers(job,
+            URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+      scfilters = new ScoringFilters(job);
+      partitioner.configure(job);
+      filter = job.getBoolean(GENERATOR_FILTER, true);
+      genDelay = job.getLong(GENERATOR_DELAY, 7L) * 3600L * 24L * 1000L;
+      long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
+      if (time > 0)
+        genTime.set(time);
+      schedule = FetchScheduleFactory.getFetchSchedule(job);
+      scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
+      intervalThreshold = job.getInt(GENERATOR_MIN_INTERVAL, -1);
+      restrictStatus = job.get(GENERATOR_RESTRICT_STATUS, null);
+      expr = JexlUtil.parseExpression(job.get(GENERATOR_EXPR, null));
+      maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
+      segCounts = new int[maxNumSegments];
+    }
+
+    public void close() {
+    }
+
+    /** Select & invert subset due for fetch. */
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<FloatWritable, SelectorEntry> output, Reporter 
reporter)
+        throws IOException {
+      Text url = key;
+      if (filter) {
+        // If filtering is on don't generate URLs that don't pass
+        // URLFilters
+        try {
+          if (filters.filter(url.toString()) == null)
+            return;
+        } catch (URLFilterException e) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage()
+                + ")");
+          }
+        }
+      }
+      CrawlDatum crawlDatum = value;
+
+      // check fetch schedule
+      if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
+        LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
+            + crawlDatum.getFetchTime() + ", curTime=" + curTime);
+        return;
+      }
+
+      LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
+          Nutch.WRITABLE_GENERATE_TIME_KEY);
+      if (oldGenTime != null) { // awaiting fetch & update
+        if (oldGenTime.get() + genDelay > curTime) // still wait for
+          // update
+          return;
+      }
+      float sort = 1.0f;
+      try {
+        sort = scfilters.generatorSortValue(key, crawlDatum, sort);
+      } catch (ScoringFilterException sfe) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + 
sfe);
+        }
+      }
+      
+      // check expr
+      if (expr != null) {
+        if (!crawlDatum.evaluate(expr)) {
+          return;
+        }
+      }
+
+      if (restrictStatus != null
+          && !restrictStatus.equalsIgnoreCase(CrawlDatum
+              .getStatusName(crawlDatum.getStatus())))
+        return;
+
+      // consider only entries with a score superior to the threshold
+      if (scoreThreshold != Float.NaN && sort < scoreThreshold)
+        return;
+
+      // consider only entries with a retry (or fetch) interval lower than
+      // threshold
+      if (intervalThreshold != -1
+          && crawlDatum.getFetchInterval() > intervalThreshold)
+        return;
+
+      // sort by decreasing score, using DecreasingFloatComparator
+      sortValue.set(sort);
+      // record generation time
+      crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
+      entry.datum = crawlDatum;
+      entry.url = key;
+      output.collect(sortValue, entry); // invert for sort by score
+    }
+
+    /** Partition by host / domain or IP. */
+    public int getPartition(FloatWritable key, Writable value,
+        int numReduceTasks) {
+      return partitioner.getPartition(((SelectorEntry) value).url, key,
+          numReduceTasks);
+    }
+
+    /** Collect until limit is reached. */
+    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
+        OutputCollector<FloatWritable, SelectorEntry> output, Reporter 
reporter)
+        throws IOException {
+
+      while (values.hasNext()) {
+
+        if (count == limit) {
+          // do we have any segments left?
+          if (currentsegmentnum < maxNumSegments) {
+            count = 0;
+            currentsegmentnum++;
+          } else
+            break;
+        }
+
+        SelectorEntry entry = values.next();
+        Text url = entry.url;
+        String urlString = url.toString();
+        URL u = null;
+
+        String hostordomain = null;
+
+        try {
+          if (normalise && normalizers != null) {
+            urlString = normalizers.normalize(urlString,
+                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+          }
+          u = new URL(urlString);
+          if (byDomain) {
+            hostordomain = URLUtil.getDomainName(u);
+          } else {
+            hostordomain = new URL(urlString).getHost();
+          }
+        } catch (Exception e) {
+          LOG.warn("Malformed URL: '" + urlString + "', skipping ("
+              + StringUtils.stringifyException(e) + ")");
+          reporter.getCounter("Generator", "MALFORMED_URL").increment(1);
+          continue;
+        }
+
+        hostordomain = hostordomain.toLowerCase();
+
+        // only filter if we are counting hosts or domains
+        if (maxCount > 0) {
+          int[] hostCount = hostCounts.get(hostordomain);
+          if (hostCount == null) {
+            hostCount = new int[] { 1, 0 };
+            hostCounts.put(hostordomain, hostCount);
+          }
+
+          // increment hostCount
+          hostCount[1]++;
+
+          // check if topN reached, select next segment if it is
+          while (segCounts[hostCount[0] - 1] >= limit
+              && hostCount[0] < maxNumSegments) {
+            hostCount[0]++;
+            hostCount[1] = 0;
+          }
+
+          // reached the limit of allowed URLs per host / domain
+          // see if we can put it in the next segment?
+          if (hostCount[1] >= maxCount) {
+            if (hostCount[0] < maxNumSegments) {
+              hostCount[0]++;
+              hostCount[1] = 0;
+            } else {
+              if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
+                LOG.info("Host or domain "
+                    + hostordomain
+                    + " has more than "
+                    + maxCount
+                    + " URLs for all "
+                    + maxNumSegments
+                    + " segments. Additional URLs won't be included in the 
fetchlist.");
+              }
+              // skip this entry
+              continue;
+            }
+          }
+          entry.segnum = new IntWritable(hostCount[0]);
+          segCounts[hostCount[0] - 1]++;
+        } else {
+          entry.segnum = new IntWritable(currentsegmentnum);
+          segCounts[currentsegmentnum - 1]++;
+        }
+
+        output.collect(key, entry);
+
+        // Count is incremented only when we keep the URL
+        // maxCount may cause us to skip it.
+        count++;
+      }
+    }
+  }
+
+  // Allows the reducers to generate one subfile per
+  public static class GeneratorOutputFormat extends
+      MultipleSequenceFileOutputFormat<FloatWritable, SelectorEntry> {
+    // generate a filename based on the segnum stored for this entry
+    protected String generateFileNameForKeyValue(FloatWritable key,
+        SelectorEntry value, String name) {
+      return "fetchlist-" + value.segnum.toString() + "/" + name;
+    }
+
+  }
+
  /** Raw-byte comparator that sorts FloatWritable keys in descending order. */
  public static class DecreasingFloatComparator extends
      FloatWritable.Comparator {

    /** Compares two FloatWritables decreasing (delegates with operands swapped). */
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return super.compare(b2, s2, l2, b1, s1, l1);
    }
  }
+
+  public static class SelectorInverseMapper extends MapReduceBase implements
+      Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
+
+    public void map(FloatWritable key, SelectorEntry value,
+        OutputCollector<Text, SelectorEntry> output, Reporter reporter)
+        throws IOException {
+      SelectorEntry entry = value;
+      output.collect(entry.url, entry);
+    }
+  }
+
+  public static class PartitionReducer extends MapReduceBase implements
+      Reducer<Text, SelectorEntry, Text, CrawlDatum> {
+
+    public void reduce(Text key, Iterator<SelectorEntry> values,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      // if using HashComparator, we get only one input key in case of
+      // hash collision
+      // so use only URLs from values
+      while (values.hasNext()) {
+        SelectorEntry entry = values.next();
+        output.collect(entry.url, entry.datum);
+      }
+    }
+
+  }
+
+  /** Sort fetch lists by hash of URL. */
+  public static class HashComparator extends WritableComparator {
+    public HashComparator() {
+      super(Text.class);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public int compare(WritableComparable a, WritableComparable b) {
+      Text url1 = (Text) a;
+      Text url2 = (Text) b;
+      int hash1 = hash(url1.getBytes(), 0, url1.getLength());
+      int hash2 = hash(url2.getBytes(), 0, url2.getLength());
+      return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
+    }
+
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int hash1 = hash(b1, s1, l1);
+      int hash2 = hash(b2, s2, l2);
+      return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
+    }
+
+    private static int hash(byte[] bytes, int start, int length) {
+      int hash = 1;
+      // make later bytes more significant in hash code, so that sorting
+      // by
+      // hashcode correlates less with by-host ordering.
+      for (int i = length - 1; i >= 0; i--)
+        hash = (31 * hash) + (int) bytes[start + i];
+      return hash;
+    }
+  }
+
  /**
   * Update the CrawlDB so that the next generate won't include the same URLs.
   */
  public static class CrawlDbUpdater extends MapReduceBase implements
      Mapper<Text, CrawlDatum, Text, CrawlDatum>,
      Reducer<Text, CrawlDatum, Text, CrawlDatum> {
    // Generation timestamp of the run being merged in, read from the job.
    long generateTime;

    public void configure(JobConf job) {
      generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
    }

    /** Identity map: passes every (url, datum) pair through to the reducer. */
    public void map(Text key, CrawlDatum value,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
      output.collect(key, value);
    }

    // Reused across reduce() calls to avoid per-record allocation.
    private CrawlDatum orig = new CrawlDatum();
    private LongWritable genTime = new LongWritable(0L);

    /**
     * Merges the datums seen for one URL, keeping the last one whose
     * generate-time metadata does NOT match this run (or that has none), and
     * re-attaching this run's generate time only if it was the one observed.
     * NOTE(review): last-writer-wins — the result depends on value iteration
     * order; statement order below is load-bearing.
     */
    public void reduce(Text key, Iterator<CrawlDatum> values,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
      genTime.set(0L);
      while (values.hasNext()) {
        CrawlDatum val = values.next();
        if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
          LongWritable gt = (LongWritable) val.getMetaData().get(
              Nutch.WRITABLE_GENERATE_TIME_KEY);
          genTime.set(gt.get());
          // A stale generate time (from an earlier run) means this datum is
          // authoritative; keep it and clear the stale timestamp.
          if (genTime.get() != generateTime) {
            orig.set(val);
            genTime.set(0L);
            continue;
          }
        } else {
          orig.set(val);
        }
      }
      // Re-stamp the current run's generate time if it was seen above.
      if (genTime.get() != 0L) {
        orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
      }
      output.collect(key, orig);
    }
  }
+
  /** Default constructor; configuration must be supplied later via setConf. */
  public Generator() {
  }

  /** Constructs a Generator initialized with the given configuration. */
  public Generator(Configuration conf) {
    setConf(conf);
  }
+
  /**
   * Generates fetchlists using filter/normalise flags read from the
   * configuration ("generate.filter" / "generate.normalise", both defaulting
   * to true), without forcing lock override, for a single segment.
   *
   * @param dbDir crawl database directory
   * @param segments segments directory
   * @param numLists number of reduce tasks / fetchlist partitions
   * @param topN maximum number of top-scoring URLs to select
   * @param curTime reference time for fetch scheduling
   * @return paths of the generated segments, or null if none were generated
   * @throws IOException when an I/O error occurs
   */
  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
      long curTime) throws IOException {

    JobConf job = new NutchJob(getConf());
    boolean filter = job.getBoolean(GENERATOR_FILTER, true);
    boolean normalise = job.getBoolean(GENERATOR_NORMALISE, true);
    return generate(dbDir, segments, numLists, topN, curTime, filter,
        normalise, false, 1, null);
  }
+
  /**
   * Old signature kept for compatibility — does not specify whether or not to
   * normalise, fixes the number of segments to 1, and uses no JEXL expression.
   *
   * @param dbDir crawl database directory
   * @param segments segments directory
   * @param numLists number of reduce tasks / fetchlist partitions
   * @param topN maximum number of top-scoring URLs to select
   * @param curTime reference time for fetch scheduling
   * @param filter whether to apply URL filters
   * @param force whether to override an existing crawl-db lock
   * @return paths of the generated segments, or null if none were generated
   * @throws IOException when an I/O error occurs
   **/
  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
      long curTime, boolean filter, boolean force) throws IOException {
    return generate(dbDir, segments, numLists, topN, curTime, filter, true,
        force, 1, null);
  }
+
+  /**
+   * Generate fetchlists in one or more segments. Whether to filter URLs or not
+   * is read from the crawl.generate.filter property in the configuration 
files.
+   * If the property is not found, the URLs are filtered. Same for the
+   * normalisation.
+   * 
+   * @param dbDir
+   *          Crawl database directory
+   * @param segments
+   *          Segments directory
+   * @param numLists
+   *          Number of reduce tasks
+   * @param topN
+   *          Number of top URLs to be selected
+   * @param curTime
+   *          Current time in milliseconds
+   * 
+   * @return Path to generated segment or null if no entries were selected
+   * 
+   * @throws IOException
+   *           When an I/O error occurs
+   */
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
+      long curTime, boolean filter, boolean norm, boolean force,
+      int maxNumSegments, String expr) throws IOException {
+
+    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
+        + "/generate-temp-" + java.util.UUID.randomUUID().toString());
+
+    Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
+    FileSystem fs = FileSystem.get(getConf());
+    LockUtil.createLockFile(fs, lock, force);
+    
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("Generator: starting at " + sdf.format(start));
+    LOG.info("Generator: Selecting best-scoring urls due for fetch.");
+    LOG.info("Generator: filtering: " + filter);
+    LOG.info("Generator: normalizing: " + norm);
+    if (topN != Long.MAX_VALUE) {
+      LOG.info("Generator: topN: " + topN);
+    }
+    if (expr != null) {
+      LOG.info("Generator: expr: " + expr);
+    }
+    
+    // map to inverted subset due for fetch, sort by score
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("generate: select from " + dbDir);
+
+    if (numLists == -1) { // for politeness make
+      numLists = job.getNumMapTasks(); // a partition per fetch task
+    }
+    if ("local".equals(job.get("mapreduce.framework.name")) && numLists != 1) {
+      // override
+      LOG.info("Generator: running in local mode, generating exactly one 
partition.");
+      numLists = 1;
+    }
+    job.setLong(GENERATOR_CUR_TIME, curTime);
+    // record real generation time
+    long generateTime = System.currentTimeMillis();
+    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
+    job.setLong(GENERATOR_TOP_N, topN);
+    job.setBoolean(GENERATOR_FILTER, filter);
+    job.setBoolean(GENERATOR_NORMALISE, norm);
+    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);
+    if (expr != null) {
+      job.set(GENERATOR_EXPR, expr);
+    }
+    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(Selector.class);
+    job.setPartitionerClass(Selector.class);
+    job.setReducerClass(Selector.class);
+
+    FileOutputFormat.setOutputPath(job, tempDir);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
+    job.setOutputValueClass(SelectorEntry.class);
+    job.setOutputFormat(GeneratorOutputFormat.class);
+
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      fs.delete(tempDir, true);
+      throw e;
+    }
+
+    // read the subdirectories generated in the temp
+    // output and turn them into segments
+    List<Path> generatedSegments = new ArrayList<Path>();
+
+    FileStatus[] status = fs.listStatus(tempDir);
+    try {
+      for (FileStatus stat : status) {
+        Path subfetchlist = stat.getPath();
+        if (!subfetchlist.getName().startsWith("fetchlist-"))
+          continue;
+        // start a new partition job for this segment
+        Path newSeg = partitionSegment(fs, segments, subfetchlist, numLists);
+        generatedSegments.add(newSeg);
+      }
+    } catch (Exception e) {
+      LOG.warn("Generator: exception while partitioning segments, exiting 
...");
+      fs.delete(tempDir, true);
+      return null;
+    }
+
+    if (generatedSegments.size() == 0) {
+      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
+      LockUtil.removeLockFile(fs, lock);
+      fs.delete(tempDir, true);
+      return null;
+    }
+
+    if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
+      // update the db from tempDir
+      Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".")
+          + "/generate-temp-" + java.util.UUID.randomUUID().toString());
+
+      job = new NutchJob(getConf());
+      job.setJobName("generate: updatedb " + dbDir);
+      job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
+      for (Path segmpaths : generatedSegments) {
+        Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME);
+        FileInputFormat.addInputPath(job, subGenDir);
+      }
+      FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
+      job.setInputFormat(SequenceFileInputFormat.class);
+      job.setMapperClass(CrawlDbUpdater.class);
+      job.setReducerClass(CrawlDbUpdater.class);
+      job.setOutputFormat(MapFileOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(CrawlDatum.class);
+      FileOutputFormat.setOutputPath(job, tempDir2);
+      try {
+        JobClient.runJob(job);
+        CrawlDb.install(job, dbDir);
+      } catch (IOException e) {
+        LockUtil.removeLockFile(fs, lock);
+        fs.delete(tempDir, true);
+        fs.delete(tempDir2, true);
+        throw e;
+      }
+      fs.delete(tempDir2, true);
+    }
+
+    LockUtil.removeLockFile(fs, lock);
+    fs.delete(tempDir, true);
+
+    long end = System.currentTimeMillis();
+    LOG.info("Generator: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+
+    Path[] patharray = new Path[generatedSegments.size()];
+    return generatedSegments.toArray(patharray);
+  }
+
+  private Path partitionSegment(FileSystem fs, Path segmentsDir, Path inputDir,
+      int numLists) throws IOException {
+    // invert again, partition by host/domain/IP, sort by url hash
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Generator: Partitioning selected urls for politeness.");
+    }
+    Path segment = new Path(segmentsDir, generateSegmentName());
+    Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
+
+    LOG.info("Generator: segment: " + segment);
+
+    NutchJob job = new NutchJob(getConf());
+    job.setJobName("generate: partition " + segment);
+
+    job.setInt("partition.url.seed", new Random().nextInt());
+
+    FileInputFormat.addInputPath(job, inputDir);
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(SelectorInverseMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(SelectorEntry.class);
+    job.setPartitionerClass(URLPartitioner.class);
+    job.setReducerClass(PartitionReducer.class);
+    job.setNumReduceTasks(numLists);
+
+    FileOutputFormat.setOutputPath(job, output);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+    job.setOutputKeyComparatorClass(HashComparator.class);
+    JobClient.runJob(job);
+    return segment;
+  }
+
+  private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
+
+  public static synchronized String generateSegmentName() {
+    try {
+      Thread.sleep(1000);
+    } catch (Throwable t) {
+    }
+    ;
+    return sdf.format(new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Generate a fetchlist from the crawldb.
+   */
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner
+        .run(NutchConfiguration.create(), new Generator(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.out
+          .println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN 
N] [-numFetchers numFetchers] [-expr <expr>] [-adddays <numDays>] [-noFilter] 
[-noNorm] [-maxNumSegments <num>]");
+      return -1;
+    }
+
+    Path dbDir = new Path(args[0]);
+    Path segmentsDir = new Path(args[1]);
+    long curTime = System.currentTimeMillis();
+    long topN = Long.MAX_VALUE;
+    int numFetchers = -1;
+    boolean filter = true;
+    boolean norm = true;
+    boolean force = false;
+    String expr = null;
+    int maxNumSegments = 1;
+
+    for (int i = 2; i < args.length; i++) {
+      if ("-topN".equals(args[i])) {
+        topN = Long.parseLong(args[i + 1]);
+        i++;
+      } else if ("-numFetchers".equals(args[i])) {
+        numFetchers = Integer.parseInt(args[i + 1]);
+        i++;
+      } else if ("-adddays".equals(args[i])) {
+        long numDays = Integer.parseInt(args[i + 1]);
+        curTime += numDays * 1000L * 60 * 60 * 24;
+      } else if ("-noFilter".equals(args[i])) {
+        filter = false;
+      } else if ("-noNorm".equals(args[i])) {
+        norm = false;
+      } else if ("-force".equals(args[i])) {
+        force = true;
+      } else if ("-maxNumSegments".equals(args[i])) {
+        maxNumSegments = Integer.parseInt(args[i + 1]);
+      } else if ("-expr".equals(args[i])) {
+        expr = args[i + 1];
+      }
+
+    }
+
+    try {
+      Path[] segs = generate(dbDir, segmentsDir, numFetchers, topN, curTime,
+          filter, norm, force, maxNumSegments, expr);
+      if (segs == null)
+        return 1;
+    } catch (Exception e) {
+      LOG.error("Generator: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+    return 0;
+  }
+
+  @Override
+  public Map<String, Object> run(Map<String, Object> args, String crawlId) 
throws Exception {
+
+    Map<String, Object> results = new HashMap<String, Object>();
+
+    long curTime = System.currentTimeMillis();
+    long topN = Long.MAX_VALUE;
+    int numFetchers = -1;
+    boolean filter = true;
+    boolean norm = true;
+    boolean force = false;
+    int maxNumSegments = 1;
+    String expr = null;
+
+    Path crawlDb;
+    if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+      Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
+      if(crawldbPath instanceof Path) {
+        crawlDb = (Path) crawldbPath;
+      }
+      else {
+        crawlDb = new Path(crawldbPath.toString());
+      }
+    }
+    else {
+      crawlDb = new Path(crawlId+"/crawldb");
+    }
+
+    Path segmentsDir;
+    if(args.containsKey(Nutch.ARG_SEGMENTDIR)) {
+      Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
+      if(segDir instanceof Path) {
+        segmentsDir = (Path) segDir;
+      }
+      else {
+        segmentsDir = new Path(segDir.toString());
+      }
+    }
+    else {
+      segmentsDir = new Path(crawlId+"/segments");
+    }
+    
+    if (args.containsKey("expr")) {
+      expr = (String)args.get("expr");
+    }
+    if (args.containsKey("topN")) {
+      topN = Long.parseLong((String)args.get("topN"));
+    }
+    if (args.containsKey("numFetchers")) {
+      numFetchers = Integer.parseInt((String)args.get("numFetchers"));
+    }
+    if (args.containsKey("adddays")) {
+      long numDays = Integer.parseInt((String)args.get("adddays"));
+      curTime += numDays * 1000L * 60 * 60 * 24;
+    }
+    if (args.containsKey("noFilter")) {
+      filter = false;
+    } 
+    if (args.containsKey("noNorm")) {
+      norm = false;
+    } 
+    if (args.containsKey("force")) {
+      force = true;
+    } 
+    if (args.containsKey("maxNumSegments")) {
+      maxNumSegments = Integer.parseInt((String)args.get("maxNumSegments"));
+    }
+
+    try {
+      Path[] segs = generate(crawlDb, segmentsDir, numFetchers, topN, curTime,
+          filter, norm, force, maxNumSegments, expr);
+      if (segs == null){
+        results.put(Nutch.VAL_RESULT, Integer.toString(1));
+        return results;
+      }
+
+    } catch (Exception e) {
+      LOG.error("Generator: " + StringUtils.stringifyException(e));
+      results.put(Nutch.VAL_RESULT, Integer.toString(-1));
+      return results;
+    }
+    results.put(Nutch.VAL_RESULT, Integer.toString(0));
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java
new file mode 100644
index 0000000..383aaf1
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Injector.java
@@ -0,0 +1,510 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.TimingUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Injector takes a flat file of URLs and merges ("injects") these URLs into 
the
+ * CrawlDb. Useful for bootstrapping a Nutch crawl. The URL files contain one
+ * URL per line, optionally followed by custom metadata separated by tabs with
+ * the metadata key separated from the corresponding value by '='.
+ * </p>
+ * <p>
+ * Note, that some metadata keys are reserved:
+ * <dl>
+ * <dt>nutch.score</dt>
+ * <dd>allows to set a custom score for a specific URL</dd>
+ * <dt>nutch.fetchInterval</dt>
+ * <dd>allows to set a custom fetch interval for a specific URL</dd>
+ * <dt>nutch.fetchInterval.fixed</dt>
+ * <dd>allows to set a custom fetch interval for a specific URL that is not
+ * changed by AdaptiveFetchSchedule</dd>
+ * </dl>
+ * </p>
+ * <p>
+ * Example:
+ * 
+ * <pre>
+ *  http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t 
userType=open_source
+ * </pre>
+ * </p>
+ **/
+public class Injector extends NutchTool implements Tool {
+  public static final Logger LOG = LoggerFactory.getLogger(Injector.class);
+
+  /** metadata key reserved for setting a custom score for a specific URL */
+  public static String nutchScoreMDName = "nutch.score";
+
+  /**
+   * metadata key reserved for setting a custom fetchInterval for a specific 
URL
+   */
+  public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
+
+  /**
+   * metadata key reserved for setting a fixed custom fetchInterval for a
+   * specific URL
+   */
+  public static String nutchFixedFetchIntervalMDName = 
"nutch.fetchInterval.fixed";
+
+  public static class InjectMapper
+      extends Mapper<Text, Writable, Text, CrawlDatum> {
+    public static final String URL_NORMALIZING_SCOPE = 
"crawldb.url.normalizers.scope";
+    public static final String TAB_CHARACTER = "\t";
+    public static final String EQUAL_CHARACTER = "=";
+
+    private URLNormalizers urlNormalizers;
+    private int interval;
+    private float scoreInjected;
+    private URLFilters filters;
+    private ScoringFilters scfilters;
+    private long curTime;
+    private boolean url404Purging;
+    private String scope;
+
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_INJECT);
+      urlNormalizers = new URLNormalizers(conf, scope);
+      interval = conf.getInt("db.fetch.interval.default", 2592000);
+      filters = new URLFilters(conf);
+      scfilters = new ScoringFilters(conf);
+      scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+      curTime = conf.getLong("injector.current.time",
+          System.currentTimeMillis());
+      url404Purging = conf.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false);
+    }
+
+    /* Filter and normalize the input url */
+    private String filterNormalize(String url) {
+      if (url != null) {
+        try {
+          url = urlNormalizers.normalize(url, scope); // normalize the url
+          url = filters.filter(url); // filter the url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + url + ":" + e);
+          url = null;
+        }
+      }
+      return url;
+    }
+
+    /**
+     * Extract metadata that could be passed along with url in a seeds file.
+     * Metadata must be key-value pair(s) and separated by a TAB_CHARACTER
+     */
+    private void processMetaData(String metadata, CrawlDatum datum,
+        String url) {
+      String[] splits = metadata.split(TAB_CHARACTER);
+
+      for (String split : splits) {
+        // find separation between name and value
+        int indexEquals = split.indexOf(EQUAL_CHARACTER);
+        if (indexEquals == -1) // skip anything without a EQUAL_CHARACTER
+          continue;
+
+        String metaname = split.substring(0, indexEquals);
+        String metavalue = split.substring(indexEquals + 1);
+
+        try {
+          if (metaname.equals(nutchScoreMDName)) {
+            datum.setScore(Float.parseFloat(metavalue));
+          } else if (metaname.equals(nutchFetchIntervalMDName)) {
+            datum.setFetchInterval(Integer.parseInt(metavalue));
+          } else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
+            int fixedInterval = Integer.parseInt(metavalue);
+            if (fixedInterval > -1) {
+              // Set writable using float. Float is used by
+              // AdaptiveFetchSchedule
+              datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
+                  new FloatWritable(fixedInterval));
+              datum.setFetchInterval(fixedInterval);
+            }
+          } else {
+            datum.getMetaData().put(new Text(metaname), new Text(metavalue));
+          }
+        } catch (NumberFormatException nfe) {
+          LOG.error("Invalid number '" + metavalue + "' in metadata '"
+              + metaname + "' for url " + url);
+        }
+      }
+    }
+
+    public void map(Text key, Writable value, Context context)
+        throws IOException, InterruptedException {
+      if (value instanceof Text) {
+        // if its a url from the seed list
+        String url = key.toString().trim();
+
+        // remove empty string or string starting with '#'
+        if (url.length() == 0 || url.startsWith("#"))
+          return;
+
+        url = filterNormalize(url);
+        if (url == null) {
+          context.getCounter("injector", "urls_filtered").increment(1);
+        } else {
+          CrawlDatum datum = new CrawlDatum();
+          datum.setStatus(CrawlDatum.STATUS_INJECTED);
+          datum.setFetchTime(curTime);
+          datum.setScore(scoreInjected);
+          datum.setFetchInterval(interval);
+
+          String metadata = value.toString().trim();
+          if (metadata.length() > 0)
+            processMetaData(metadata, datum, url);
+
+          try {
+            key.set(url);
+            scfilters.injectedScore(key, datum);
+          } catch (ScoringFilterException e) {
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("Cannot filter injected score for url " + url
+                  + ", using default (" + e.getMessage() + ")");
+            }
+          }
+          context.getCounter("injector", "urls_injected").increment(1);
+          context.write(key, datum);
+        }
+      } else if (value instanceof CrawlDatum) {
+        // if its a crawlDatum from the input crawldb, emulate CrawlDbFilter's
+        // map()
+        CrawlDatum datum = (CrawlDatum) value;
+
+        // remove 404 urls
+        if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus())
+          return;
+
+        String url = filterNormalize(key.toString());
+        if (url != null) {
+          key.set(url);
+          context.write(key, datum);
+        }
+      }
+    }
+  }
+
+  /** Combine multiple new entries for a url. */
+  public static class InjectReducer
+      extends Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+    private int interval;
+    private float scoreInjected;
+    private boolean overwrite = false;
+    private boolean update = false;
+    private CrawlDatum old = new CrawlDatum();
+    private CrawlDatum injected = new CrawlDatum();
+
+    public void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      interval = conf.getInt("db.fetch.interval.default", 2592000);
+      scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+      overwrite = conf.getBoolean("db.injector.overwrite", false);
+      update = conf.getBoolean("db.injector.update", false);
+      LOG.info("Injector: overwrite: " + overwrite);
+      LOG.info("Injector: update: " + update);
+    }
+
+    /**
+     * Merge the input records as per rules below :
+     * 
+     * <pre>
+     * 1. If there is ONLY new injected record ==> emit injected record
+     * 2. If there is ONLY old record          ==> emit existing record
+     * 3. If BOTH new and old records are present:
+     *    (a) If 'overwrite' is true           ==> emit injected record
+     *    (b) If 'overwrite' is false :
+     *        (i)  If 'update' is false        ==> emit existing record
+     *        (ii) If 'update' is true         ==> update existing record and 
emit it
+     * </pre>
+     * 
+     * For more details @see NUTCH-1405
+     */
+    public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
+        throws IOException, InterruptedException {
+
+      boolean oldSet = false;
+      boolean injectedSet = false;
+
+      // If we encounter a datum with status as STATUS_INJECTED, then its a
+      // newly injected record. All other statuses correspond to an old record.
+      for (CrawlDatum val : values) {
+        if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
+          injected.set(val);
+          injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
+          injectedSet = true;
+        } else {
+          old.set(val);
+          oldSet = true;
+        }
+      }
+
+      CrawlDatum result;
+      if (injectedSet && (!oldSet || overwrite)) {
+        // corresponds to rules (1) and (3.a) in the method description
+        result = injected;
+      } else {
+        // corresponds to rules (2) and (3.b) in the method description
+        result = old;
+
+        if (injectedSet && update) {
+          // corresponds to rule (3.b.ii) in the method description
+          old.putAllMetaData(injected);
+          old.setScore(injected.getScore() != scoreInjected
+              ? injected.getScore() : old.getScore());
+          old.setFetchInterval(injected.getFetchInterval() != interval
+              ? injected.getFetchInterval() : old.getFetchInterval());
+        }
+      }
+      if (injectedSet && oldSet) {
+        context.getCounter("injector", "urls_merged").increment(1);
+      }
+      context.write(key, result);
+    }
+  }
+
+  public Injector() {
+  }
+
+  public Injector(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void inject(Path crawlDb, Path urlDir)
+      throws IOException, ClassNotFoundException, InterruptedException {
+    inject(crawlDb, urlDir, false, false);
+  }
+
+  public void inject(Path crawlDb, Path urlDir, boolean overwrite,
+      boolean update) throws IOException, ClassNotFoundException, 
InterruptedException {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Injector: starting at " + sdf.format(start));
+      LOG.info("Injector: crawlDb: " + crawlDb);
+      LOG.info("Injector: urlDir: " + urlDir);
+      LOG.info("Injector: Converting injected urls to crawl db entries.");
+    }
+
+    // set configuration
+    Configuration conf = getConf();
+    conf.setLong("injector.current.time", System.currentTimeMillis());
+    conf.setBoolean("db.injector.overwrite", overwrite);
+    conf.setBoolean("db.injector.update", update);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    // create all the required paths
+    FileSystem fs = FileSystem.get(conf);
+    Path current = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+    if (!fs.exists(current))
+      fs.mkdirs(current);
+
+    Path tempCrawlDb = new Path(crawlDb,
+        "crawldb-" + Integer.toString(new 
Random().nextInt(Integer.MAX_VALUE)));
+
+    // lock an existing crawldb to prevent multiple simultaneous updates
+    Path lock = new Path(crawlDb, CrawlDb.LOCK_NAME);
+    LockUtil.createLockFile(fs, lock, false);
+
+    // configure job
+    Job job = Job.getInstance(conf, "inject " + urlDir);
+    job.setJarByClass(Injector.class);
+    job.setMapperClass(InjectMapper.class);
+    job.setReducerClass(InjectReducer.class);
+    job.setOutputFormatClass(MapFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+    job.setSpeculativeExecution(false);
+
+    // set input and output paths of the job
+    MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+    MultipleInputs.addInputPath(job, urlDir, KeyValueTextInputFormat.class);
+    FileOutputFormat.setOutputPath(job, tempCrawlDb);
+
+    try {
+      // run the job
+      job.waitForCompletion(true);
+
+      // save output and perform cleanup
+      CrawlDb.install(job, crawlDb);
+
+      if (LOG.isInfoEnabled()) {
+        long urlsInjected = job.getCounters()
+            .findCounter("injector", "urls_injected").getValue();
+        long urlsFiltered = job.getCounters()
+            .findCounter("injector", "urls_filtered").getValue();
+        long urlsMerged = job.getCounters()
+            .findCounter("injector", "urls_merged").getValue();
+        LOG.info("Injector: Total urls rejected by filters: " + urlsFiltered);
+        LOG.info(
+            "Injector: Total urls injected after normalization and filtering: "
+                + urlsInjected);
+        LOG.info("Injector: Total urls injected but already in CrawlDb: "
+            + urlsMerged);
+        LOG.info("Injector: Total new urls injected: "
+            + (urlsInjected - urlsMerged));
+
+        long end = System.currentTimeMillis();
+        LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
+            + TimingUtil.elapsedTime(start, end));
+      }
+    } catch (IOException e) {
+      if (fs.exists(tempCrawlDb)) {
+        fs.delete(tempCrawlDb, true);
+      }
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
+    }
+  }
+
+  public void usage() {
+    System.err.println(
+        "Usage: Injector <crawldb> <url_dir> [-overwrite] [-update]\n");
+    System.err.println(
+        "  <crawldb>\tPath to a crawldb directory. If not present, a new one 
would be created.");
+    System.err.println(
+        "  <url_dir>\tPath to directory with URL file(s) containing urls to be 
injected. A URL file");
+    System.err.println(
+        "           \tshould have one URL per line, optionally followed by 
custom metadata.");
+    System.err.println(
+        "           \tBlank lines or lines starting with a '#' would be 
ignored. Custom metadata must");
+    System.err
+        .println("           \tbe of form 'key=value' and separated by tabs.");
+    System.err.println("           \tBelow are reserved metadata keys:\n");
+    System.err.println("           \t\tnutch.score: A custom score for a url");
+    System.err.println(
+        "           \t\tnutch.fetchInterval: A custom fetch interval for a 
url");
+    System.err.println(
+        "           \t\tnutch.fetchInterval.fixed: A custom fetch interval for 
a url that is not "
+            + "changed by AdaptiveFetchSchedule\n");
+    System.err.println("           \tExample:");
+    System.err.println("           \t http://www.apache.org/";);
+    System.err.println(
+        "           \t http://www.nutch.org/ \\t nutch.score=10 \\t 
nutch.fetchInterval=2592000 \\t userType=open_source\n");
+    System.err.println(
+        " -overwrite\tOverwite existing crawldb records by the injected 
records. Has precedence over 'update'");
+    System.err.println(
+        " -update   \tUpdate existing crawldb records with the injected 
records. Old metadata is preserved");
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), 
args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    boolean overwrite = false;
+    boolean update = false;
+
+    for (int i = 2; i < args.length; i++) {
+      if (args[i].equals("-overwrite")) {
+        overwrite = true;
+      } else if (args[i].equals("-update")) {
+        update = true;
+      } else {
+        LOG.info("Injector: Found invalid argument \"" + args[i] + "\"\n");
+        usage();
+        return -1;
+      }
+    }
+
+    try {
+      inject(new Path(args[0]), new Path(args[1]), overwrite, update);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Injector: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  /**
+   * Used by the Nutch REST service
+   */
+  public Map<String, Object> run(Map<String, Object> args, String crawlId)
+      throws Exception {
+    if (args.size() < 1) {
+      throw new IllegalArgumentException("Required arguments <url_dir>");
+    }
+    Map<String, Object> results = new HashMap<String, Object>();
+
+    Path crawlDb;
+    if (args.containsKey(Nutch.ARG_CRAWLDB)) {
+      Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
+      if (crawldbPath instanceof Path) {
+        crawlDb = (Path) crawldbPath;
+      } else {
+        crawlDb = new Path(crawldbPath.toString());
+      }
+    } else {
+      crawlDb = new Path(crawlId + "/crawldb");
+    }
+
+    Path input;
+    Object path = args.get(Nutch.ARG_SEEDDIR);
+    if (path instanceof Path) {
+      input = (Path) path;
+    } else {
+      input = new Path(path.toString());
+    }
+
+    inject(crawlDb, input);
+    results.put(Nutch.VAL_RESULT, Integer.toString(0));
+    return results;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Inlink.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Inlink.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/Inlink.java
new file mode 100644
index 0000000..67df357
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Inlink.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import org.apache.hadoop.io.*;
+
+/* An incoming link to a page. */
+public class Inlink implements Writable {
+
+  private String fromUrl;
+  private String anchor;
+
+  public Inlink() {
+  }
+
+  public Inlink(String fromUrl, String anchor) {
+    this.fromUrl = fromUrl;
+    this.anchor = anchor;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    fromUrl = Text.readString(in);
+    anchor = Text.readString(in);
+  }
+
+  /** Skips over one Inlink in the input. */
+  public static void skip(DataInput in) throws IOException {
+    Text.skip(in); // skip fromUrl
+    Text.skip(in); // skip anchor
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, fromUrl);
+    Text.writeString(out, anchor);
+  }
+
+  public static Inlink read(DataInput in) throws IOException {
+    Inlink inlink = new Inlink();
+    inlink.readFields(in);
+    return inlink;
+  }
+
+  public String getFromUrl() {
+    return fromUrl;
+  }
+
+  public String getAnchor() {
+    return anchor;
+  }
+
+  public boolean equals(Object o) {
+    if (!(o instanceof Inlink))
+      return false;
+    Inlink other = (Inlink) o;
+    return this.fromUrl.equals(other.fromUrl)
+        && this.anchor.equals(other.anchor);
+  }
+
+  public int hashCode() {
+    return fromUrl.hashCode() ^ anchor.hashCode();
+  }
+
+  public String toString() {
+    return "fromUrl: " + fromUrl + " anchor: " + anchor;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Inlinks.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Inlinks.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/Inlinks.java
new file mode 100644
index 0000000..89f9731
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Inlinks.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import org.apache.hadoop.io.*;
+
+/** A list of {@link Inlink}s. */
+public class Inlinks implements Writable {
+  private HashSet<Inlink> inlinks = new HashSet<Inlink>(1);
+
+  public void add(Inlink inlink) {
+    inlinks.add(inlink);
+  }
+
+  public void add(Inlinks inlinks) {
+    this.inlinks.addAll(inlinks.inlinks);
+  }
+
+  public Iterator<Inlink> iterator() {
+    return this.inlinks.iterator();
+  }
+
+  public int size() {
+    return inlinks.size();
+  }
+
+  public void clear() {
+    inlinks.clear();
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    int length = in.readInt();
+    inlinks.clear();
+    for (int i = 0; i < length; i++) {
+      add(Inlink.read(in));
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(inlinks.size());
+    Iterator<Inlink> it = inlinks.iterator();
+    while (it.hasNext()) {
+      it.next().write(out);
+    }
+  }
+
+  public String toString() {
+    StringBuffer buffer = new StringBuffer();
+    buffer.append("Inlinks:\n");
+    Iterator<Inlink> it = inlinks.iterator();
+    while (it.hasNext()) {
+      buffer.append(" ");
+      buffer.append(it.next());
+      buffer.append("\n");
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Return the set of anchor texts. Only a single anchor with a given text is
+   * permitted from a given domain.
+   */
+  public String[] getAnchors() {
+    HashMap<String, Set<String>> domainToAnchors = new HashMap<String, 
Set<String>>();
+    ArrayList<String> results = new ArrayList<String>();
+    Iterator<Inlink> it = inlinks.iterator();
+    while (it.hasNext()) {
+      Inlink inlink = it.next();
+      String anchor = inlink.getAnchor();
+
+      if (anchor.length() == 0) // skip empty anchors
+        continue;
+      String domain = null; // extract domain name
+      try {
+        domain = new URL(inlink.getFromUrl()).getHost();
+      } catch (MalformedURLException e) {
+      }
+      Set<String> domainAnchors = domainToAnchors.get(domain);
+      if (domainAnchors == null) {
+        domainAnchors = new HashSet<String>();
+        domainToAnchors.put(domain, domainAnchors);
+      }
+      if (domainAnchors.add(anchor)) { // new anchor from domain
+        results.add(anchor); // collect it
+      }
+    }
+
+    return results.toArray(new String[results.size()]);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDb.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDb.java
new file mode 100644
index 0000000..908a8e9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDb.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.net.*;
+
+// Commons Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.TimingUtil;
+
+/** Maintains an inverted link map, listing incoming links for each url. */
+public class LinkDb extends NutchTool implements Tool,
+    Mapper<Text, ParseData, Text, Inlinks> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(LinkDb.class);
+
+  public static final String IGNORE_INTERNAL_LINKS = 
"linkdb.ignore.internal.links";
+  public static final String IGNORE_EXTERNAL_LINKS = 
"linkdb.ignore.external.links";
+
+  public static final String CURRENT_NAME = "current";
+  public static final String LOCK_NAME = ".locked";
+
+  private int maxAnchorLength;
+  private boolean ignoreInternalLinks;
+  private boolean ignoreExternalLinks;
+  private URLFilters urlFilters;
+  private URLNormalizers urlNormalizers;
+
+  public LinkDb() {
+  }
+
+  public LinkDb(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void configure(JobConf job) {
+    maxAnchorLength = job.getInt("linkdb.max.anchor.length", 100);
+    ignoreInternalLinks = job.getBoolean(IGNORE_INTERNAL_LINKS, true);
+    ignoreExternalLinks = job.getBoolean(IGNORE_EXTERNAL_LINKS, false);
+
+    if (job.getBoolean(LinkDbFilter.URL_FILTERING, false)) {
+      urlFilters = new URLFilters(job);
+    }
+    if (job.getBoolean(LinkDbFilter.URL_NORMALIZING, false)) {
+      urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_LINKDB);
+    }
+  }
+
+  public void close() {
+  }
+
+  public void map(Text key, ParseData parseData,
+      OutputCollector<Text, Inlinks> output, Reporter reporter)
+      throws IOException {
+    String fromUrl = key.toString();
+    String fromHost = getHost(fromUrl);
+    if (urlNormalizers != null) {
+      try {
+        fromUrl = urlNormalizers
+            .normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // normalize the
+                                                              // url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + fromUrl + ":" + e);
+        fromUrl = null;
+      }
+    }
+    if (fromUrl != null && urlFilters != null) {
+      try {
+        fromUrl = urlFilters.filter(fromUrl); // filter the url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + fromUrl + ":" + e);
+        fromUrl = null;
+      }
+    }
+    if (fromUrl == null)
+      return; // discard all outlinks
+    Outlink[] outlinks = parseData.getOutlinks();
+    Inlinks inlinks = new Inlinks();
+    for (int i = 0; i < outlinks.length; i++) {
+      Outlink outlink = outlinks[i];
+      String toUrl = outlink.getToUrl();
+
+      if (ignoreInternalLinks) {
+        String toHost = getHost(toUrl);
+        if (toHost == null || toHost.equals(fromHost)) { // internal link
+          continue; // skip it
+        }
+      } else if (ignoreExternalLinks) {
+        String toHost = getHost(toUrl);
+        if (toHost == null || !toHost.equals(fromHost)) { // external link
+          continue;                               // skip it
+        }
+      }
+      if (urlNormalizers != null) {
+        try {
+          toUrl = urlNormalizers.normalize(toUrl, 
URLNormalizers.SCOPE_LINKDB); // normalize
+                                                                               
 // the
+                                                                               
 // url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + toUrl + ":" + e);
+          toUrl = null;
+        }
+      }
+      if (toUrl != null && urlFilters != null) {
+        try {
+          toUrl = urlFilters.filter(toUrl); // filter the url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + toUrl + ":" + e);
+          toUrl = null;
+        }
+      }
+      if (toUrl == null)
+        continue;
+      inlinks.clear();
+      String anchor = outlink.getAnchor(); // truncate long anchors
+      if (anchor.length() > maxAnchorLength) {
+        anchor = anchor.substring(0, maxAnchorLength);
+      }
+      inlinks.add(new Inlink(fromUrl, anchor)); // collect inverted link
+      output.collect(new Text(toUrl), inlinks);
+    }
+  }
+
+  private String getHost(String url) {
+    try {
+      return new URL(url).getHost().toLowerCase();
+    } catch (MalformedURLException e) {
+      return null;
+    }
+  }
+
+  public void invert(Path linkDb, final Path segmentsDir, boolean normalize,
+      boolean filter, boolean force) throws IOException {
+    final FileSystem fs = FileSystem.get(getConf());
+    FileStatus[] files = fs.listStatus(segmentsDir,
+        HadoopFSUtil.getPassDirectoriesFilter(fs));
+    invert(linkDb, HadoopFSUtil.getPaths(files), normalize, filter, force);
+  }
+
+  public void invert(Path linkDb, Path[] segments, boolean normalize,
+      boolean filter, boolean force) throws IOException {
+    JobConf job = LinkDb.createJob(getConf(), linkDb, normalize, filter);
+    Path lock = new Path(linkDb, LOCK_NAME);
+    FileSystem fs = FileSystem.get(getConf());
+    LockUtil.createLockFile(fs, lock, force);
+    Path currentLinkDb = new Path(linkDb, CURRENT_NAME);
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("LinkDb: starting at " + sdf.format(start));
+      LOG.info("LinkDb: linkdb: " + linkDb);
+      LOG.info("LinkDb: URL normalize: " + normalize);
+      LOG.info("LinkDb: URL filter: " + filter);
+      if (job.getBoolean(IGNORE_INTERNAL_LINKS, true)) {
+        LOG.info("LinkDb: internal links will be ignored.");
+      }
+      if (job.getBoolean(IGNORE_EXTERNAL_LINKS, false)) {
+        LOG.info("LinkDb: external links will be ignored.");
+      }
+    }
+    if (job.getBoolean(IGNORE_INTERNAL_LINKS, true)
+        && job.getBoolean(IGNORE_EXTERNAL_LINKS, false)) {
+      LOG.warn("LinkDb: internal and external links are ignored! "
+          + "Nothing to do, actually. Exiting.");
+      LockUtil.removeLockFile(fs, lock);
+      return;
+    }
+
+    for (int i = 0; i < segments.length; i++) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("LinkDb: adding segment: " + segments[i]);
+      }
+      FileInputFormat.addInputPath(job, new Path(segments[i],
+          ParseData.DIR_NAME));
+    }
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
+    }
+    if (fs.exists(currentLinkDb)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
+      }
+      // try to merge
+      Path newLinkDb = FileOutputFormat.getOutputPath(job);
+      job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
+      FileInputFormat.addInputPath(job, currentLinkDb);
+      FileInputFormat.addInputPath(job, newLinkDb);
+      try {
+        JobClient.runJob(job);
+      } catch (IOException e) {
+        LockUtil.removeLockFile(fs, lock);
+        fs.delete(newLinkDb, true);
+        throw e;
+      }
+      fs.delete(newLinkDb, true);
+    }
+    LinkDb.install(job, linkDb);
+
+    long end = System.currentTimeMillis();
+    LOG.info("LinkDb: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  private static JobConf createJob(Configuration config, Path linkDb,
+      boolean normalize, boolean filter) {
+    Path newLinkDb = new Path("linkdb-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("linkdb " + linkDb);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(LinkDb.class);
+    job.setCombinerClass(LinkDbMerger.class);
+    // if we don't run the mergeJob, perform normalization/filtering now
+    if (normalize || filter) {
+      try {
+        FileSystem fs = FileSystem.get(config);
+        if (!fs.exists(linkDb)) {
+          job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
+          job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
+        }
+      } catch (Exception e) {
+        LOG.warn("LinkDb createJob: " + e);
+      }
+    }
+    job.setReducerClass(LinkDbMerger.class);
+
+    FileOutputFormat.setOutputPath(job, newLinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setBoolean("mapred.output.compress", true);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Inlinks.class);
+
+    return job;
+  }
+
+  public static void install(JobConf job, Path linkDb) throws IOException {
+    Path newLinkDb = FileOutputFormat.getOutputPath(job);
+    FileSystem fs = new JobClient(job).getFs();
+    Path old = new Path(linkDb, "old");
+    Path current = new Path(linkDb, CURRENT_NAME);
+    if (fs.exists(current)) {
+      if (fs.exists(old))
+        fs.delete(old, true);
+      fs.rename(current, old);
+    }
+    fs.mkdirs(linkDb);
+    fs.rename(newLinkDb, current);
+    if (fs.exists(old))
+      fs.delete(old, true);
+    LockUtil.removeLockFile(fs, new Path(linkDb, LOCK_NAME));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err
+          .println("Usage: LinkDb <linkdb> (-dir <segmentsDir> | <seg1> <seg2> 
...) [-force] [-noNormalize] [-noFilter]");
+      System.err.println("\tlinkdb\toutput LinkDb to create or update");
+      System.err
+          .println("\t-dir segmentsDir\tparent directory of several segments, 
OR");
+      System.err.println("\tseg1 seg2 ...\t list of segment directories");
+      System.err
+          .println("\t-force\tforce update even if LinkDb appears to be locked 
(CAUTION advised)");
+      System.err.println("\t-noNormalize\tdon't normalize link URLs");
+      System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs");
+      return -1;
+    }
+    final FileSystem fs = FileSystem.get(getConf());
+    Path db = new Path(args[0]);
+    ArrayList<Path> segs = new ArrayList<Path>();
+    boolean filter = true;
+    boolean normalize = true;
+    boolean force = false;
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-dir")) {
+        FileStatus[] paths = fs.listStatus(new Path(args[++i]),
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+        segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
+      } else if (args[i].equalsIgnoreCase("-noNormalize")) {
+        normalize = false;
+      } else if (args[i].equalsIgnoreCase("-noFilter")) {
+        filter = false;
+      } else if (args[i].equalsIgnoreCase("-force")) {
+        force = true;
+      } else
+        segs.add(new Path(args[i]));
+    }
+    try {
+      invert(db, segs.toArray(new Path[segs.size()]), normalize, filter, 
force);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("LinkDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  /*
+   * Used for Nutch REST service
+   */
+  @Override
+  public Map<String, Object> run(Map<String, Object> args, String crawlId) 
throws Exception {
+
+    Map<String, Object> results = new HashMap<String, Object>();
+
+    Path linkdb;
+    if(args.containsKey(Nutch.ARG_LINKDB)) {
+      Object path = args.get(Nutch.ARG_LINKDB);
+      if(path instanceof Path) {
+        linkdb = (Path) path;
+      }
+      else {
+        linkdb = new Path(path.toString());
+      }
+    }
+    else {
+      linkdb = new Path(crawlId+"/linkdb");
+    }
+
+
+    ArrayList<Path> segs = new ArrayList<Path>();
+    boolean filter = true;
+    boolean normalize = true;
+    boolean force = false;
+    if (args.containsKey("noNormalize")) {
+      normalize = false;
+    } 
+    if (args.containsKey("noFilter")) {
+      filter = false;
+    } 
+    if (args.containsKey("force")) {
+      force = true;
+    }
+
+    Path segmentsDir;
+    final FileSystem fs = FileSystem.get(getConf());
+    if(args.containsKey(Nutch.ARG_SEGMENTDIR)) {
+      Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
+      if(segDir instanceof Path) {
+        segmentsDir = (Path) segDir;
+      }
+      else {
+        segmentsDir = new Path(segDir.toString());
+      }
+      FileStatus[] paths = fs.listStatus(segmentsDir,
+          HadoopFSUtil.getPassDirectoriesFilter(fs));
+      segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
+    }
+    else if(args.containsKey(Nutch.ARG_SEGMENT)) {
+      Object segments = args.get(Nutch.ARG_SEGMENT);
+      ArrayList<String> segmentList = new ArrayList<String>();
+      if(segments instanceof ArrayList) {
+        segmentList = (ArrayList<String>)segments;
+      }
+      for(String segment: segmentList) {
+        segs.add(new Path(segment));
+      }
+    }
+    else {
+      String segment_dir = crawlId+"/segments";
+      File dir = new File(segment_dir);
+      File[] segmentsList = dir.listFiles();  
+      Arrays.sort(segmentsList, new Comparator<File>(){
+        @Override
+        public int compare(File f1, File f2) {
+          if(f1.lastModified()>f2.lastModified())
+            return -1;
+          else
+            return 0;
+        }      
+      });
+      segs.add(new Path(segmentsList[0].getPath()));
+    }
+    try {
+      invert(linkdb, segs.toArray(new Path[segs.size()]), normalize, filter, 
force);
+      results.put(Nutch.VAL_RESULT, Integer.toString(0));
+      return results;
+    } catch (Exception e) {
+      LOG.error("LinkDb: " + StringUtils.stringifyException(e));
+      results.put(Nutch.VAL_RESULT, Integer.toString(-1));
+      return results;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java
new file mode 100644
index 0000000..1ff9b05
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbFilter.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+
+/**
+ * This class provides a way to separate the URL normalization and filtering
+ * steps from the rest of LinkDb manipulation code.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class LinkDbFilter implements Mapper<Text, Inlinks, Text, Inlinks> {
+  public static final String URL_FILTERING = "linkdb.url.filters";
+
+  public static final String URL_NORMALIZING = "linkdb.url.normalizer";
+
+  public static final String URL_NORMALIZING_SCOPE = 
"linkdb.url.normalizer.scope";
+
+  private boolean filter;
+
+  private boolean normalize;
+
+  private URLFilters filters;
+
+  private URLNormalizers normalizers;
+
+  private String scope;
+
+  public static final Logger LOG = LoggerFactory.getLogger(LinkDbFilter.class);
+
+  private Text newKey = new Text();
+
+  public void configure(JobConf job) {
+    filter = job.getBoolean(URL_FILTERING, false);
+    normalize = job.getBoolean(URL_NORMALIZING, false);
+    if (filter) {
+      filters = new URLFilters(job);
+    }
+    if (normalize) {
+      scope = job.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_LINKDB);
+      normalizers = new URLNormalizers(job, scope);
+    }
+  }
+
+  public void close() {
+  }
+
+  public void map(Text key, Inlinks value,
+      OutputCollector<Text, Inlinks> output, Reporter reporter)
+      throws IOException {
+    String url = key.toString();
+    Inlinks result = new Inlinks();
+    if (normalize) {
+      try {
+        url = normalizers.normalize(url, scope); // normalize the url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e);
+        url = null;
+      }
+    }
+    if (url != null && filter) {
+      try {
+        url = filters.filter(url); // filter the url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e);
+        url = null;
+      }
+    }
+    if (url == null)
+      return; // didn't pass the filters
+    Iterator<Inlink> it = value.iterator();
+    String fromUrl = null;
+    while (it.hasNext()) {
+      Inlink inlink = it.next();
+      fromUrl = inlink.getFromUrl();
+      if (normalize) {
+        try {
+          fromUrl = normalizers.normalize(fromUrl, scope); // normalize the url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + fromUrl + ":" + e);
+          fromUrl = null;
+        }
+      }
+      if (fromUrl != null && filter) {
+        try {
+          fromUrl = filters.filter(fromUrl); // filter the url
+        } catch (Exception e) {
+          LOG.warn("Skipping " + fromUrl + ":" + e);
+          fromUrl = null;
+        }
+      }
+      if (fromUrl != null) {
+        result.add(new Inlink(fromUrl, inlink.getAnchor()));
+      }
+    }
+    if (result.size() > 0) { // don't collect empty inlinks
+      newKey.set(url);
+      output.collect(newKey, result);
+    }
+  }
+}

Reply via email to