Author: jnioche
Date: Mon Mar 22 16:19:12 2010
New Revision: 926155

URL: http://svn.apache.org/viewvc?rev=926155&view=rev
Log:
NUTCH-762 : Generator can generate several segments in one parse of the crawlDB

Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java
Removed:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/conf/nutch-default.xml
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
    lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java
    lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
    lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Mon Mar 22 16:19:12 2010
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Unreleased Changes
 
+* NUTCH-762 Generator can generate several segments in one parse of the 
crawlDB (jnioche)
+
 * NUTCH-740 Configuration option to override default language for fetched 
pages (Marcin Okraszewski via jnioche)
 
 * NUTCH-803 Upgrade to Hadoop 0.20.2 (ab)

Modified: lucene/nutch/trunk/conf/nutch-default.xml
URL: 
http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/conf/nutch-default.xml (original)
+++ lucene/nutch/trunk/conf/nutch-default.xml Mon Mar 22 16:19:12 2010
@@ -514,24 +514,21 @@
 <!-- generate properties -->
 
 <property>
-  <name>generate.max.per.host</name>
+  <name>generate.max.count</name>
   <value>-1</value>
-  <description>The maximum number of urls per host in a single
-  fetchlist.  -1 if unlimited.</description>
+  <description>The maximum number of urls in a single
+  fetchlist.  -1 if unlimited. The urls are counted according
+  to the value of the parameter generate.count.mode.
+  </description>
 </property>
 
 <property>
-  <name>generate.max.per.host.by.ip</name>
-  <value>false</value>
-  <description>If false, same host names are counted. If true,
-  hosts' IP addresses are resolved and the same IP-s are counted.
-  
-  -+-+-+- WARNING !!! -+-+-+-
-  When set to true, Generator will create a lot of DNS lookup
-  requests, rapidly. This may cause a DOS attack on
-  remote DNS servers, not to mention increased external traffic
-  and latency. For these reasons when using this option it is
-  required that a local caching DNS be used.</description>
+  <name>generate.count.mode</name>
+  <value>host</value>
+  <description>Determines how the URLs are counted for generate.max.count.
+  Default value is 'host' but can be 'domain'. Note that we do not count 
+  per IP in the new version of the Generator.
+  </description>
 </property>
 
 <property>
@@ -545,6 +542,34 @@
   updatedb will generate identical fetchlists.</description>
 </property>
 
+<property>
+  <name>generate.max.per.host</name>
+  <value>-1</value>
+  <description>(Deprecated). Use generate.max.count and generate.count.mode 
instead.
+  The maximum number of urls per host in a single
+  fetchlist.  -1 if unlimited.</description>
+</property>
+
+<!-- urlpartitioner properties -->
+<property>
+  <name>partition.url.mode</name>
+  <value>byHost</value>
+  <description>Determines how to partition URLs. Default value is 'byHost', 
+  also takes 'byDomain' or 'byIP'. 
+  </description>
+</property>
+
+<property>
+  <name>crawl.gen.delay</name>
+  <value>604800000</value>
+  <description>
+   This value, expressed in days, defines how long we should keep the lock on 
records 
+   in CrawlDb that were just selected for fetching. If these records are not 
updated 
+   in the meantime, the lock is canceled, i.e. they become eligible for 
selecting. 
+   Default value of this is 7 days.
+  </description>
+</property>
+
 <!-- fetcher properties -->
 
 <property>

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java
URL: 
http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java Mon Mar 22 
16:19:12 2010
@@ -124,17 +124,17 @@ public class Crawl {
     injector.inject(crawlDb, rootUrlDir);
     int i;
     for (i = 0; i < depth; i++) {             // generate new segment
-      Path segment = generator.generate(crawlDb, segments, -1, topN, System
+      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
           .currentTimeMillis());
-      if (segment == null) {
+      if (segments == null) {
         LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
         break;
       }
-      fetcher.fetch(segment, threads, 
org.apache.nutch.fetcher.Fetcher.isParsing(conf));  // fetch it
+      fetcher.fetch(segs[0], threads, 
org.apache.nutch.fetcher.Fetcher.isParsing(conf));  // fetch it
       if (!Fetcher.isParsing(job)) {
-        parseSegment.parse(segment);    // parse it, if needed
+        parseSegment.parse(segs[0]);    // parse it, if needed
       }
-      crawlDbTool.update(crawlDb, new Path[]{segment}, true, true); // update 
crawldb
+      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
     }
     if (i > 0) {
       linkDbTool.invert(linkDb, segments, true, true, false); // invert links

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
URL: 
http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java Mon Mar 
22 16:19:12 2010
@@ -29,7 +29,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -42,98 +44,133 @@ import org.apache.nutch.scoring.ScoringF
 import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
 
-/** Generates a subset of a crawl db to fetch. */
+/**
+ * Generates a subset of a crawl db to fetch. This version allows generating
+ * fetchlists for several segments in one go. Unlike in the initial version
+ * (OldGenerator), the IP resolution is done ONLY on the entries which have 
been
+ * selected for fetching. The URLs are partitioned by IP, domain or host 
within a 
+ * segment. We can choose separately how to count the URLs, i.e. by domain or 
host,
+ * to limit the entries.
+ **/
 public class Generator extends Configured implements Tool {
 
-  public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
-  public static final String GENERATE_MAX_PER_HOST_BY_IP = 
"generate.max.per.host.by.ip";
-  public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
-  public static final String GENERATE_UPDATE_CRAWLDB = 
"generate.update.crawldb";
-  public static final String CRAWL_TOP_N = "crawl.topN";
-  public static final String CRAWL_GEN_CUR_TIME = "crawl.gen.curTime";
-  public static final String CRAWL_GEN_DELAY = "crawl.gen.delay";
   public static final Log LOG = LogFactory.getLog(Generator.class);
+
+  public static final String GENERATE_UPDATE_CRAWLDB = 
"generate.update.crawldb";
+  public static final String GENERATOR_MIN_SCORE = "generate.min.score";
+  public static final String GENERATOR_FILTER = "generate.filter";
+  public static final String GENERATOR_NORMALISE = "generate.normalise";
+  public static final String GENERATOR_MAX_COUNT = "generate.max.count";
+  public static final String GENERATOR_COUNT_MODE = "generate.count.mode";
+  public static final String GENERATOR_COUNT_VALUE_DOMAIN = "domain";
+  public static final String GENERATOR_COUNT_VALUE_HOST = "host";
+  public static final String GENERATOR_TOP_N = "generate.topN";
+  public static final String GENERATOR_CUR_TIME = "generate.curTime";
+  public static final String GENERATOR_DELAY = "crawl.gen.delay";
+  public static final String GENERATOR_MAX_NUM_SEGMENTS = 
"generate.max.num.segments";
   
+  // deprecated parameters 
+  public static final String GENERATE_MAX_PER_HOST_BY_IP = 
"generate.max.per.host.by.ip";
+  public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
+
   public static class SelectorEntry implements Writable {
     public Text url;
     public CrawlDatum datum;
-    
+    public IntWritable segnum;
+
     public SelectorEntry() {
       url = new Text();
       datum = new CrawlDatum();
+      segnum = new IntWritable(0);
     }
 
     public void readFields(DataInput in) throws IOException {
       url.readFields(in);
       datum.readFields(in);
+      segnum.readFields(in);
     }
 
     public void write(DataOutput out) throws IOException {
       url.write(out);
       datum.write(out);
+      segnum.write(out);
     }
-    
+
     public String toString() {
-      return "url=" + url.toString() + ", datum=" + datum.toString();
+      return "url=" + url.toString() + ", datum=" + datum.toString() + ", 
segnum="
+          + segnum.toString();
     }
   }
 
   /** Selects entries due for fetch. */
-  public static class Selector implements Mapper<Text, CrawlDatum, 
FloatWritable, SelectorEntry>, Partitioner<FloatWritable, Writable>, 
Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
+  public static class Selector implements
+      Mapper<Text,CrawlDatum,FloatWritable,SelectorEntry>,
+      Partitioner<FloatWritable,Writable>,
+      Reducer<FloatWritable,SelectorEntry,FloatWritable,SelectorEntry> {
     private LongWritable genTime = new 
LongWritable(System.currentTimeMillis());
     private long curTime;
     private long limit;
     private long count;
-    private HashMap<String, IntWritable> hostCounts =
-      new HashMap<String, IntWritable>();
-    private int maxPerHost;
-    private HashSet<String> maxedHosts = new HashSet<String>();
-    private HashSet<String> dnsFailureHosts = new HashSet<String>();
-    private Partitioner<Text, Writable> hostPartitioner = new 
PartitionUrlByHost();
+    private HashMap<String,int[]> hostCounts = new HashMap<String,int[]>();
+    private int maxCount;
+    private boolean byDomain = false;
+    private Partitioner<Text,Writable> partitioner = new URLPartitioner();
     private URLFilters filters;
     private URLNormalizers normalizers;
     private ScoringFilters scfilters;
     private SelectorEntry entry = new SelectorEntry();
     private FloatWritable sortValue = new FloatWritable();
-    private boolean byIP;
-    private long dnsFailure = 0L;
     private boolean filter;
+    private boolean normalise;
     private long genDelay;
     private FetchSchedule schedule;
+    private float scoreThreshold = 0f;
+    private int maxNumSegments = 1;
+    int currentsegmentnum = 1;
 
     public void configure(JobConf job) {
-      curTime = job.getLong(CRAWL_GEN_CUR_TIME, System.currentTimeMillis());
-      limit = job.getLong(CRAWL_TOP_N,Long.MAX_VALUE)/job.getNumReduceTasks();
-      maxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
-      byIP = job.getBoolean(GENERATE_MAX_PER_HOST_BY_IP, false);
+      curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
+      limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / 
job.getNumReduceTasks();
+      maxCount = job.getInt(GENERATOR_MAX_COUNT, -1);
+      // back compatibility with old param
+      int oldMaxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
+      if (maxCount==-1 && oldMaxPerHost!=-1){
+        maxCount = oldMaxPerHost;
+        byDomain = false;
+      }
+      if (GENERATOR_COUNT_VALUE_DOMAIN.equals(job.get(GENERATOR_COUNT_MODE))) 
byDomain = true;
       filters = new URLFilters(job);
-      normalizers = new URLNormalizers(job, 
URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+      normalise = job.getBoolean(GENERATOR_NORMALISE, true);
+      if (normalise) normalizers = new URLNormalizers(job,
+          URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
       scfilters = new ScoringFilters(job);
-      hostPartitioner.configure(job);
-      filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
-      genDelay = job.getLong(CRAWL_GEN_DELAY, 7L) * 3600L * 24L * 1000L;
+      partitioner.configure(job);
+      filter = job.getBoolean(GENERATOR_FILTER, true);
+      genDelay = job.getLong(GENERATOR_DELAY, 7L) * 3600L * 24L * 1000L;
       long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
       if (time > 0) genTime.set(time);
       schedule = FetchScheduleFactory.getFetchSchedule(job);
+      scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
+      maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
     }
 
     public void close() {}
 
     /** Select & invert subset due for fetch. */
     public void map(Text key, CrawlDatum value,
-                    OutputCollector<FloatWritable, SelectorEntry> output, 
Reporter reporter)
-      throws IOException {
+        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
+        throws IOException {
       Text url = key;
       if (filter) {
-        // If filtering is on don't generate URLs that don't pass URLFilters
+        // If filtering is on don't generate URLs that don't pass
+        // URLFilters
         try {
-          if (filters.filter(url.toString()) == null)
-            return;
+          if (filters.filter(url.toString()) == null) return;
         } catch (URLFilterException e) {
           if (LOG.isWarnEnabled()) {
-            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage()
-                + ")");
+            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + 
")");
           }
         }
       }
@@ -141,169 +178,166 @@ public class Generator extends Configure
 
       // check fetch schedule
       if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
-        LOG.debug("-shouldFetch rejected '" + url+ "', fetchTime=" + 
crawlDatum.getFetchTime() + ", curTime=" + curTime);
+        LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
+            + crawlDatum.getFetchTime() + ", curTime=" + curTime);
         return;
       }
 
-      LongWritable oldGenTime = 
(LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+      LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
+          Nutch.WRITABLE_GENERATE_TIME_KEY);
       if (oldGenTime != null) { // awaiting fetch & update
-        if (oldGenTime.get() + genDelay > curTime) // still wait for update
-          return;
+        if (oldGenTime.get() + genDelay > curTime) // still wait for
+        // update
+        return;
       }
       float sort = 1.0f;
       try {
-        sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
+        sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);
       } catch (ScoringFilterException sfe) {
         if (LOG.isWarnEnabled()) {
           LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + 
sfe);
         }
       }
+
+      // consider only entries with a score superior to the threshold
+      if (scoreThreshold != Float.NaN && sort < scoreThreshold) return;
+
       // sort by decreasing score, using DecreasingFloatComparator
       sortValue.set(sort);
       // record generation time
       crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       entry.datum = crawlDatum;
-      entry.url = (Text)key;
-      output.collect(sortValue, entry);          // invert for sort by score
+      entry.url = (Text) key;
+      output.collect(sortValue, entry); // invert for sort by score
     }
 
-    /** Partition by host. */
-    public int getPartition(FloatWritable key, Writable value,
-                            int numReduceTasks) {
-      return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
-                                          numReduceTasks);
+    /** Partition by host / domain or IP. */
+    public int getPartition(FloatWritable key, Writable value, int 
numReduceTasks) {
+      return partitioner.getPartition(((SelectorEntry) value).url, key, 
numReduceTasks);
     }
 
     /** Collect until limit is reached. */
     public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
-                       OutputCollector<FloatWritable, SelectorEntry> output,
-                       Reporter reporter)
-      throws IOException {
+        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
+        throws IOException {
+
+      while (values.hasNext()) {
 
-      while (values.hasNext() && count < limit) {
+        if (count == limit) {
+          // do we have any segments left?
+          if (currentsegmentnum < maxNumSegments) {
+            count = 0;
+            currentsegmentnum++;
+          } else break;
+        }
 
         SelectorEntry entry = values.next();
-        Text url = entry.url;        
-        String urlString = url.toString();        
+        Text url = entry.url;
+        String urlString = url.toString();
         URL u = null;
-        
-        // skip bad urls, including empty and null urls
+
+        String hostordomain = null;
+
         try {
-          u = new URL(url.toString());
-        } catch (MalformedURLException e) {
-          LOG.info("Bad protocol in url: " + url.toString());
-          continue;
-        }
-        
-        String host = u.getHost();
-        host = host.toLowerCase();
-        String hostname = host;
-
-        // partitioning by ip will generate lots of DNS requests here, and 
will 
-        // be up to double the overall dns load, do not run this way unless you
-        // are running a local caching DNS server or a two layer DNS cache
-        if (byIP) {
-          if (maxedHosts.contains(host)) {
-            if (LOG.isDebugEnabled()) { LOG.debug("Host already maxed out: " + 
host); }
-            continue;
+          if (normalise && normalizers != null) {
+            urlString = normalizers.normalize(urlString,
+                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
           }
-          if (dnsFailureHosts.contains(host)) {
-            if (LOG.isDebugEnabled()) { LOG.debug("Host name lookup already 
failed: " + host); }
-            continue;
+          u = new URL(urlString);
+          if (byDomain) {
+            hostordomain = URLUtil.getDomainName(u);
+          } else {
+            hostordomain = new URL(urlString).getHost();
           }
-          try {
-            InetAddress ia = InetAddress.getByName(host);
-            host = ia.getHostAddress();
-            urlString = new URL(u.getProtocol(), host, u.getPort(), 
u.getFile()).toString();
-          } 
-          catch (UnknownHostException uhe) {
-            // remember hostnames that could not be looked up
-            dnsFailureHosts.add(hostname);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DNS lookup failed: " + host + ", skipping.");
-            }
-            dnsFailure++;
-            if ((dnsFailure % 1000 == 0) && (LOG.isWarnEnabled())) {
-              LOG.warn("DNS failures: " + dnsFailure);
-            }
-            continue;
-          }
-        }
-        
-        try {
-          urlString = normalizers.normalize(urlString, 
URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-          host = new URL(urlString).getHost();
         } catch (Exception e) {
-          LOG.warn("Malformed URL: '" + urlString + "', skipping (" +
-              StringUtils.stringifyException(e) + ")");
+          LOG.warn("Malformed URL: '" + urlString + "', skipping ("
+              + StringUtils.stringifyException(e) + ")");
           continue;
         }
-        
-        // only filter if we are counting hosts
-        if (maxPerHost > 0) {
-          
-          IntWritable hostCount = hostCounts.get(host);
+
+        hostordomain = hostordomain.toLowerCase();
+
+        // only filter if we are counting hosts or domains
+        if (maxCount > 0) {
+          int[] hostCount = hostCounts.get(hostordomain);
           if (hostCount == null) {
-            hostCount = new IntWritable();
-            hostCounts.put(host, hostCount);
+            hostCount = new int[] {1, 0};
+            hostCounts.put(hostordomain, hostCount);
           }
-  
+
           // increment hostCount
-          hostCount.set(hostCount.get() + 1);
-  
-          // skip URL if above the limit per host.
-          if (hostCount.get() > maxPerHost) {
-            if (hostCount.get() == maxPerHost + 1) {
-              // remember the raw hostname that is maxed out
-              maxedHosts.add(hostname);
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Host " + host + " has more than " + maxPerHost +
-                         " URLs." + " Skipping additional.");
+          hostCount[1]++;
+
+          // reached the limit of allowed URLs per host / domain
+          // see if we can put it in the next segment?
+          if (hostCount[1] > maxCount) {
+            if (hostCount[0] < maxNumSegments) {
+              hostCount[0]++;
+              hostCount[1] = 0;
+            } else {
+              if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
+                LOG.info("Host or domain " + hostordomain + " has more than " 
+ maxCount
+                    + " URLs for all " + maxNumSegments + " segments - 
skipping");
               }
+              // skip this entry
+              continue;
             }
-            continue;
           }
-        }
+          entry.segnum = new IntWritable(hostCount[0]);
+        } else entry.segnum = new IntWritable(currentsegmentnum);
 
         output.collect(key, entry);
 
         // Count is incremented only when we keep the URL
-        // maxPerHost may cause us to skip it.
+        // maxCount may cause us to skip it.
         count++;
       }
     }
   }
 
+  // Allows the reducers to generate one subfile per segment
+  public static class GeneratorOutputFormat extends
+      MultipleSequenceFileOutputFormat<FloatWritable,SelectorEntry> {
+    // generate a filename based on the segnum stored for this entry
+    protected String generateFileNameForKeyValue(FloatWritable key, 
SelectorEntry value,
+        String name) {
+      return "fetchlist-" + value.segnum.toString() + "/" + name;
+    }
+
+  }
+
   public static class DecreasingFloatComparator extends 
FloatWritable.Comparator {
 
     /** Compares two FloatWritables decreasing. */
-    public int compare(byte[] b1, int s1, int l1,
-        byte[] b2, int s2, int l2) {
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       return super.compare(b2, s2, l2, b1, s1, l1);
     }
   }
 
-  public static class SelectorInverseMapper extends MapReduceBase implements 
Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
+  public static class SelectorInverseMapper extends MapReduceBase implements
+      Mapper<FloatWritable,SelectorEntry,Text,SelectorEntry> {
 
-    public void map(FloatWritable key, SelectorEntry value, 
OutputCollector<Text, SelectorEntry> output, Reporter reporter) throws 
IOException {
-      SelectorEntry entry = (SelectorEntry)value;
+    public void map(FloatWritable key, SelectorEntry value,
+        OutputCollector<Text,SelectorEntry> output, Reporter reporter) throws 
IOException {
+      SelectorEntry entry = (SelectorEntry) value;
       output.collect(entry.url, entry);
     }
   }
-  
-  public static class PartitionReducer extends MapReduceBase
-      implements Reducer<Text, SelectorEntry, Text, CrawlDatum> {
+
+  public static class PartitionReducer extends MapReduceBase implements
+      Reducer<Text,SelectorEntry,Text,CrawlDatum> {
 
     public void reduce(Text key, Iterator<SelectorEntry> values,
-        OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws 
IOException {
-      // if using HashComparator, we get only one input key in case of hash 
collision
+        OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws 
IOException {
+      // if using HashComparator, we get only one input key in case of
+      // hash collision
       // so use only URLs from values
       while (values.hasNext()) {
         SelectorEntry entry = values.next();
         output.collect(entry.url, entry.datum);
       }
     }
-    
+
   }
 
   /** Sort fetch lists by hash of URL. */
@@ -328,7 +362,8 @@ public class Generator extends Configure
 
     private static int hash(byte[] bytes, int start, int length) {
       int hash = 1;
-      // make later bytes more significant in hash code, so that sorting by
+      // make later bytes more significant in hash code, so that sorting
+      // by
       // hashcode correlates less with by-host ordering.
       for (int i = length - 1; i >= 0; i--)
         hash = (31 * hash) + (int) bytes[start + i];
@@ -339,26 +374,30 @@ public class Generator extends Configure
   /**
    * Update the CrawlDB so that the next generate won't include the same URLs.
    */
-  public static class CrawlDbUpdater extends MapReduceBase implements 
Mapper<Text, CrawlDatum, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, 
CrawlDatum> {
+  public static class CrawlDbUpdater extends MapReduceBase implements
+      Mapper<Text,CrawlDatum,Text,CrawlDatum>, 
Reducer<Text,CrawlDatum,Text,CrawlDatum> {
     long generateTime;
-    
+
     public void configure(JobConf job) {
       generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
     }
-    
-    public void map(Text key, CrawlDatum value, OutputCollector<Text, 
CrawlDatum> output, Reporter reporter) throws IOException {
-        output.collect(key, value);
+
+    public void map(Text key, CrawlDatum value, 
OutputCollector<Text,CrawlDatum> output,
+        Reporter reporter) throws IOException {
+      output.collect(key, value);
     }
-    
+
     private CrawlDatum orig = new CrawlDatum();
     private LongWritable genTime = new LongWritable(0L);
 
-    public void reduce(Text key, Iterator<CrawlDatum> values, 
OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException 
{
+    public void reduce(Text key, Iterator<CrawlDatum> values,
+        OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws 
IOException {
       genTime.set(0L);
       while (values.hasNext()) {
         CrawlDatum val = values.next();
         if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
-          LongWritable gt = 
(LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+          LongWritable gt = (LongWritable) val.getMetaData().get(
+              Nutch.WRITABLE_GENERATE_TIME_KEY);
           genTime.set(gt.get());
           if (genTime.get() != generateTime) {
             orig.set(val);
@@ -373,85 +412,98 @@ public class Generator extends Configure
         orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       }
       output.collect(key, orig);
-    }    
+    }
   }
-  
+
   public Generator() {}
-  
+
   public Generator(Configuration conf) {
     setConf(conf);
   }
-  
-  /**
-   * Generate fetchlists in a segment. Whether to filter URLs or not is
-   * read from the crawl.generate.filter property in the configuration
-   * files. If the property is not found, the URLs are filtered.
-   *
-   * @param dbDir     Crawl database directory
-   * @param segments  Segments directory
-   * @param numLists  Number of reduce tasks
-   * @param topN      Number of top URLs to be selected
-   * @param curTime   Current time in milliseconds
-   *
-   * @return Path to generated segment or null if no entries were
-   *         selected
-   *
-   * @throws IOException When an I/O error occurs
-   */
-  public Path generate(Path dbDir, Path segments, int numLists,
-                       long topN, long curTime) throws IOException {
+
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN, 
long curTime)
+      throws IOException {
 
     JobConf job = new NutchJob(getConf());
-    boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
-    return generate(dbDir, segments, numLists, topN, curTime, filter, false);
+    boolean filter = job.getBoolean(GENERATOR_FILTER, true);
+    boolean normalise = job.getBoolean(GENERATOR_NORMALISE, true);
+    return generate(dbDir, segments, numLists, topN, curTime, filter, 
normalise, false, 1);
   }
 
   /**
-   * Generate fetchlists in a segment.
-   * @return Path to generated segment or null if no entries were selected.
-   * */
-  public Path generate(Path dbDir, Path segments,
-                       int numLists, long topN, long curTime, boolean filter,
-                       boolean force)
-    throws IOException {
-
-    Path tempDir =
-      new Path(getConf().get("mapred.temp.dir", ".") +
-               "/generate-temp-"+ System.currentTimeMillis());
+   * old signature used for compatibility - does not specify whether or not to
+   * normalise and set the number of segments to 1
+   **/
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
+      long curTime, boolean filter, boolean force) throws IOException {
+    return generate(dbDir, segments, numLists, topN, curTime, filter, true, 
force, 1);
+  }
+
+  /**
+   * Generate fetchlists in one or more segments. Whether to filter URLs or not
+   * is read from the generate.filter property in the configuration 
files.
+   * If the property is not found, the URLs are filtered. Same for the
+   * normalisation.
+   * 
+   * @param dbDir
+   *          Crawl database directory
+   * @param segments
+   *          Segments directory
+   * @param numLists
+   *          Number of reduce tasks
+   * @param topN
+   *          Number of top URLs to be selected
+   * @param curTime
+   *          Current time in milliseconds
+   * 
+   * @return Path to generated segment or null if no entries were selected
+   * 
+   * @throws IOException
+   *           When an I/O error occurs
+   */
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
+      long curTime, boolean filter, boolean norm, boolean force, int 
maxNumSegments)
+      throws IOException {
+
+    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + 
"/generate-temp-"
+        + System.currentTimeMillis());
 
-    Path segment = new Path(segments, generateSegmentName());
-    Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
-    
     Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
     FileSystem fs = FileSystem.get(getConf());
     LockUtil.createLockFile(fs, lock, force);
 
     LOG.info("Generator: Selecting best-scoring urls due for fetch.");
     LOG.info("Generator: starting");
-    LOG.info("Generator: segment: " + segment);
     LOG.info("Generator: filtering: " + filter);
+    LOG.info("Generator: normalizing: " + norm);
     if (topN != Long.MAX_VALUE) {
       LOG.info("Generator: topN: " + topN);
     }
+    
+    if (getConf().get(GENERATE_MAX_PER_HOST_BY_IP).equals("true")){
+      LOG.info("Generator: GENERATE_MAX_PER_HOST_BY_IP will be ignored, use 
partition.url.mode instead");
+    }
 
     // map to inverted subset due for fetch, sort by score
     JobConf job = new NutchJob(getConf());
-    job.setJobName("generate: select " + segment);
+    job.setJobName("generate: select from " + dbDir);
 
-    if (numLists == -1) {                         // for politeness make
-      numLists = job.getNumMapTasks();            // a partition per fetch task
+    if (numLists == -1) { // for politeness make
+      numLists = job.getNumMapTasks(); // a partition per fetch task
     }
     if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
       // override
       LOG.info("Generator: jobtracker is 'local', generating exactly one 
partition.");
       numLists = 1;
     }
-    job.setLong(CRAWL_GEN_CUR_TIME, curTime);
+    job.setLong(GENERATOR_CUR_TIME, curTime);
     // record real generation time
     long generateTime = System.currentTimeMillis();
     job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
-    job.setLong(CRAWL_TOP_N, topN);
-    job.setBoolean(CRAWL_GENERATE_FILTER, filter);
+    job.setLong(GENERATOR_TOP_N, topN);
+    job.setBoolean(GENERATOR_FILTER, filter);
+    job.setBoolean(GENERATOR_NORMALISE, norm);
+    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);
 
     FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
@@ -465,75 +517,52 @@ public class Generator extends Configure
     job.setOutputKeyClass(FloatWritable.class);
     job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
     job.setOutputValueClass(SelectorEntry.class);
+    job.setOutputFormat(GeneratorOutputFormat.class);
+
     try {
       JobClient.runJob(job);
     } catch (IOException e) {
-      LockUtil.removeLockFile(fs, lock);
       throw e;
     }
-    
-    // check that we selected at least some entries ...
-    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, tempDir);
-    boolean empty = true;
-    if (readers != null && readers.length > 0) {
-      for (int num = 0; num < readers.length; num++) {
-        if (readers[num].next(new FloatWritable())) {
-          empty = false;
-          break;
-        }
+
+    // read the subdirectories generated in the temp
+    // output and turn them into segments
+    List<Path> generatedSegments = new ArrayList<Path>();
+
+    FileStatus[] status = fs.listStatus(tempDir);
+    try {
+      for (FileStatus stat : status) {
+        Path subfetchlist = stat.getPath();
+        if (!subfetchlist.getName().startsWith("fetchlist-")) continue;
+        // start a new partition job for this segment
+        Path newSeg = partitionSegment(fs, segments, subfetchlist, numLists);
+        generatedSegments.add(newSeg);
       }
-    }
-    
-    for (int i = 0; i < readers.length; i++) readers[i].close();
-    
-    if (empty) {
-      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
-      LockUtil.removeLockFile(fs, lock);
+    } catch (Exception e) {
+      LOG.warn("Generator: exception while partitioning segments, exiting ...");
       fs.delete(tempDir, true);
       return null;
     }
 
-    // invert again, paritition by host, sort by url hash
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Generator: Partitioning selected urls by host, for politeness.");
-    }
-    job = new NutchJob(getConf());
-    job.setJobName("generate: partition " + segment);
-    
-    job.setInt("partition.url.by.host.seed", new Random().nextInt());
-
-    FileInputFormat.addInputPath(job, tempDir);
-    job.setInputFormat(SequenceFileInputFormat.class);
-
-    job.setMapperClass(SelectorInverseMapper.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(SelectorEntry.class);
-    job.setPartitionerClass(PartitionUrlByHost.class);
-    job.setReducerClass(PartitionReducer.class);
-    job.setNumReduceTasks(numLists);
-
-    FileOutputFormat.setOutputPath(job, output);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(CrawlDatum.class);
-    job.setOutputKeyComparatorClass(HashComparator.class);
-    try {
-      JobClient.runJob(job);
-    } catch (IOException e) {
+    if (generatedSegments.size() == 0) {
+      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
       LockUtil.removeLockFile(fs, lock);
       fs.delete(tempDir, true);
-      throw e;
+      return null;
     }
+
     if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
       // update the db from tempDir
-      Path tempDir2 =
-        new Path(getConf().get("mapred.temp.dir", ".") +
-                 "/generate-temp-"+ System.currentTimeMillis());
-  
+      Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"
+          + System.currentTimeMillis());
+
       job = new NutchJob(getConf());
       job.setJobName("generate: updatedb " + dbDir);
       job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
-      FileInputFormat.addInputPath(job, output);
+      for (Path segmpaths : generatedSegments) {
+        Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME);
+        FileInputFormat.addInputPath(job, subGenDir);
+      }
       FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
       job.setInputFormat(SequenceFileInputFormat.class);
       job.setMapperClass(CrawlDbUpdater.class);
@@ -553,22 +582,61 @@ public class Generator extends Configure
       }
       fs.delete(tempDir2, true);
     }
+
     LockUtil.removeLockFile(fs, lock);
     fs.delete(tempDir, true);
 
-    if (LOG.isInfoEnabled()) { LOG.info("Generator: done."); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Generator: done.");
+    }
+
+    Path[] patharray = new Path[generatedSegments.size()];
+    return generatedSegments.toArray(patharray);
+  }
+
+  private Path partitionSegment(FileSystem fs, Path segmentsDir, Path inputDir,
+      int numLists) throws IOException {
+    // invert again, partition by host/domain/IP, sort by url hash
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Generator: Partitioning selected urls for politeness.");
+    }
+    Path segment = new Path(segmentsDir, generateSegmentName());
+    Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
+
+    LOG.info("Generator: segment: " + segment);
+
+    NutchJob job = new NutchJob(getConf());
+    job.setJobName("generate: partition " + segment);
+
+    job.setInt("partition.url.seed", new Random().nextInt());
 
+    FileInputFormat.addInputPath(job, inputDir);
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(SelectorInverseMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(SelectorEntry.class);
+    job.setPartitionerClass(URLPartitioner.class);
+    job.setReducerClass(PartitionReducer.class);
+    job.setNumReduceTasks(numLists);
+
+    FileOutputFormat.setOutputPath(job, output);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+    job.setOutputKeyComparatorClass(HashComparator.class);
+    JobClient.runJob(job);
     return segment;
   }
-  
+
   private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
 
   public static synchronized String generateSegmentName() {
     try {
       Thread.sleep(1000);
-    } catch (Throwable t) {};
-    return sdf.format
-      (new Date(System.currentTimeMillis()));
+    } catch (Throwable t) {}
+    ;
+    return sdf.format(new Date(System.currentTimeMillis()));
   }
 
   /**
@@ -578,10 +646,11 @@ public class Generator extends Configure
     int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
     System.exit(res);
   }
-  
+
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.out.println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]");
+      System.out
+          .println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter] [-noNorm][-maxNumSegments num]");
       return -1;
     }
 
@@ -591,33 +660,40 @@ public class Generator extends Configure
     long topN = Long.MAX_VALUE;
     int numFetchers = -1;
     boolean filter = true;
+    boolean norm = true;
     boolean force = false;
+    int maxNumSegments = 1;
 
     for (int i = 2; i < args.length; i++) {
       if ("-topN".equals(args[i])) {
-        topN = Long.parseLong(args[i+1]);
+        topN = Long.parseLong(args[i + 1]);
         i++;
       } else if ("-numFetchers".equals(args[i])) {
-        numFetchers = Integer.parseInt(args[i+1]);
+        numFetchers = Integer.parseInt(args[i + 1]);
         i++;
       } else if ("-adddays".equals(args[i])) {
-        long numDays = Integer.parseInt(args[i+1]);
+        long numDays = Integer.parseInt(args[i + 1]);
         curTime += numDays * 1000L * 60 * 60 * 24;
       } else if ("-noFilter".equals(args[i])) {
         filter = false;
+      } else if ("-noNorm".equals(args[i])) {
+        norm = false;
       } else if ("-force".equals(args[i])) {
         force = true;
+      } else if ("-maxNumSegments".equals(args[i])) {
+        maxNumSegments = Integer.parseInt(args[i + 1]);
       }
-      
+
     }
 
     try {
-      Path seg = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter, force);
-      if (seg == null) return -2;
-      else return 0;
+      Path[] segs = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter,
+          norm, force, maxNumSegments);
+      if (segs == null) return -1;
     } catch (Exception e) {
       LOG.fatal("Generator: " + StringUtils.stringifyException(e));
       return -1;
     }
+    return 0;
   }
 }

Added: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java?rev=926155&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java Mon Mar 22 16:19:12 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Partition urls by host, domain name or IP depending on the value of the
+ * parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP'
+ */
+public class URLPartitioner implements Partitioner<Text,Writable> {
+  private static final Log LOG = LogFactory.getLog(URLPartitioner.class);
+
+  public static final String PARTITION_MODE_KEY = "partition.url.mode";
+
+  public static final String PARTITION_MODE_HOST = "byHost";
+  public static final String PARTITION_MODE_DOMAIN = "byDomain";
+  public static final String PARTITION_MODE_IP = "byIP";
+
+  private int seed;
+  private URLNormalizers normalizers;
+  private String mode = PARTITION_MODE_HOST;
+
+  public void configure(JobConf job) {
+    seed = job.getInt("partition.url.seed", 0);
+    mode = job.get(PARTITION_MODE_KEY, PARTITION_MODE_HOST);
+    // check that the mode is known
+    if (!mode.equals(PARTITION_MODE_IP) && !mode.equals(PARTITION_MODE_DOMAIN)
+        && !mode.equals(PARTITION_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + mode + " - forcing to byHost");
+      mode = PARTITION_MODE_HOST;
+    }
+    normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_PARTITION);
+  }
+
+  public void close() {}
+
+  /** Hash by domain name. */
+  public int getPartition(Text key, Writable value, int numReduceTasks) {
+    String urlString = key.toString();
+    URL url = null;
+    int hashCode = urlString.hashCode();
+    try {
+      urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION);
+      url = new URL(urlString);
+      hashCode = url.getHost().hashCode();
+    } catch (MalformedURLException e) {
+      LOG.warn("Malformed URL: '" + urlString + "'");
+    }
+
+    if (mode.equals(PARTITION_MODE_DOMAIN) && url != null) hashCode = URLUtil
+        .getDomainName(url).hashCode();
+    else if (mode.equals(PARTITION_MODE_IP)) {
+      try {
+        InetAddress address = InetAddress.getByName(url.getHost());
+        hashCode = address.getHostAddress().hashCode();
+      } catch (UnknownHostException e) {
+        Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
+      }
+    }
+
+    // make hosts wind up in different partitions on different runs
+    hashCode ^= seed;
+
+    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
+  }
+
+}

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java Mon Mar 22 16:19:12 2010
@@ -79,7 +79,7 @@ public final class URLNormalizers {
    * this scope will be used.
    */
   public static final String SCOPE_DEFAULT = "default";
-  /** Scope used by {@link org.apache.nutch.crawl.PartitionUrlByHost}. */
+  /** Scope used by {@link org.apache.nutch.crawl.URLPartitioner}. */
   public static final String SCOPE_PARTITION = "partition";
   /** Scope used by {@link org.apache.nutch.crawl.Generator}. */
   public static final String SCOPE_GENERATE_HOST_COUNT = "generate_host_count";

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java Mon Mar 22 16:19:12 2010
@@ -44,7 +44,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Generator;
-import org.apache.nutch.crawl.PartitionUrlByHost;
+import org.apache.nutch.crawl.URLPartitioner;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilters;
@@ -165,7 +165,7 @@ public class FreeGenerator extends Confi
     job.setMapperClass(FG.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Generator.SelectorEntry.class);
-    job.setPartitionerClass(PartitionUrlByHost.class);
+    job.setPartitionerClass(URLPartitioner.class);
     job.setReducerClass(FG.class);
     String segName = Generator.generateSegmentName();
     job.setNumReduceTasks(job.getNumMapTasks());

Modified: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java Mon Mar 22 16:19:12 2010
@@ -144,7 +144,7 @@ public class TestGenerator extends TestC
     createCrawlDB(list);
 
     Configuration myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 1);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 1);
     Path generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
 
@@ -157,7 +157,7 @@ public class TestGenerator extends TestC
     assertEquals(1, fetchList.size());
 
     myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 2);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 2);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE, myConfiguration,
         false);
 
@@ -170,7 +170,7 @@ public class TestGenerator extends TestC
     assertEquals(2, fetchList.size());
 
     myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 3);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 3);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE, myConfiguration,
         false);
 
@@ -184,22 +184,22 @@ public class TestGenerator extends TestC
   }
 
   /**
-   * Test that generator obeys the property "generate.max.per.host" and
-   * "generate.max.per.host.by.ip".
+   * Test that generator obeys the property "generator.max.count" and
+   * "generator.count.per.domain".
    * @throws Exception 
    */
-  public void testGenerateHostIPLimit() throws Exception{
+  public void testGenerateDomainLimit() throws Exception{
     ArrayList<URLCrawlDatum> list = new ArrayList<URLCrawlDatum>();
 
-    list.add(createURLCrawlDatum("http://www.example.com/index.html", 1, 1));
-    list.add(createURLCrawlDatum("http://www.example.net/index.html", 1, 1));
-    list.add(createURLCrawlDatum("http://www.example.org/index.html", 1, 1));
+    list.add(createURLCrawlDatum("http://a.example.com/index.html", 1, 1));
+    list.add(createURLCrawlDatum("http://b.example.com/index.html", 1, 1));
+    list.add(createURLCrawlDatum("http://c.example.com/index.html", 1, 1));
 
     createCrawlDB(list);
 
     Configuration myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 1);
-    myConfiguration.setBoolean(Generator.GENERATE_MAX_PER_HOST_BY_IP, true);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 1);
+    myConfiguration.set(Generator.GENERATOR_COUNT_MODE, Generator.GENERATOR_COUNT_VALUE_DOMAIN);
 
     Path generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
@@ -213,7 +213,7 @@ public class TestGenerator extends TestC
     assertEquals(1, fetchList.size());
 
     myConfiguration = new Configuration(myConfiguration);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 2);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 2);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE, myConfiguration, false);
 
     fetchlistPath = new Path(new Path(generatedSegment,
@@ -225,7 +225,7 @@ public class TestGenerator extends TestC
     assertEquals(2, fetchList.size());
 
     myConfiguration = new Configuration(myConfiguration);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 3);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 3);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE, myConfiguration,
         false);
 
@@ -310,9 +310,10 @@ public class TestGenerator extends TestC
       boolean filter) throws IOException {
     // generate segment
     Generator g = new Generator(config);
-    Path generatedSegment = g.generate(dbDir, segmentsDir, -1, numResults,
+    Path[] generatedSegment = g.generate(dbDir, segmentsDir, -1, numResults,
         Long.MAX_VALUE, filter, false);
-    return generatedSegment;
+    if (generatedSegment==null) return null;
+    return generatedSegment[0];
   }
 
   /**

Modified: lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java Mon Mar 22 16:19:12 2010
@@ -92,13 +92,13 @@ public class TestFetcher extends TestCas
 
     //generate
     Generator g=new Generator(conf);
-    Path generatedSegment = g.generate(crawldbPath, segmentsPath, 1,
+    Path[] generatedSegment = g.generate(crawldbPath, segmentsPath, 1,
         Long.MAX_VALUE, Long.MAX_VALUE, false, false);
 
     long time=System.currentTimeMillis();
     //fetch
     Fetcher fetcher=new Fetcher(conf);
-    fetcher.fetch(generatedSegment, 1, true);
+    fetcher.fetch(generatedSegment[0], 1, true);
 
     time=System.currentTimeMillis()-time;
     
@@ -107,7 +107,7 @@ public class TestFetcher extends TestCas
     assertTrue(time > minimumTime);
     
     //verify content
-    Path content=new Path(new Path(generatedSegment, Content.DIR_NAME),"part-00000/data");
+    Path content=new Path(new Path(generatedSegment[0], Content.DIR_NAME),"part-00000/data");
     SequenceFile.Reader reader=new SequenceFile.Reader(fs, content, conf);
     
     ArrayList<String> handledurls=new ArrayList<String>();
@@ -138,7 +138,7 @@ public class TestFetcher extends TestCas
     handledurls.clear();
 
     //verify parse data
-    Path parseData = new Path(new Path(generatedSegment, ParseData.DIR_NAME),"part-00000/data");
+    Path parseData = new Path(new Path(generatedSegment[0], ParseData.DIR_NAME),"part-00000/data");
     reader = new SequenceFile.Reader(fs, parseData, conf);
     
     READ_PARSE_DATA:


Reply via email to