Author: jnioche
Date: Mon Mar 22 16:19:12 2010
New Revision: 926155

URL: http://svn.apache.org/viewvc?rev=926155&view=rev
Log:
NUTCH-762 : Generator can generate several segments in one parse of the crawlDB
Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java
Removed:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/PartitionUrlByHost.java
Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/conf/nutch-default.xml
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
    lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java
    lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
    lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Mon Mar 22 16:19:12 2010
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Unreleased Changes
 
+* NUTCH-762 Generator can generate several segments in one parse of the crawlDB (jnioche)
+
 * NUTCH-740 Configuration option to override default language for fetched pages (Marcin Okraszewski via jnioche)
 
 * NUTCH-803 Upgrade to Hadoop 0.20.2 (ab)

Modified: lucene/nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/conf/nutch-default.xml (original)
+++ lucene/nutch/trunk/conf/nutch-default.xml Mon Mar 22 16:19:12 2010
@@ -514,24 +514,21 @@
 <!-- generate properties -->
 
 <property>
-  <name>generate.max.per.host</name>
+  <name>generate.max.count</name>
   <value>-1</value>
-  <description>The maximum number of urls per host in a single
-  fetchlist.  -1 if unlimited.</description>
+  <description>The maximum number of urls in a single
+  fetchlist.  -1 if unlimited. The urls are counted according
+  to the value of the parameter generate.count.mode.
+  </description>
 </property>
 
 <property>
-  <name>generate.max.per.host.by.ip</name>
-  <value>false</value>
-  <description>If false, same host names are counted. If true,
-  hosts' IP addresses are resolved and the same IP-s are counted.
-
-  -+-+-+- WARNING !!! -+-+-+-
-  When set to true, Generator will create a lot of DNS lookup
-  requests, rapidly. This may cause a DOS attack on
-  remote DNS servers, not to mention increased external traffic
-  and latency. For these reasons when using this option it is
-  required that a local caching DNS be used.</description>
+  <name>generate.count.mode</name>
+  <value>host</value>
+  <description>Determines how the URLs are counted for generate.max.count.
+  Default value is 'host' but can be 'domain'. Note that we do not count
+  per IP in the new version of the Generator.
+  </description>
 </property>
 
 <property>
@@ -545,6 +542,34 @@
   updatedb will generate identical fetchlists.</description>
 </property>
 
+<property>
+  <name>generate.max.per.host</name>
+  <value>-1</value>
+  <description>(Deprecated). Use generate.max.count and generate.count.mode instead.
+  The maximum number of urls per host in a single
+  fetchlist.  -1 if unlimited.</description>
+</property>
+
+<!-- urlpartitioner properties -->
+
+<property>
+  <name>partition.url.mode</name>
+  <value>byHost</value>
+  <description>Determines how to partition URLs. Default value is 'byHost',
+  also takes 'byDomain' or 'byIP'.
+  </description>
+</property>
+
+<property>
+  <name>crawl.gen.delay</name>
+  <value>604800000</value>
+  <description>
+   This value, expressed in milliseconds, defines how long we should keep the lock on records
+   in CrawlDb that were just selected for fetching. If these records are not updated
+   in the meantime, the lock is canceled, i.e. they become eligible for selecting.
+   Default value of this is 7 days.
+  </description>
+</property>
+
 <!-- fetcher properties -->
 
 <property>
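For reference, the same knobs can be set in code before invoking the Generator. A minimal sketch using only the property names defined above (the class name and values are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.util.NutchConfiguration;

    public class GeneratorConfigExample {
      public static void main(String[] args) {
        Configuration conf = NutchConfiguration.create();
        // allow at most 100 URLs per *domain* in each fetchlist
        conf.setInt("generate.max.count", 100);
        conf.set("generate.count.mode", "domain");
        // keep all URLs of a domain in the same fetcher partition
        conf.set("partition.url.mode", "byDomain");
      }
    }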
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java Mon Mar 22 16:19:12 2010
@@ -124,17 +124,17 @@ public class Crawl {
       injector.inject(crawlDb, rootUrlDir);
       int i;
       for (i = 0; i < depth; i++) {             // generate new segment
-        Path segment = generator.generate(crawlDb, segments, -1, topN, System
+        Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
             .currentTimeMillis());
-        if (segment == null) {
+        if (segs == null) {
           LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
           break;
         }
-        fetcher.fetch(segment, threads, org.apache.nutch.fetcher.Fetcher.isParsing(conf));  // fetch it
+        fetcher.fetch(segs[0], threads, org.apache.nutch.fetcher.Fetcher.isParsing(conf));  // fetch it
         if (!Fetcher.isParsing(job)) {
-          parseSegment.parse(segment);    // parse it, if needed
+          parseSegment.parse(segs[0]);    // parse it, if needed
         }
-        crawlDbTool.update(crawlDb, new Path[]{segment}, true, true); // update crawldb
+        crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
       }
       if (i > 0) {
         linkDbTool.invert(linkDb, segments, true, true, false); // invert links
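Since generate() now returns a Path[], a driver can fetch and parse every generated segment before a single updatedb pass. A minimal sketch of that loop, reusing the local variables of Crawl.java above and the nine-argument generate() introduced below (maxNumSegments is assumed to come from the command line):

    // Sketch: one generator pass producing up to maxNumSegments segments.
    Path[] segs = generator.generate(crawlDb, segments, -1, topN,
        System.currentTimeMillis(), true, true, false, maxNumSegments);
    if (segs == null) {
      LOG.info("Stopping - no more URLs to fetch.");
    } else {
      for (Path seg : segs) {
        fetcher.fetch(seg, threads, Fetcher.isParsing(conf)); // fetch each segment
        if (!Fetcher.isParsing(conf)) {
          parseSegment.parse(seg);                            // parse it, if needed
        }
      }
      crawlDbTool.update(crawlDb, segs, true, true);          // single updatedb for all
    }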
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java Mon Mar 22 16:19:12 2010
@@ -29,7 +29,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -42,98 +44,133 @@ import org.apache.nutch.scoring.ScoringF
 import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.URLUtil;
 
-/** Generates a subset of a crawl db to fetch. */
+/**
+ * Generates a subset of a crawl db to fetch. This version can generate
+ * fetchlists for several segments in one go. Unlike in the initial version
+ * (OldGenerator), the IP resolution is done ONLY on the entries which have
+ * been selected for fetching. The URLs are partitioned by IP, domain or host
+ * within a segment. We can choose separately how to count the URLs, i.e. by
+ * domain or host, to limit the entries.
+ **/
 public class Generator extends Configured implements Tool {
 
-  public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
-  public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
-  public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
-  public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
-  public static final String CRAWL_TOP_N = "crawl.topN";
-  public static final String CRAWL_GEN_CUR_TIME = "crawl.gen.curTime";
-  public static final String CRAWL_GEN_DELAY = "crawl.gen.delay";
   public static final Log LOG = LogFactory.getLog(Generator.class);
 
+  public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
+  public static final String GENERATOR_MIN_SCORE = "generate.min.score";
+  public static final String GENERATOR_FILTER = "generate.filter";
+  public static final String GENERATOR_NORMALISE = "generate.normalise";
+  public static final String GENERATOR_MAX_COUNT = "generate.max.count";
+  public static final String GENERATOR_COUNT_MODE = "generate.count.mode";
+  public static final String GENERATOR_COUNT_VALUE_DOMAIN = "domain";
+  public static final String GENERATOR_COUNT_VALUE_HOST = "host";
+  public static final String GENERATOR_TOP_N = "generate.topN";
+  public static final String GENERATOR_CUR_TIME = "generate.curTime";
+  public static final String GENERATOR_DELAY = "crawl.gen.delay";
+  public static final String GENERATOR_MAX_NUM_SEGMENTS = "generate.max.num.segments";
+  // deprecated parameters
+  public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
+  public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
+
   public static class SelectorEntry implements Writable {
 
     public Text url;
     public CrawlDatum datum;
+    public IntWritable segnum;
 
     public SelectorEntry() {
       url = new Text();
       datum = new CrawlDatum();
+      segnum = new IntWritable(0);
     }
 
     public void readFields(DataInput in) throws IOException {
       url.readFields(in);
       datum.readFields(in);
+      segnum.readFields(in);
     }
 
     public void write(DataOutput out) throws IOException {
       url.write(out);
       datum.write(out);
+      segnum.write(out);
     }
 
     public String toString() {
-      return "url=" + url.toString() + ", datum=" + datum.toString();
+      return "url=" + url.toString() + ", datum=" + datum.toString() + ", segnum="
+          + segnum.toString();
     }
   }
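SelectorEntry now carries the target segment number and serializes its three fields in a fixed order (url, datum, segnum). The intermediate fetchlists are therefore plain SequenceFiles of <FloatWritable, SelectorEntry> pairs and can be dumped with a few lines. A sketch (the class name and part-file path are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.nutch.crawl.Generator.SelectorEntry;

    public class FetchlistDumper {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // e.g. .../generate-temp-XXX/fetchlist-1/part-00000 (hypothetical)
        Path part = new Path(args[0]);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, conf);
        FloatWritable score = new FloatWritable();
        SelectorEntry entry = new SelectorEntry();
        while (reader.next(score, entry)) {
          // toString() prints url, datum and segnum
          System.out.println(score.get() + "\t" + entry);
        }
        reader.close();
      }
    }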
   /** Selects entries due for fetch. */
-  public static class Selector implements Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>, Partitioner<FloatWritable, Writable>, Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
+  public static class Selector implements
+      Mapper<Text,CrawlDatum,FloatWritable,SelectorEntry>,
+      Partitioner<FloatWritable,Writable>,
+      Reducer<FloatWritable,SelectorEntry,FloatWritable,SelectorEntry> {
     private LongWritable genTime = new LongWritable(System.currentTimeMillis());
     private long curTime;
     private long limit;
     private long count;
-    private HashMap<String, IntWritable> hostCounts =
-      new HashMap<String, IntWritable>();
-    private int maxPerHost;
-    private HashSet<String> maxedHosts = new HashSet<String>();
-    private HashSet<String> dnsFailureHosts = new HashSet<String>();
-    private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();
+    private HashMap<String,int[]> hostCounts = new HashMap<String,int[]>();
+    private int maxCount;
+    private boolean byDomain = false;
+    private Partitioner<Text,Writable> partitioner = new URLPartitioner();
     private URLFilters filters;
     private URLNormalizers normalizers;
     private ScoringFilters scfilters;
     private SelectorEntry entry = new SelectorEntry();
     private FloatWritable sortValue = new FloatWritable();
-    private boolean byIP;
-    private long dnsFailure = 0L;
     private boolean filter;
+    private boolean normalise;
     private long genDelay;
     private FetchSchedule schedule;
+    private float scoreThreshold = 0f;
+    private int maxNumSegments = 1;
+    int currentsegmentnum = 1;
 
     public void configure(JobConf job) {
-      curTime = job.getLong(CRAWL_GEN_CUR_TIME, System.currentTimeMillis());
-      limit = job.getLong(CRAWL_TOP_N,Long.MAX_VALUE)/job.getNumReduceTasks();
-      maxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
-      byIP = job.getBoolean(GENERATE_MAX_PER_HOST_BY_IP, false);
+      curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
+      limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks();
+      maxCount = job.getInt(GENERATOR_MAX_COUNT, -1);
+      // back compatibility with old param
+      int oldMaxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
+      if (maxCount == -1 && oldMaxPerHost != -1) {
+        maxCount = oldMaxPerHost;
+        byDomain = false;
+      }
+      if (GENERATOR_COUNT_VALUE_DOMAIN.equals(job.get(GENERATOR_COUNT_MODE))) byDomain = true;
       filters = new URLFilters(job);
-      normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+      normalise = job.getBoolean(GENERATOR_NORMALISE, true);
+      if (normalise) normalizers = new URLNormalizers(job,
+          URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
       scfilters = new ScoringFilters(job);
-      hostPartitioner.configure(job);
-      filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
-      genDelay = job.getLong(CRAWL_GEN_DELAY, 7L) * 3600L * 24L * 1000L;
+      partitioner.configure(job);
+      filter = job.getBoolean(GENERATOR_FILTER, true);
+      genDelay = job.getLong(GENERATOR_DELAY, 604800000L); // value in milliseconds
       long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
       if (time > 0) genTime.set(time);
       schedule = FetchScheduleFactory.getFetchSchedule(job);
+      scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
+      maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
     }
 
     public void close() {}
 
     /** Select & invert subset due for fetch. */
     public void map(Text key, CrawlDatum value,
-        OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
-        throws IOException {
+        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
+        throws IOException {
       Text url = key;
       if (filter) {
-        // If filtering is on don't generate URLs that don't pass URLFilters
+        // If filtering is on don't generate URLs that don't pass
+        // URLFilters
         try {
-          if (filters.filter(url.toString()) == null)
-            return;
+          if (filters.filter(url.toString()) == null) return;
         } catch (URLFilterException e) {
           if (LOG.isWarnEnabled()) {
-            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage()
-                + ")");
+            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
           }
         }
       }
@@ -141,169 +178,166 @@ public class Generator extends Configure
 
       // check fetch schedule
       if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
-        LOG.debug("-shouldFetch rejected '" + url+ "', fetchTime=" + crawlDatum.getFetchTime() + ", curTime=" + curTime);
+        LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
+            + crawlDatum.getFetchTime() + ", curTime=" + curTime);
         return;
       }
 
-      LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+      LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
+          Nutch.WRITABLE_GENERATE_TIME_KEY);
       if (oldGenTime != null) { // awaiting fetch & update
-        if (oldGenTime.get() + genDelay > curTime) // still wait for update
-          return;
+        if (oldGenTime.get() + genDelay > curTime) // still wait for
+          // update
+          return;
       }
       float sort = 1.0f;
       try {
-        sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
+        sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);
       } catch (ScoringFilterException sfe) {
         if (LOG.isWarnEnabled()) {
           LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
         }
       }
+
+      // consider only entries with a score superior to the threshold
+      if (!Float.isNaN(scoreThreshold) && sort < scoreThreshold) return;
+
       // sort by decreasing score, using DecreasingFloatComparator
       sortValue.set(sort);
       // record generation time
       crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       entry.datum = crawlDatum;
-      entry.url = (Text)key;
-      output.collect(sortValue, entry);          // invert for sort by score
+      entry.url = (Text) key;
+      output.collect(sortValue, entry); // invert for sort by score
     }
 
-    /** Partition by host. */
-    public int getPartition(FloatWritable key, Writable value,
-        int numReduceTasks) {
-      return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
-          numReduceTasks);
+    /** Partition by host / domain or IP. */
+    public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
+      return partitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);
     }
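The threshold guard above relies on Float.isNaN(): a direct comparison such as scoreThreshold != Float.NaN is true for every value, NaN included, because NaN never compares equal to anything. A self-contained illustration:

    public class NaNCheck {
      public static void main(String[] args) {
        float threshold = Float.NaN;                  // the "unset" marker used by Selector
        System.out.println(threshold == Float.NaN);   // false - NaN is never == anything
        System.out.println(threshold != Float.NaN);   // true for every float, even NaN itself
        System.out.println(Float.isNaN(threshold));   // true - the reliable test
      }
    }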
     /** Collect until limit is reached. */
     public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
-        OutputCollector<FloatWritable, SelectorEntry> output,
-        Reporter reporter)
-        throws IOException {
+        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
+        throws IOException {
+
+      while (values.hasNext()) {
 
-      while (values.hasNext() && count < limit) {
+        if (count == limit) {
+          // do we have any segments left?
+          if (currentsegmentnum < maxNumSegments) {
+            count = 0;
+            currentsegmentnum++;
+          } else break;
+        }
 
         SelectorEntry entry = values.next();
-        Text url = entry.url;
-        String urlString = url.toString();
+        Text url = entry.url;
+        String urlString = url.toString();
         URL u = null;
-
-        // skip bad urls, including empty and null urls
+
+        String hostordomain = null;
+
         try {
-          u = new URL(url.toString());
-        } catch (MalformedURLException e) {
-          LOG.info("Bad protocol in url: " + url.toString());
-          continue;
-        }
-
-        String host = u.getHost();
-        host = host.toLowerCase();
-        String hostname = host;
-
-        // partitioning by ip will generate lots of DNS requests here, and will
-        // be up to double the overall dns load, do not run this way unless you
-        // are running a local caching DNS server or a two layer DNS cache
-        if (byIP) {
-          if (maxedHosts.contains(host)) {
-            if (LOG.isDebugEnabled()) { LOG.debug("Host already maxed out: " + host); }
-            continue;
+          if (normalise && normalizers != null) {
+            urlString = normalizers.normalize(urlString,
+                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
           }
-          if (dnsFailureHosts.contains(host)) {
-            if (LOG.isDebugEnabled()) { LOG.debug("Host name lookup already failed: " + host); }
-            continue;
+          u = new URL(urlString);
+          if (byDomain) {
+            hostordomain = URLUtil.getDomainName(u);
+          } else {
+            hostordomain = new URL(urlString).getHost();
           }
-          try {
-            InetAddress ia = InetAddress.getByName(host);
-            host = ia.getHostAddress();
-            urlString = new URL(u.getProtocol(), host, u.getPort(), u.getFile()).toString();
-          }
-          catch (UnknownHostException uhe) {
-            // remember hostnames that could not be looked up
-            dnsFailureHosts.add(hostname);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DNS lookup failed: " + host + ", skipping.");
-            }
-            dnsFailure++;
-            if ((dnsFailure % 1000 == 0) && (LOG.isWarnEnabled())) {
-              LOG.warn("DNS failures: " + dnsFailure);
-            }
-            continue;
-          }
-        }
-
-        try {
-          urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-          host = new URL(urlString).getHost();
         } catch (Exception e) {
-          LOG.warn("Malformed URL: '" + urlString + "', skipping (" +
-              StringUtils.stringifyException(e) + ")");
+          LOG.warn("Malformed URL: '" + urlString + "', skipping ("
+              + StringUtils.stringifyException(e) + ")");
           continue;
         }
-
-        // only filter if we are counting hosts
-        if (maxPerHost > 0) {
-
-          IntWritable hostCount = hostCounts.get(host);
+
+        hostordomain = hostordomain.toLowerCase();
+
+        // only filter if we are counting hosts or domains
+        if (maxCount > 0) {
+          int[] hostCount = hostCounts.get(hostordomain);
           if (hostCount == null) {
-            hostCount = new IntWritable();
-            hostCounts.put(host, hostCount);
+            hostCount = new int[] {1, 0};
+            hostCounts.put(hostordomain, hostCount);
           }
-
+
           // increment hostCount
-          hostCount.set(hostCount.get() + 1);
-
-          // skip URL if above the limit per host.
-          if (hostCount.get() > maxPerHost) {
-            if (hostCount.get() == maxPerHost + 1) {
-              // remember the raw hostname that is maxed out
-              maxedHosts.add(hostname);
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Host " + host + " has more than " + maxPerHost +
-                    " URLs." + " Skipping additional.");
+          hostCount[1]++;
+
+          // reached the limit of allowed URLs per host / domain
+          // see if we can put it in the next segment?
+          if (hostCount[1] > maxCount) {
+            if (hostCount[0] < maxNumSegments) {
+              hostCount[0]++;
+              hostCount[1] = 0;
+            } else {
+              if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
+                LOG.info("Host or domain " + hostordomain + " has more than " + maxCount
+                    + " URLs for all " + maxNumSegments + " segments - skipping");
               }
+              // skip this entry
+              continue;
             }
-            continue;
           }
-        }
+          entry.segnum = new IntWritable(hostCount[0]);
+        } else entry.segnum = new IntWritable(currentsegmentnum);
 
         output.collect(key, entry);
 
         // Count is incremented only when we keep the URL
-        // maxPerHost may cause us to skip it.
+        // maxCount may cause us to skip it.
         count++;
       }
     }
   }
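The int[] stored per host or domain packs two counters: element 0 is the segment that key is currently filling (1-based) and element 1 is the number of URLs already placed there. Stripped of the MapReduce plumbing, the overflow rule amounts to this sketch (method name hypothetical):

    // Returns the segment to assign the next URL of this key to, or -1 to skip it.
    static int assignSegment(int[] counters, int maxCount, int maxNumSegments) {
      counters[1]++;                          // one more URL for this host/domain
      if (counters[1] > maxCount) {           // current segment is full for this key
        if (counters[0] < maxNumSegments) {
          counters[0]++;                      // spill over into the next segment
          counters[1] = 1;                    // count the URL that triggered the spill
        } else {
          return -1;                          // every segment is full: drop the URL
        }
      }
      return counters[0];
    }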
+  // Allows the reducers to generate one subfile per segment
+  public static class GeneratorOutputFormat extends
+      MultipleSequenceFileOutputFormat<FloatWritable,SelectorEntry> {
+    // generate a filename based on the segnum stored for this entry
+    protected String generateFileNameForKeyValue(FloatWritable key, SelectorEntry value,
+        String name) {
+      return "fetchlist-" + value.segnum.toString() + "/" + name;
+    }
+
+  }
+
   public static class DecreasingFloatComparator extends FloatWritable.Comparator {
 
     /** Compares two FloatWritables decreasing. */
-    public int compare(byte[] b1, int s1, int l1,
-        byte[] b2, int s2, int l2) {
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       return super.compare(b2, s2, l2, b1, s1, l1);
     }
   }
 
-  public static class SelectorInverseMapper extends MapReduceBase implements Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
+  public static class SelectorInverseMapper extends MapReduceBase implements
+      Mapper<FloatWritable,SelectorEntry,Text,SelectorEntry> {
 
-    public void map(FloatWritable key, SelectorEntry value, OutputCollector<Text, SelectorEntry> output, Reporter reporter) throws IOException {
-      SelectorEntry entry = (SelectorEntry)value;
+    public void map(FloatWritable key, SelectorEntry value,
+        OutputCollector<Text,SelectorEntry> output, Reporter reporter) throws IOException {
+      SelectorEntry entry = (SelectorEntry) value;
       output.collect(entry.url, entry);
     }
   }
-
-  public static class PartitionReducer extends MapReduceBase
-      implements Reducer<Text, SelectorEntry, Text, CrawlDatum> {
+
+  public static class PartitionReducer extends MapReduceBase implements
+      Reducer<Text,SelectorEntry,Text,CrawlDatum> {
 
     public void reduce(Text key, Iterator<SelectorEntry> values,
-        OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
-      // if using HashComparator, we get only one input key in case of hash collision
+        OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
+      // if using HashComparator, we get only one input key in case of
+      // hash collision
       // so use only URLs from values
       while (values.hasNext()) {
         SelectorEntry entry = values.next();
         output.collect(entry.url, entry.datum);
       }
     }
-
+
   }
 
   /** Sort fetch lists by hash of URL. */
@@ -328,7 +362,8 @@ public class Generator extends Configure
     private static int hash(byte[] bytes, int start, int length) {
       int hash = 1;
-      // make later bytes more significant in hash code, so that sorting by
+      // make later bytes more significant in hash code, so that sorting
+      // by
       // hashcode correlates less with by-host ordering.
       for (int i = length - 1; i >= 0; i--)
         hash = (31 * hash) + (int) bytes[start + i];
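GeneratorOutputFormat above routes every record to fetchlist-<segnum>/part-NNNNN under the job's temp directory; generate() later walks those subdirectories and turns each one into a segment. A sketch of that enumeration (fragment; fs and tempDir as in generate() further down):

    // List the per-segment fetchlists produced by the selector job.
    FileStatus[] status = fs.listStatus(tempDir);
    for (FileStatus stat : status) {
      Path subfetchlist = stat.getPath();
      // only fetchlist-1, fetchlist-2, ... are of interest
      if (!subfetchlist.getName().startsWith("fetchlist-")) continue;
      System.out.println("selected fetchlist: " + subfetchlist.getName());
    }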
@@ -339,26 +374,30 @@ public class Generator extends Configure
 
   /**
    * Update the CrawlDB so that the next generate won't include the same URLs.
    */
-  public static class CrawlDbUpdater extends MapReduceBase implements Mapper<Text, CrawlDatum, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+  public static class CrawlDbUpdater extends MapReduceBase implements
+      Mapper<Text,CrawlDatum,Text,CrawlDatum>, Reducer<Text,CrawlDatum,Text,CrawlDatum> {
     long generateTime;
-
+
     public void configure(JobConf job) {
       generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
     }
-
-    public void map(Text key, CrawlDatum value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
-      output.collect(key, value);
+
+    public void map(Text key, CrawlDatum value, OutputCollector<Text,CrawlDatum> output,
+        Reporter reporter) throws IOException {
+      output.collect(key, value);
     }
-
+
     private CrawlDatum orig = new CrawlDatum();
     private LongWritable genTime = new LongWritable(0L);
 
-    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<CrawlDatum> values,
+        OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
       genTime.set(0L);
       while (values.hasNext()) {
         CrawlDatum val = values.next();
         if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
-          LongWritable gt = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+          LongWritable gt = (LongWritable) val.getMetaData().get(
+              Nutch.WRITABLE_GENERATE_TIME_KEY);
           genTime.set(gt.get());
           if (genTime.get() != generateTime) {
             orig.set(val);
@@ -373,85 +412,98 @@ public class Generator extends Configure
         orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       }
       output.collect(key, orig);
-    }
+    }
   }
-
+
   public Generator() {}
-
+
   public Generator(Configuration conf) {
     setConf(conf);
   }
-
-  /**
-   * Generate fetchlists in a segment. Whether to filter URLs or not is
-   * read from the crawl.generate.filter property in the configuration
-   * files. If the property is not found, the URLs are filtered.
-   *
-   * @param dbDir Crawl database directory
-   * @param segments Segments directory
-   * @param numLists Number of reduce tasks
-   * @param topN Number of top URLs to be selected
-   * @param curTime Current time in milliseconds
-   *
-   * @return Path to generated segment or null if no entries were
-   *         selected
-   *
-   * @throws IOException When an I/O error occurs
-   */
-  public Path generate(Path dbDir, Path segments, int numLists,
-      long topN, long curTime) throws IOException {
+
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN, long curTime)
+      throws IOException {
     JobConf job = new NutchJob(getConf());
-    boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
-    return generate(dbDir, segments, numLists, topN, curTime, filter, false);
+    boolean filter = job.getBoolean(GENERATOR_FILTER, true);
+    boolean normalise = job.getBoolean(GENERATOR_NORMALISE, true);
+    return generate(dbDir, segments, numLists, topN, curTime, filter, normalise, false, 1);
   }
 
   /**
-   * Generate fetchlists in a segment.
-   * @return Path to generated segment or null if no entries were selected.
-   * */
-  public Path generate(Path dbDir, Path segments,
-      int numLists, long topN, long curTime, boolean filter,
-      boolean force)
-      throws IOException {
-
-    Path tempDir =
-      new Path(getConf().get("mapred.temp.dir", ".") +
-               "/generate-temp-"+ System.currentTimeMillis());
+   * old signature kept for compatibility - does not specify whether to
+   * normalise, and sets the number of segments to 1
+   **/
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
+      long curTime, boolean filter, boolean force) throws IOException {
+    return generate(dbDir, segments, numLists, topN, curTime, filter, true, force, 1);
+  }
+
+  /**
+   * Generate fetchlists in one or more segments. Whether to filter URLs or not
+   * is read from the crawl.generate.filter property in the configuration files.
+   * If the property is not found, the URLs are filtered. Same for the
+   * normalisation.
+   *
+   * @param dbDir
+   *          Crawl database directory
+   * @param segments
+   *          Segments directory
+   * @param numLists
+   *          Number of reduce tasks
+   * @param topN
+   *          Number of top URLs to be selected
+   * @param curTime
+   *          Current time in milliseconds
+   *
+   * @return Path[] of generated segments or null if no entries were selected
+   *
+   * @throws IOException
+   *           When an I/O error occurs
+   */
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
+      long curTime, boolean filter, boolean norm, boolean force, int maxNumSegments)
+      throws IOException {
+
+    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"
+        + System.currentTimeMillis());
 
-    Path segment = new Path(segments, generateSegmentName());
-    Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
-
     Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
     FileSystem fs = FileSystem.get(getConf());
     LockUtil.createLockFile(fs, lock, force);
 
     LOG.info("Generator: Selecting best-scoring urls due for fetch.");
     LOG.info("Generator: starting");
-    LOG.info("Generator: segment: " + segment);
     LOG.info("Generator: filtering: " + filter);
+    LOG.info("Generator: normalizing: " + norm);
     if (topN != Long.MAX_VALUE) {
       LOG.info("Generator: topN: " + topN);
     }
+
+    if ("true".equals(getConf().get(GENERATE_MAX_PER_HOST_BY_IP))) {
+      LOG.info("Generator: GENERATE_MAX_PER_HOST_BY_IP will be ignored, use partition.url.mode instead");
+    }
 
     // map to inverted subset due for fetch, sort by score
     JobConf job = new NutchJob(getConf());
-    job.setJobName("generate: select " + segment);
+    job.setJobName("generate: select from " + dbDir);
 
-    if (numLists == -1) {                         // for politeness make
-      numLists = job.getNumMapTasks();            // a partition per fetch task
+    if (numLists == -1) { // for politeness make
+      numLists = job.getNumMapTasks(); // a partition per fetch task
     }
     if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
       // override
       LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
       numLists = 1;
     }
-    job.setLong(CRAWL_GEN_CUR_TIME, curTime);
+    job.setLong(GENERATOR_CUR_TIME, curTime);
     // record real generation time
     long generateTime = System.currentTimeMillis();
     job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
-    job.setLong(CRAWL_TOP_N, topN);
-    job.setBoolean(CRAWL_GENERATE_FILTER, filter);
+    job.setLong(GENERATOR_TOP_N, topN);
+    job.setBoolean(GENERATOR_FILTER, filter);
+    job.setBoolean(GENERATOR_NORMALISE, norm);
+    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);
 
     FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
@@ -465,75 +517,52 @@ public class Generator extends Configure
     job.setOutputKeyClass(FloatWritable.class);
     job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
     job.setOutputValueClass(SelectorEntry.class);
+    job.setOutputFormat(GeneratorOutputFormat.class);
+
     try {
       JobClient.runJob(job);
     } catch (IOException e) {
-      LockUtil.removeLockFile(fs, lock);
       throw e;
     }
-
-    // check that we selected at least some entries ...
-    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, tempDir);
-    boolean empty = true;
-    if (readers != null && readers.length > 0) {
-      for (int num = 0; num < readers.length; num++) {
-        if (readers[num].next(new FloatWritable())) {
-          empty = false;
-          break;
-        }
+
+    // read the subdirectories generated in the temp
+    // output and turn them into segments
+    List<Path> generatedSegments = new ArrayList<Path>();
+
+    FileStatus[] status = fs.listStatus(tempDir);
+    try {
+      for (FileStatus stat : status) {
+        Path subfetchlist = stat.getPath();
+        if (!subfetchlist.getName().startsWith("fetchlist-")) continue;
+        // start a new partition job for this segment
+        Path newSeg = partitionSegment(fs, segments, subfetchlist, numLists);
+        generatedSegments.add(newSeg);
       }
-    }
-
-    for (int i = 0; i < readers.length; i++) readers[i].close();
-
-    if (empty) {
-      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
-      LockUtil.removeLockFile(fs, lock);
+    } catch (Exception e) {
+      LOG.warn("Generator: exception while partitioning segments, exiting ...");
       fs.delete(tempDir, true);
       return null;
     }
 
-    // invert again, paritition by host, sort by url hash
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Generator: Partitioning selected urls by host, for politeness.");
-    }
-    job = new NutchJob(getConf());
-    job.setJobName("generate: partition " + segment);
-
-    job.setInt("partition.url.by.host.seed", new Random().nextInt());
-
-    FileInputFormat.addInputPath(job, tempDir);
-    job.setInputFormat(SequenceFileInputFormat.class);
-
-    job.setMapperClass(SelectorInverseMapper.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(SelectorEntry.class);
-    job.setPartitionerClass(PartitionUrlByHost.class);
-    job.setReducerClass(PartitionReducer.class);
-    job.setNumReduceTasks(numLists);
-
-    FileOutputFormat.setOutputPath(job, output);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(CrawlDatum.class);
-    job.setOutputKeyComparatorClass(HashComparator.class);
-    try {
-      JobClient.runJob(job);
-    } catch (IOException e) {
+    if (generatedSegments.size() == 0) {
+      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
       LockUtil.removeLockFile(fs, lock);
       fs.delete(tempDir, true);
-      throw e;
+      return null;
     }
+
     if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
       // update the db from tempDir
-      Path tempDir2 =
-        new Path(getConf().get("mapred.temp.dir", ".") +
-                 "/generate-temp-"+ System.currentTimeMillis());
-
+      Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"
+          + System.currentTimeMillis());
+
       job = new NutchJob(getConf());
       job.setJobName("generate: updatedb " + dbDir);
       job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
-      FileInputFormat.addInputPath(job, output);
+      for (Path segmpaths : generatedSegments) {
+        Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME);
+        FileInputFormat.addInputPath(job, subGenDir);
+      }
       FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
       job.setInputFormat(SequenceFileInputFormat.class);
       job.setMapperClass(CrawlDbUpdater.class);
@@ -553,22 +582,61 @@ public class Generator extends Configure
       }
       fs.delete(tempDir2, true);
     }
+
     LockUtil.removeLockFile(fs, lock);
     fs.delete(tempDir, true);
 
-    if (LOG.isInfoEnabled()) { LOG.info("Generator: done."); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Generator: done.");
+    }
+
+    Path[] patharray = new Path[generatedSegments.size()];
+    return generatedSegments.toArray(patharray);
+  }
+
+  private Path partitionSegment(FileSystem fs, Path segmentsDir, Path inputDir,
+      int numLists) throws IOException {
+    // invert again, partition by host/domain/IP, sort by url hash
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Generator: Partitioning selected urls for politeness.");
+    }
+    Path segment = new Path(segmentsDir, generateSegmentName());
+    Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
+
+    LOG.info("Generator: segment: " + segment);
+
+    NutchJob job = new NutchJob(getConf());
+    job.setJobName("generate: partition " + segment);
+
+    job.setInt("partition.url.seed", new Random().nextInt());
+
+    FileInputFormat.addInputPath(job, inputDir);
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(SelectorInverseMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(SelectorEntry.class);
+    job.setPartitionerClass(URLPartitioner.class);
+    job.setReducerClass(PartitionReducer.class);
+    job.setNumReduceTasks(numLists);
+
+    FileOutputFormat.setOutputPath(job, output);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+    job.setOutputKeyComparatorClass(HashComparator.class);
+    JobClient.runJob(job);
     return segment;
   }
-
+
   private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
 
   public static synchronized String generateSegmentName() {
     try {
       Thread.sleep(1000);
-    } catch (Throwable t) {};
-    return sdf.format
-      (new Date(System.currentTimeMillis()));
+    } catch (Throwable t) {}
+    return sdf.format(new Date(System.currentTimeMillis()));
   }
 
   /**
@@ -578,10 +646,11 @@ public class Generator extends Configure
     int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
     System.exit(res);
   }
-
+
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.out.println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]");
+      System.out
+          .println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter] [-noNorm] [-maxNumSegments num]");
       return -1;
     }
@@ -591,33 +660,40 @@ public class Generator extends Configure
     long topN = Long.MAX_VALUE;
     int numFetchers = -1;
     boolean filter = true;
+    boolean norm = true;
     boolean force = false;
+    int maxNumSegments = 1;
 
     for (int i = 2; i < args.length; i++) {
       if ("-topN".equals(args[i])) {
-        topN = Long.parseLong(args[i+1]);
+        topN = Long.parseLong(args[i + 1]);
         i++;
       } else if ("-numFetchers".equals(args[i])) {
-        numFetchers = Integer.parseInt(args[i+1]);
+        numFetchers = Integer.parseInt(args[i + 1]);
         i++;
       } else if ("-adddays".equals(args[i])) {
-        long numDays = Integer.parseInt(args[i+1]);
+        long numDays = Integer.parseInt(args[i + 1]);
         curTime += numDays * 1000L * 60 * 60 * 24;
       } else if ("-noFilter".equals(args[i])) {
         filter = false;
+      } else if ("-noNorm".equals(args[i])) {
+        norm = false;
       } else if ("-force".equals(args[i])) {
         force = true;
+      } else if ("-maxNumSegments".equals(args[i])) {
+        maxNumSegments = Integer.parseInt(args[i + 1]);
+        i++;
       }
-
     }
 
     try {
-      Path seg = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter, force);
-      if (seg == null) return -2;
-      else return 0;
+      Path[] segs = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter,
+          norm, force, maxNumSegments);
+      if (segs == null) return -1;
     } catch (Exception e) {
       LOG.fatal("Generator: " + StringUtils.stringifyException(e));
       return -1;
     }
+    return 0;
   }
 }
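Putting it together, a programmatic caller can request several segments in one pass of the crawldb via the new nine-argument generate(). A sketch with hypothetical paths and values:

    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.crawl.Generator;
    import org.apache.nutch.util.NutchConfiguration;

    public class MultiSegmentGenerate {
      public static void main(String[] args) throws Exception {
        Generator g = new Generator(NutchConfiguration.create());
        Path[] segs = g.generate(
            new Path("crawl/crawldb"),      // hypothetical crawldb
            new Path("crawl/segments"),     // hypothetical segments dir
            -1,                             // numLists: one partition per fetch task
            50000L,                         // topN
            System.currentTimeMillis(),     // curTime
            true,                           // filter
            true,                           // normalise
            false,                          // force
            3);                             // maxNumSegments
        if (segs == null) return;           // nothing selected
        for (Path seg : segs) System.out.println("generated " + seg);
      }
    }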
Added: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java?rev=926155&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/URLPartitioner.java Mon Mar 22 16:19:12 2010
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Partition urls by host, domain name or IP depending on the value of the
+ * parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP'
+ */
+public class URLPartitioner implements Partitioner<Text,Writable> {
+  private static final Log LOG = LogFactory.getLog(URLPartitioner.class);
+
+  public static final String PARTITION_MODE_KEY = "partition.url.mode";
+
+  public static final String PARTITION_MODE_HOST = "byHost";
+  public static final String PARTITION_MODE_DOMAIN = "byDomain";
+  public static final String PARTITION_MODE_IP = "byIP";
+
+  private int seed;
+  private URLNormalizers normalizers;
+  private String mode = PARTITION_MODE_HOST;
+
+  public void configure(JobConf job) {
+    seed = job.getInt("partition.url.seed", 0);
+    mode = job.get(PARTITION_MODE_KEY, PARTITION_MODE_HOST);
+    // check that the mode is known
+    if (!mode.equals(PARTITION_MODE_IP) && !mode.equals(PARTITION_MODE_DOMAIN)
+        && !mode.equals(PARTITION_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + mode + " - forcing to byHost");
+      mode = PARTITION_MODE_HOST;
+    }
+    normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_PARTITION);
+  }
+
+  public void close() {}
+
+  /** Hash by host, domain name or IP address, depending on the partition mode. */
+  public int getPartition(Text key, Writable value, int numReduceTasks) {
+    String urlString = key.toString();
+    URL url = null;
+    int hashCode = urlString.hashCode();
+    try {
+      urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION);
+      url = new URL(urlString);
+      hashCode = url.getHost().hashCode();
+    } catch (MalformedURLException e) {
+      LOG.warn("Malformed URL: '" + urlString + "'");
+    }
+
+    if (mode.equals(PARTITION_MODE_DOMAIN) && url != null) hashCode = URLUtil
+        .getDomainName(url).hashCode();
+    else if (mode.equals(PARTITION_MODE_IP)) {
+      try {
+        InetAddress address = InetAddress.getByName(url.getHost());
+        hashCode = address.getHostAddress().hashCode();
+      } catch (UnknownHostException e) {
+        Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
+      }
+    }
+
+    // make hosts wind up in different partitions on different runs
+    hashCode ^= seed;
+
+    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
+  }
+
+}
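The practical difference between the modes is which URLs are guaranteed to share a partition, and hence a fetchlist part, for politeness. A sketch comparing the hash inputs for two hosts of one registered domain, using the same URLUtil helper as the partitioner (the class name is hypothetical):

    import java.net.URL;
    import org.apache.nutch.util.URLUtil;

    public class PartitionModeDemo {
      public static void main(String[] args) throws Exception {
        URL a = new URL("http://a.example.com/index.html");
        URL b = new URL("http://b.example.com/index.html");
        // byHost hashes the host: different inputs, so a and b may split up
        System.out.println(a.getHost() + " / " + b.getHost());
        // byDomain hashes the domain: identical input, so a and b stay together
        System.out.println(URLUtil.getDomainName(a) + " / " + URLUtil.getDomainName(b));
      }
    }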
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/net/URLNormalizers.java Mon Mar 22 16:19:12 2010
@@ -79,7 +79,7 @@ public final class URLNormalizers {
    * this scope will be used.
    */
   public static final String SCOPE_DEFAULT = "default";
-  /** Scope used by {@link org.apache.nutch.crawl.PartitionUrlByHost}. */
+  /** Scope used by {@link org.apache.nutch.crawl.URLPartitioner}. */
   public static final String SCOPE_PARTITION = "partition";
   /** Scope used by {@link org.apache.nutch.crawl.Generator}. */
   public static final String SCOPE_GENERATE_HOST_COUNT = "generate_host_count";

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/tools/FreeGenerator.java Mon Mar 22 16:19:12 2010
@@ -44,7 +44,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Generator;
-import org.apache.nutch.crawl.PartitionUrlByHost;
+import org.apache.nutch.crawl.URLPartitioner;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilters;
@@ -165,7 +165,7 @@ public class FreeGenerator extends Confi
     job.setMapperClass(FG.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Generator.SelectorEntry.class);
-    job.setPartitionerClass(PartitionUrlByHost.class);
+    job.setPartitionerClass(URLPartitioner.class);
     job.setReducerClass(FG.class);
     String segName = Generator.generateSegmentName();
     job.setNumReduceTasks(job.getNumMapTasks());

Modified: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java Mon Mar 22 16:19:12 2010
@@ -144,7 +144,7 @@ public class TestGenerator extends TestC
     createCrawlDB(list);
 
     Configuration myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 1);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 1);
 
     Path generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
@@ -157,7 +157,7 @@
     assertEquals(1, fetchList.size());
 
     myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 2);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 2);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
@@ -170,7 +170,7 @@
     assertEquals(2, fetchList.size());
 
     myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 3);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 3);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
@@ -184,22 +184,22 @@
   }
 
   /**
-   * Test that generator obeys the property "generate.max.per.host" and
-   * "generate.max.per.host.by.ip".
+   * Test that generator obeys the property "generate.max.count" and
+   * "generate.count.mode".
    * @throws Exception
    */
-  public void testGenerateHostIPLimit() throws Exception{
+  public void testGenerateDomainLimit() throws Exception{
     ArrayList<URLCrawlDatum> list = new ArrayList<URLCrawlDatum>();
 
-    list.add(createURLCrawlDatum("http://www.example.com/index.html", 1, 1));
-    list.add(createURLCrawlDatum("http://www.example.net/index.html", 1, 1));
-    list.add(createURLCrawlDatum("http://www.example.org/index.html", 1, 1));
+    list.add(createURLCrawlDatum("http://a.example.com/index.html", 1, 1));
+    list.add(createURLCrawlDatum("http://b.example.com/index.html", 1, 1));
+    list.add(createURLCrawlDatum("http://c.example.com/index.html", 1, 1));
 
     createCrawlDB(list);
 
     Configuration myConfiguration = new Configuration(conf);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 1);
-    myConfiguration.setBoolean(Generator.GENERATE_MAX_PER_HOST_BY_IP, true);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 1);
+    myConfiguration.set(Generator.GENERATOR_COUNT_MODE, Generator.GENERATOR_COUNT_VALUE_DOMAIN);
 
     Path generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
@@ -213,7 +213,7 @@
     assertEquals(1, fetchList.size());
 
     myConfiguration = new Configuration(myConfiguration);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 2);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 2);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
 
     fetchlistPath = new Path(new Path(generatedSegment,
@@ -225,7 +225,7 @@
     assertEquals(2, fetchList.size());
 
     myConfiguration = new Configuration(myConfiguration);
-    myConfiguration.setInt(Generator.GENERATE_MAX_PER_HOST, 3);
+    myConfiguration.setInt(Generator.GENERATOR_MAX_COUNT, 3);
     generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, false);
@@ -310,9 +310,10 @@
       boolean filter) throws IOException {
     // generate segment
     Generator g = new Generator(config);
-    Path generatedSegment = g.generate(dbDir, segmentsDir, -1, numResults,
+    Path[] generatedSegment = g.generate(dbDir, segmentsDir, -1, numResults,
         Long.MAX_VALUE, filter, false);
-    return generatedSegment;
+    if (generatedSegment == null) return null;
+    return generatedSegment[0];
   }
 
   /**

Modified: lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java?rev=926155&r1=926154&r2=926155&view=diff
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java Mon Mar 22 16:19:12 2010
@@ -92,13 +92,13 @@ public class TestFetcher extends TestCas
 
     //generate
     Generator g=new Generator(conf);
-    Path generatedSegment = g.generate(crawldbPath, segmentsPath, 1,
+    Path[] generatedSegment = g.generate(crawldbPath, segmentsPath, 1,
         Long.MAX_VALUE, Long.MAX_VALUE, false, false);
 
     long time=System.currentTimeMillis();
     //fetch
     Fetcher fetcher=new Fetcher(conf);
-    fetcher.fetch(generatedSegment, 1, true);
+    fetcher.fetch(generatedSegment[0], 1, true);
 
     time=System.currentTimeMillis()-time;
 
@@ -107,7 +107,7 @@ public class TestFetcher extends TestCas
     assertTrue(time > minimumTime);
 
     //verify content
-    Path content=new Path(new Path(generatedSegment, Content.DIR_NAME),"part-00000/data");
+    Path content=new Path(new Path(generatedSegment[0], Content.DIR_NAME),"part-00000/data");
     SequenceFile.Reader reader=new SequenceFile.Reader(fs, content, conf);
 
     ArrayList<String> handledurls=new ArrayList<String>();
@@ -138,7 +138,7 @@ public class TestFetcher extends TestCas
     handledurls.clear();
 
     //verify parse data
-    Path parseData = new Path(new Path(generatedSegment, ParseData.DIR_NAME),"part-00000/data");
+    Path parseData = new Path(new Path(generatedSegment[0], ParseData.DIR_NAME),"part-00000/data");
     reader = new SequenceFile.Reader(fs, parseData, conf);
 
     READ_PARSE_DATA: