Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorJob.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorJob.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorJob.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorJob.java Fri Jan 9 06:34:33 2015 @@ -74,13 +74,14 @@ public class GeneratorJob extends NutchT public static final Logger LOG = LoggerFactory.getLogger(GeneratorJob.class); - public static class SelectorEntry - implements WritableComparable<SelectorEntry> { + public static class SelectorEntry implements + WritableComparable<SelectorEntry> { String url; float score; - public SelectorEntry() { } + public SelectorEntry() { + } public SelectorEntry(String url, float score) { this.url = url; @@ -109,7 +110,7 @@ public class GeneratorJob extends NutchT public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + url.hashCode(); + result = prime * result + url.hashCode(); result = prime * result + Float.floatToIntBits(score); return result; } @@ -126,13 +127,13 @@ public class GeneratorJob extends NutchT /** * Sets url with score on this writable. Allows for writable reusing. - * + * * @param url * @param score */ public void set(String url, float score) { - this.url=url; - this.score=score; + this.url = url; + this.score = score; } } @@ -144,7 +145,7 @@ public class GeneratorJob extends NutchT static { WritableComparator.define(SelectorEntry.class, - new SelectorEntryComparator()); + new SelectorEntryComparator()); } public GeneratorJob() { @@ -157,24 +158,25 @@ public class GeneratorJob extends NutchT public Collection<WebPage.Field> getFields(Job job) { Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS); - fields.addAll(FetchScheduleFactory.getFetchSchedule(job.getConfiguration()).getFields()); + fields.addAll(FetchScheduleFactory.getFetchSchedule(job.getConfiguration()) + .getFields()); return fields; } - public Map<String,Object> run(Map<String,Object> args) throws Exception { - String batchId = (String)args.get(Nutch.ARG_BATCH); + public Map<String, Object> run(Map<String, Object> args) throws Exception { + String batchId = (String) args.get(Nutch.ARG_BATCH); if (batchId != null) { getConf().set(GeneratorJob.BATCH_ID, batchId); } - + // map to inverted subset due for fetch, sort by score - Long topN = (Long)args.get(Nutch.ARG_TOPN); - Long curTime = (Long)args.get(Nutch.ARG_CURTIME); + Long topN = (Long) args.get(Nutch.ARG_TOPN); + Long curTime = (Long) args.get(Nutch.ARG_CURTIME); if (curTime == null) { curTime = System.currentTimeMillis(); } - Boolean filter = (Boolean)args.get(Nutch.ARG_FILTER); - Boolean norm = (Boolean)args.get(Nutch.ARG_NORMALIZE); + Boolean filter = (Boolean) args.get(Nutch.ARG_FILTER); + Boolean norm = (Boolean) args.get(Nutch.ARG_NORMALIZE); // map to inverted subset due for fetch, sort by score getConf().setLong(GENERATOR_CUR_TIME, curTime); if (topN != null) @@ -185,22 +187,28 @@ public class GeneratorJob extends NutchT getConf().setLong(Nutch.GENERATE_TIME_KEY, System.currentTimeMillis()); if (norm != null) getConf().setBoolean(GENERATOR_NORMALISE, norm); - String mode = getConf().get(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST); + String mode = getConf().get(GENERATOR_COUNT_MODE, + GENERATOR_COUNT_VALUE_HOST); if (GENERATOR_COUNT_VALUE_HOST.equalsIgnoreCase(mode)) { - getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST); + getConf().set(URLPartitioner.PARTITION_MODE_KEY, + URLPartitioner.PARTITION_MODE_HOST); } else if (GENERATOR_COUNT_VALUE_DOMAIN.equalsIgnoreCase(mode)) { - getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_DOMAIN); + getConf().set(URLPartitioner.PARTITION_MODE_KEY, + URLPartitioner.PARTITION_MODE_DOMAIN); } else { - LOG.warn("Unknown generator.max.count mode '" + mode + "', using mode=" + GENERATOR_COUNT_VALUE_HOST); + LOG.warn("Unknown generator.max.count mode '" + mode + "', using mode=" + + GENERATOR_COUNT_VALUE_HOST); getConf().set(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST); - getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST); + getConf().set(URLPartitioner.PARTITION_MODE_KEY, + URLPartitioner.PARTITION_MODE_HOST); } numJobs = 1; currentJobNum = 0; currentJob = new NutchJob(getConf(), "generate: " + getConf().get(BATCH_ID)); Collection<WebPage.Field> fields = getFields(currentJob); StorageUtils.initMapperJob(currentJob, fields, SelectorEntry.class, - WebPage.class, GeneratorMapper.class, SelectorEntryPartitioner.class, true); + WebPage.class, GeneratorMapper.class, SelectorEntryPartitioner.class, + true); StorageUtils.initReducerJob(currentJob, GeneratorReducer.class); currentJob.waitForCompletion(true); ToolUtil.recordJobStatus(null, currentJob, results); @@ -213,6 +221,7 @@ public class GeneratorJob extends NutchT /** * Mark URLs ready for fetching. + * * @throws ClassNotFoundException * @throws InterruptedException * */ @@ -229,16 +238,16 @@ public class GeneratorJob extends NutchT if (topN != Long.MAX_VALUE) { LOG.info("GeneratorJob: topN: " + topN); } - Map<String,Object> results = run(ToolUtil.toArgMap( - Nutch.ARG_TOPN, topN, - Nutch.ARG_CURTIME, curTime, - Nutch.ARG_FILTER, filter, + Map<String, Object> results = run(ToolUtil.toArgMap(Nutch.ARG_TOPN, topN, + Nutch.ARG_CURTIME, curTime, Nutch.ARG_FILTER, filter, Nutch.ARG_NORMALIZE, norm)); - String batchId = getConf().get(BATCH_ID); + String batchId = getConf().get(BATCH_ID); long finish = System.currentTimeMillis(); long generateCount = (Long) results.get(GENERATE_COUNT); - LOG.info("GeneratorJob: finished at " + sdf.format(finish) + ", time elapsed: " + TimingUtil.elapsedTime(start, finish)); - LOG.info("GeneratorJob: generated batch id: " + batchId + " containing " + generateCount + " URLs"); + LOG.info("GeneratorJob: finished at " + sdf.format(finish) + + ", time elapsed: " + TimingUtil.elapsedTime(start, finish)); + LOG.info("GeneratorJob: generated batch id: " + batchId + " containing " + + generateCount + " URLs"); if (generateCount == 0) { return null; } @@ -247,13 +256,20 @@ public class GeneratorJob extends NutchT public int run(String[] args) throws Exception { if (args.length <= 0) { - System.out.println("Usage: GeneratorJob [-topN N] [-crawlId id] [-noFilter] [-noNorm] [-adddays numDays]"); - System.out.println(" -topN <N> - number of top URLs to be selected, default is Long.MAX_VALUE "); - System.out.println(" -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t (default: storage.crawl.id)\");"); - System.out.println(" -noFilter - do not activate the filter plugin to filter the url, default is true "); - System.out.println(" -noNorm - do not activate the normalizer plugin to normalize the url, default is true "); - System.out.println(" -adddays - Adds numDays to the current time to facilitate crawling urls already"); - System.out.println(" fetched sooner then db.fetch.interval.default. Default value is 0."); + System.out + .println("Usage: GeneratorJob [-topN N] [-crawlId id] [-noFilter] [-noNorm] [-adddays numDays]"); + System.out + .println(" -topN <N> - number of top URLs to be selected, default is Long.MAX_VALUE "); + System.out + .println(" -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t (default: storage.crawl.id)\");"); + System.out + .println(" -noFilter - do not activate the filter plugin to filter the url, default is true "); + System.out + .println(" -noNorm - do not activate the normalizer plugin to normalize the url, default is true "); + System.out + .println(" -adddays - Adds numDays to the current time to facilitate crawling urls already"); + System.out + .println(" fetched sooner then db.fetch.interval.default. Default value is 0."); System.out.println(" -batchId - the batch id "); System.out.println("----------------------"); System.out.println("Please set the params."); @@ -280,8 +296,8 @@ public class GeneratorJob extends NutchT } else if ("-adddays".equals(args[i])) { long numDays = Integer.parseInt(args[++i]); curTime += numDays * 1000L * 60 * 60 * 24; - }else if ("-batchId".equals(args[i])) - getConf().set(BATCH_ID,args[++i]); + } else if ("-batchId".equals(args[i])) + getConf().set(BATCH_ID, args[++i]); else { System.err.println("Unrecognized arg " + args[i]); return -1; @@ -297,7 +313,8 @@ public class GeneratorJob extends NutchT } public static void main(String args[]) throws Exception { - int res = ToolRunner.run(NutchConfiguration.create(), new GeneratorJob(), args); + int res = ToolRunner.run(NutchConfiguration.create(), new GeneratorJob(), + args); System.exit(res); }
Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorMapper.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorMapper.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorMapper.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorMapper.java Fri Jan 9 06:34:33 2015 @@ -34,8 +34,8 @@ import java.net.MalformedURLException; import java.nio.ByteBuffer; import java.util.HashMap; -public class GeneratorMapper -extends GoraMapper<String, WebPage, SelectorEntry, WebPage> { +public class GeneratorMapper extends + GoraMapper<String, WebPage, SelectorEntry, WebPage> { private URLFilters filters; private URLNormalizers normalizers; @@ -48,8 +48,8 @@ extends GoraMapper<String, WebPage, Sele private int maxDistance; @Override - public void map(String reversedUrl, WebPage page, - Context context) throws IOException, InterruptedException { + public void map(String reversedUrl, WebPage page, Context context) + throws IOException, InterruptedException { String url = TableUtil.unreverseUrl(reversedUrl); if (Mark.GENERATE_MARK.checkMark(page) != null) { @@ -57,11 +57,11 @@ extends GoraMapper<String, WebPage, Sele return; } - //filter on distance + // filter on distance if (maxDistance > -1) { CharSequence distanceUtf8 = page.getMarkers().get(DbUpdaterJob.DISTANCE); if (distanceUtf8 != null) { - int distance=Integer.parseInt(distanceUtf8.toString()); + int distance = Integer.parseInt(distanceUtf8.toString()); if (distance > maxDistance) { return; } @@ -71,23 +71,26 @@ extends GoraMapper<String, WebPage, Sele // If filtering is on don't generate URLs that don't pass URLFilters try { if (normalise) { - url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT); + url = normalizers.normalize(url, + URLNormalizers.SCOPE_GENERATE_HOST_COUNT); } if (filter && filters.filter(url) == null) return; } catch (URLFilterException e) { - GeneratorJob.LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage()); + GeneratorJob.LOG + .warn("Couldn't filter url: {} ({})", url, e.getMessage()); return; } catch (MalformedURLException e) { - GeneratorJob.LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage()); + GeneratorJob.LOG + .warn("Couldn't filter url: {} ({})", url, e.getMessage()); return; } // check fetch schedule if (!schedule.shouldFetch(url, page, curTime)) { if (GeneratorJob.LOG.isDebugEnabled()) { - GeneratorJob.LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" + - page.getFetchTime() + ", curTime=" + curTime); + GeneratorJob.LOG.debug("-shouldFetch rejected '" + url + + "', fetchTime=" + page.getFetchTime() + ", curTime=" + curTime); } return; } @@ -95,7 +98,7 @@ extends GoraMapper<String, WebPage, Sele try { score = scoringFilters.generatorSortValue(url, page, score); } catch (ScoringFilterException e) { - //ignore + // ignore } entry.set(url, score); context.write(entry, page); @@ -110,10 +113,12 @@ extends GoraMapper<String, WebPage, Sele filters = new URLFilters(conf); } if (normalise) { - normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT); + normalizers = new URLNormalizers(conf, + URLNormalizers.SCOPE_GENERATE_HOST_COUNT); } - maxDistance=conf.getInt("generate.max.distance", -1); - curTime = conf.getLong(GeneratorJob.GENERATOR_CUR_TIME, System.currentTimeMillis()); + maxDistance = conf.getInt("generate.max.distance", -1); + curTime = conf.getLong(GeneratorJob.GENERATOR_CUR_TIME, + System.currentTimeMillis()); schedule = FetchScheduleFactory.getFetchSchedule(conf); scoringFilters = new ScoringFilters(conf); } Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorReducer.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorReducer.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorReducer.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/GeneratorReducer.java Fri Jan 9 06:34:33 2015 @@ -34,14 +34,15 @@ import org.apache.nutch.storage.WebPage; import org.apache.nutch.util.TableUtil; import org.apache.nutch.util.URLUtil; -/** Reduce class for generate - * - * The #reduce() method write a random integer to all generated URLs. This random - * number is then used by {@link FetcherMapper}. - * +/** + * Reduce class for generate + * + * The #reduce() method write a random integer to all generated URLs. This + * random number is then used by {@link FetcherMapper}. + * */ -public class GeneratorReducer -extends GoraReducer<SelectorEntry, WebPage, String, WebPage> { +public class GeneratorReducer extends + GoraReducer<SelectorEntry, WebPage, String, WebPage> { private long limit; private long maxCount; @@ -81,7 +82,7 @@ extends GoraReducer<SelectorEntry, WebPa try { context.write(TableUtil.reverseUrl(key.url), page); } catch (MalformedURLException e) { - context.getCounter("Generator", "MALFORMED_URL").increment(1); + context.getCounter("Generator", "MALFORMED_URL").increment(1); continue; } context.getCounter("Generator", "GENERATE_MARK").increment(1); @@ -90,10 +91,11 @@ extends GoraReducer<SelectorEntry, WebPa } @Override - protected void setup(Context context) - throws IOException, InterruptedException { + protected void setup(Context context) throws IOException, + InterruptedException { Configuration conf = context.getConfiguration(); - long totalLimit = conf.getLong(GeneratorJob.GENERATOR_TOP_N, Long.MAX_VALUE); + long totalLimit = conf + .getLong(GeneratorJob.GENERATOR_TOP_N, Long.MAX_VALUE); if (totalLimit == Long.MAX_VALUE) { limit = Long.MAX_VALUE; } else { @@ -101,8 +103,8 @@ extends GoraReducer<SelectorEntry, WebPa } maxCount = conf.getLong(GeneratorJob.GENERATOR_MAX_COUNT, -2); batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID)); - String countMode = - conf.get(GeneratorJob.GENERATOR_COUNT_MODE, GeneratorJob.GENERATOR_COUNT_VALUE_HOST); + String countMode = conf.get(GeneratorJob.GENERATOR_COUNT_MODE, + GeneratorJob.GENERATOR_COUNT_VALUE_HOST); if (countMode.equals(GeneratorJob.GENERATOR_COUNT_VALUE_DOMAIN)) { byDomain = true; } Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java Fri Jan 9 06:34:33 2015 @@ -47,14 +47,17 @@ import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.*; -/** This class takes a flat file of URLs and adds them to the of pages to be - * crawled. Useful for bootstrapping the system. - * The URL files contain one URL per line, optionally followed by custom metadata - * separated by tabs with the metadata key separated from the corresponding value by '='. <br> +/** + * This class takes a flat file of URLs and adds them to the of pages to be + * crawled. Useful for bootstrapping the system. The URL files contain one URL + * per line, optionally followed by custom metadata separated by tabs with the + * metadata key separated from the corresponding value by '='. <br> * Note that some metadata keys are reserved : <br> * - <i>nutch.score</i> : allows to set a custom score for a specific URL <br> - * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a specific URL <br> - * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source + * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a + * specific URL <br> + * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 + * \t userType=open_source **/ public class InjectorJob extends NutchTool implements Tool { @@ -63,7 +66,7 @@ public class InjectorJob extends NutchTo private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>(); private static final Utf8 YES_STRING = new Utf8("y"); - + static { FIELDS.add(WebPage.Field.MARKERS); FIELDS.add(WebPage.Field.STATUS); @@ -75,7 +78,7 @@ public class InjectorJob extends NutchTo * metadata key reserved for setting a custom fetchInterval for a specific URL */ public static String nutchFetchIntervalMDName = "nutch.fetchInterval"; - + public static class UrlMapper extends Mapper<LongWritable, Text, String, WebPage> { private URLNormalizers urlNormalizers; @@ -86,24 +89,25 @@ public class InjectorJob extends NutchTo private long curTime; @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(Context context) throws IOException, + InterruptedException { urlNormalizers = new URLNormalizers(context.getConfiguration(), - URLNormalizers.SCOPE_INJECT); + URLNormalizers.SCOPE_INJECT); interval = context.getConfiguration().getInt("db.fetch.interval.default", - 2592000); + 2592000); filters = new URLFilters(context.getConfiguration()); scfilters = new ScoringFilters(context.getConfiguration()); scoreInjected = context.getConfiguration().getFloat("db.score.injected", - 1.0f); + 1.0f); curTime = context.getConfiguration().getLong("injector.current.time", - System.currentTimeMillis()); + System.currentTimeMillis()); } protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String url = value.toString().trim(); // value is line of text - - if (url != null && ( url.length() == 0 || url.startsWith("#") ) ) { + + if (url != null && (url.length() == 0 || url.startsWith("#"))) { /* Ignore line that start with # */ return; } @@ -149,41 +153,43 @@ public class InjectorJob extends NutchTo if (url == null) { context.getCounter("injector", "urls_filtered").increment(1); return; - } else { // if it passes - String reversedUrl = TableUtil.reverseUrl(url); // collect it - WebPage row = WebPage.newBuilder().build(); - row.setFetchTime(curTime); - row.setFetchInterval(customInterval); - - // now add the metadata - Iterator<String> keysIter = metadata.keySet().iterator(); - while (keysIter.hasNext()) { - String keymd = keysIter.next(); - String valuemd = metadata.get(keymd); - row.getMetadata().put(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes())); - } - - if (customScore != -1) - row.setScore(customScore); - else - row.setScore(scoreInjected); + } else { // if it passes + String reversedUrl = TableUtil.reverseUrl(url); // collect it + WebPage row = WebPage.newBuilder().build(); + row.setFetchTime(curTime); + row.setFetchInterval(customInterval); + + // now add the metadata + Iterator<String> keysIter = metadata.keySet().iterator(); + while (keysIter.hasNext()) { + String keymd = keysIter.next(); + String valuemd = metadata.get(keymd); + row.getMetadata().put(new Utf8(keymd), + ByteBuffer.wrap(valuemd.getBytes())); + } - try { - scfilters.injectedScore(url, row); - } catch (ScoringFilterException e) { - if (LOG.isWarnEnabled()) { - LOG.warn("Cannot filter injected score for url " + url - + ", using default (" + e.getMessage() + ")"); + if (customScore != -1) + row.setScore(customScore); + else + row.setScore(scoreInjected); + + try { + scfilters.injectedScore(url, row); + } catch (ScoringFilterException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Cannot filter injected score for url " + url + + ", using default (" + e.getMessage() + ")"); + } } + context.getCounter("injector", "urls_injected").increment(1); + row.getMarkers() + .put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0))); + Mark.INJECT_MARK.putMark(row, YES_STRING); + context.write(reversedUrl, row); } - context.getCounter("injector", "urls_injected").increment(1); - row.getMarkers().put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0))); - Mark.INJECT_MARK.putMark(row, YES_STRING); - context.write(reversedUrl, row); - } } } - + public InjectorJob() { } @@ -191,12 +197,12 @@ public class InjectorJob extends NutchTo setConf(conf); } - public Map<String,Object> run(Map<String,Object> args) throws Exception { + public Map<String, Object> run(Map<String, Object> args) throws Exception { getConf().setLong("injector.current.time", System.currentTimeMillis()); Path input; Object path = args.get(Nutch.ARG_SEEDDIR); if (path instanceof Path) { - input = (Path)path; + input = (Path) path; } else { input = new Path(path.toString()); } @@ -208,26 +214,30 @@ public class InjectorJob extends NutchTo currentJob.setMapOutputKeyClass(String.class); currentJob.setMapOutputValueClass(WebPage.class); currentJob.setOutputFormatClass(GoraOutputFormat.class); - - DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(), - String.class, WebPage.class); + + DataStore<String, WebPage> store = StorageUtils.createWebStore( + currentJob.getConfiguration(), String.class, WebPage.class); GoraOutputFormat.setOutput(currentJob, store, true); - + // NUTCH-1471 Make explicit which datastore class we use - Class<? extends DataStore<Object, Persistent>> dataStoreClass = - StorageUtils.getDataStoreClass(currentJob.getConfiguration()); - LOG.info("InjectorJob: Using " + dataStoreClass + " as the Gora storage class."); - + Class<? extends DataStore<Object, Persistent>> dataStoreClass = StorageUtils + .getDataStoreClass(currentJob.getConfiguration()); + LOG.info("InjectorJob: Using " + dataStoreClass + + " as the Gora storage class."); + currentJob.setReducerClass(Reducer.class); currentJob.setNumReduceTasks(0); - + currentJob.waitForCompletion(true); ToolUtil.recordJobStatus(null, currentJob, results); // NUTCH-1370 Make explicit #URLs injected @runtime - long urlsInjected = currentJob.getCounters().findCounter("injector", "urls_injected").getValue(); - long urlsFiltered = currentJob.getCounters().findCounter("injector", "urls_filtered").getValue(); - LOG.info("InjectorJob: total number of urls rejected by filters: " + urlsFiltered); + long urlsInjected = currentJob.getCounters() + .findCounter("injector", "urls_injected").getValue(); + long urlsFiltered = currentJob.getCounters() + .findCounter("injector", "urls_filtered").getValue(); + LOG.info("InjectorJob: total number of urls rejected by filters: " + + urlsFiltered); LOG.info("InjectorJob: total number of urls injected after normalization and filtering: " + urlsInjected); @@ -238,10 +248,11 @@ public class InjectorJob extends NutchTo SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); LOG.info("InjectorJob: starting at " + sdf.format(start)); - LOG.info("InjectorJob: Injecting urlDir: " + urlDir); + LOG.info("InjectorJob: Injecting urlDir: " + urlDir); run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir)); long end = System.currentTimeMillis(); - LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end)); + LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); } @Override @@ -252,7 +263,7 @@ public class InjectorJob extends NutchTo } for (int i = 1; i < args.length; i++) { if ("-crawlId".equals(args[i])) { - getConf().set(Nutch.CRAWL_ID_KEY, args[i+1]); + getConf().set(Nutch.CRAWL_ID_KEY, args[i + 1]); i++; } else { System.err.println("Unrecognized arg " + args[i]); @@ -270,7 +281,8 @@ public class InjectorJob extends NutchTo } public static void main(String[] args) throws Exception { - int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(), args); + int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(), + args); System.exit(res); } } Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/MD5Signature.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/MD5Signature.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/MD5Signature.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/MD5Signature.java Fri Jan 9 06:34:33 2015 @@ -26,10 +26,10 @@ import java.util.Collection; import java.util.HashSet; /** - * Default implementation of a page signature. It calculates an MD5 hash - * of the raw binary content of a page. In case there is no content, it - * calculates a hash from the page's URL. - * + * Default implementation of a page signature. It calculates an MD5 hash of the + * raw binary content of a page. In case there is no content, it calculates a + * hash from the page's URL. + * * @author Andrzej Bialecki <[email protected]> */ public class MD5Signature extends Signature { @@ -52,8 +52,7 @@ public class MD5Signature extends Signat data = null; of = 0; cb = 0; - } - else { + } else { data = baseUrl.getBytes(); of = 0; cb = baseUrl.length(); Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/NutchWritable.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/NutchWritable.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/NutchWritable.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/NutchWritable.java Fri Jan 9 06:34:33 2015 @@ -26,12 +26,12 @@ public class NutchWritable extends Gener static { CLASSES = (Class<? extends Writable>[]) new Class<?>[] { - org.apache.nutch.scoring.ScoreDatum.class, - org.apache.nutch.util.WebPageWritable.class - }; + org.apache.nutch.scoring.ScoreDatum.class, + org.apache.nutch.util.WebPageWritable.class }; } - public NutchWritable() { } + public NutchWritable() { + } public NutchWritable(Writable instance) { set(instance); Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureComparator.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureComparator.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureComparator.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureComparator.java Fri Jan 9 06:34:33 2015 @@ -21,27 +21,38 @@ import java.nio.ByteBuffer; public class SignatureComparator { public static int compare(byte[] data1, byte[] data2) { - if (data1 == null && data2 == null) return 0; - if (data1 == null) return -1; - if (data2 == null) return 1; + if (data1 == null && data2 == null) + return 0; + if (data1 == null) + return -1; + if (data2 == null) + return 1; return _compare(data1, 0, data1.length, data2, 0, data2.length); } public static int compare(ByteBuffer buf1, ByteBuffer buf2) { - if (buf1 == null && buf2 == null) return 0; - if (buf1 == null) return -1; - if (buf2 == null) return 1; - return _compare(buf1.array(), buf1.arrayOffset() + buf1.position(), buf1.remaining(), - buf2.array(), buf2.arrayOffset() + buf2.position(), buf2.remaining()); + if (buf1 == null && buf2 == null) + return 0; + if (buf1 == null) + return -1; + if (buf2 == null) + return 1; + return _compare(buf1.array(), buf1.arrayOffset() + buf1.position(), + buf1.remaining(), buf2.array(), buf2.arrayOffset() + buf2.position(), + buf2.remaining()); } - - public static int _compare(byte[] data1, int s1, int l1, byte[] data2, int s2, int l2) { - if (l2 > l1) return -1; - if (l2 < l1) return 1; + + public static int _compare(byte[] data1, int s1, int l1, byte[] data2, + int s2, int l2) { + if (l2 > l1) + return -1; + if (l2 < l1) + return 1; int res = 0; for (int i = 0; i < l1; i++) { res = (data1[s1 + i] - data2[s2 + i]); - if (res != 0) return res; + if (res != 0) + return res; } return 0; } Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureFactory.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureFactory.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureFactory.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/SignatureFactory.java Fri Jan 9 06:34:33 2015 @@ -28,26 +28,28 @@ import org.apache.nutch.util.ObjectCache /** * Factory class, which instantiates a Signature implementation according to the - * current Configuration configuration. This newly created instance is cached in the - * Configuration instance, so that it could be later retrieved. - * + * current Configuration configuration. This newly created instance is cached in + * the Configuration instance, so that it could be later retrieved. + * * @author Andrzej Bialecki <[email protected]> */ public class SignatureFactory { - private static final Logger LOG = LoggerFactory.getLogger(SignatureFactory.class); + private static final Logger LOG = LoggerFactory + .getLogger(SignatureFactory.class); - private SignatureFactory() {} // no public ctor + private SignatureFactory() { + } // no public ctor /** Return the default Signature implementation. */ public static Signature getSignature(Configuration conf) { String clazz = conf.get("db.signature.class", MD5Signature.class.getName()); ObjectCache objectCache = ObjectCache.get(conf); - Signature impl = (Signature)objectCache.getObject(clazz); + Signature impl = (Signature) objectCache.getObject(clazz); if (impl == null) { try { LOG.info("Using Signature impl: " + clazz); Class<?> implClass = Class.forName(clazz); - impl = (Signature)implClass.newInstance(); + impl = (Signature) implClass.newInstance(); impl.setConf(conf); objectCache.setObject(clazz, impl); } catch (Exception e) { Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/TextProfileSignature.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/TextProfileSignature.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/TextProfileSignature.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/TextProfileSignature.java Fri Jan 9 06:34:33 2015 @@ -29,28 +29,33 @@ import org.apache.hadoop.io.MD5Hash; import org.apache.nutch.storage.WebPage; /** - * <p>An implementation of a page signature. It calculates an MD5 hash - * of a plain text "profile" of a page. In case there is no text, it - * calculates a hash using the {@link MD5Signature}.</p> - * <p>The algorithm to calculate a page "profile" takes the plain text version of - * a page and performs the following steps: + * <p> + * An implementation of a page signature. It calculates an MD5 hash of a plain + * text "profile" of a page. In case there is no text, it calculates a hash + * using the {@link MD5Signature}. + * </p> + * <p> + * The algorithm to calculate a page "profile" takes the plain text version of a + * page and performs the following steps: * <ul> * <li>remove all characters except letters and digits, and bring all characters * to lower case,</li> * <li>split the text into tokens (all consecutive non-whitespace characters),</li> - * <li>discard tokens equal or shorter than MIN_TOKEN_LEN (default 2 characters),</li> + * <li>discard tokens equal or shorter than MIN_TOKEN_LEN (default 2 + * characters),</li> * <li>sort the list of tokens by decreasing frequency,</li> - * <li>round down the counts of tokens to the nearest multiple of QUANT - * (<code>QUANT = QUANT_RATE * maxFreq</code>, where <code>QUANT_RATE</code> is 0.01f - * by default, and <code>maxFreq</code> is the maximum token frequency). If - * <code>maxFreq</code> is higher than 1, then QUANT is always higher than 2 (which - * means that tokens with frequency 1 are always discarded).</li> - * <li>tokens, which frequency after quantization falls below QUANT, are discarded.</li> - * <li>create a list of tokens and their quantized frequency, separated by spaces, - * in the order of decreasing frequency.</li> + * <li>round down the counts of tokens to the nearest multiple of QUANT ( + * <code>QUANT = QUANT_RATE * maxFreq</code>, where <code>QUANT_RATE</code> is + * 0.01f by default, and <code>maxFreq</code> is the maximum token frequency). + * If <code>maxFreq</code> is higher than 1, then QUANT is always higher than 2 + * (which means that tokens with frequency 1 are always discarded).</li> + * <li>tokens, which frequency after quantization falls below QUANT, are + * discarded.</li> + * <li>create a list of tokens and their quantized frequency, separated by + * spaces, in the order of decreasing frequency.</li> * </ul> * This list is then submitted to an MD5 hash calculation. - * + * * @author Andrzej Bialecki <[email protected]> */ public class TextProfileSignature extends Signature { @@ -65,12 +70,16 @@ public class TextProfileSignature extend @Override public byte[] calculate(WebPage page) { - int MIN_TOKEN_LEN = getConf().getInt("db.signature.text_profile.min_token_len", 2); - float QUANT_RATE = getConf().getFloat("db.signature.text_profile.quant_rate", 0.01f); + int MIN_TOKEN_LEN = getConf().getInt( + "db.signature.text_profile.min_token_len", 2); + float QUANT_RATE = getConf().getFloat( + "db.signature.text_profile.quant_rate", 0.01f); HashMap<String, Token> tokens = new HashMap<String, Token>(); String text = null; - if (page.getText() != null) text = page.getText().toString(); - if (text == null || text.length() == 0) return fallback.calculate(page); + if (page.getText() != null) + text = page.getText().toString(); + if (text == null || text.length() == 0) + return fallback.calculate(page); StringBuffer curToken = new StringBuffer(); int maxFreq = 0; for (int i = 0; i < text.length(); i++) { @@ -88,7 +97,8 @@ public class TextProfileSignature extend tokens.put(s, tok); } tok.cnt++; - if (tok.cnt > maxFreq) maxFreq = tok.cnt; + if (tok.cnt > maxFreq) + maxFreq = tok.cnt; } curToken.setLength(0); } @@ -104,17 +114,20 @@ public class TextProfileSignature extend tokens.put(s, tok); } tok.cnt++; - if (tok.cnt > maxFreq) maxFreq = tok.cnt; + if (tok.cnt > maxFreq) + maxFreq = tok.cnt; } Iterator<Token> it = tokens.values().iterator(); ArrayList<Token> profile = new ArrayList<Token>(); // calculate the QUANT value int QUANT = Math.round(maxFreq * QUANT_RATE); if (QUANT < 2) { - if (maxFreq > 1) QUANT = 2; - else QUANT = 1; + if (maxFreq > 1) + QUANT = 2; + else + QUANT = 1; } - while(it.hasNext()) { + while (it.hasNext()) { Token t = it.next(); // round down to the nearest QUANT t.cnt = (t.cnt / QUANT) * QUANT; @@ -129,7 +142,8 @@ public class TextProfileSignature extend it = profile.iterator(); while (it.hasNext()) { Token t = it.next(); - if (newText.length() > 0) newText.append("\n"); + if (newText.length() > 0) + newText.append("\n"); newText.append(t.toString()); } return MD5Hash.digest(newText.toString()).getDigest(); Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/URLPartitioner.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/URLPartitioner.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/URLPartitioner.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/URLPartitioner.java Fri Jan 9 06:34:33 2015 @@ -40,14 +40,15 @@ import org.apache.nutch.util.URLUtil; * parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP' */ public class URLPartitioner implements Configurable { - private static final Logger LOG = LoggerFactory.getLogger(URLPartitioner.class); + private static final Logger LOG = LoggerFactory + .getLogger(URLPartitioner.class); public static final String PARTITION_MODE_KEY = "partition.url.mode"; public static final String PARTITION_MODE_HOST = "byHost"; public static final String PARTITION_MODE_DOMAIN = "byDomain"; public static final String PARTITION_MODE_IP = "byIP"; - + public static final String PARTITION_URL_SEED = "partition.url.seed"; private Configuration conf; @@ -77,21 +78,22 @@ public class URLPartitioner implements C public int getPartition(String urlString, int numReduceTasks) { if (numReduceTasks == 1) { - //this check can be removed when we use Hadoop with MAPREDUCE-1287 + // this check can be removed when we use Hadoop with MAPREDUCE-1287 return 0; } - + int hashCode; URL url = null; try { - urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION); + urlString = normalizers.normalize(urlString, + URLNormalizers.SCOPE_PARTITION); hashCode = urlString.hashCode(); url = new URL(urlString); } catch (MalformedURLException e) { LOG.warn("Malformed URL: '" + urlString + "'"); hashCode = urlString.hashCode(); } - + if (url != null) { if (mode.equals(PARTITION_MODE_HOST)) { hashCode = url.getHost().hashCode(); @@ -106,20 +108,20 @@ public class URLPartitioner implements C } } } - + // make hosts wind up in different partitions on different runs hashCode ^= seed; return (hashCode & Integer.MAX_VALUE) % numReduceTasks; } - - - public static class SelectorEntryPartitioner - extends Partitioner<SelectorEntry, WebPage> implements Configurable { + + public static class SelectorEntryPartitioner extends + Partitioner<SelectorEntry, WebPage> implements Configurable { private URLPartitioner partitioner = new URLPartitioner(); private Configuration conf; - + @Override - public int getPartition(SelectorEntry selectorEntry, WebPage page, int numReduces) { + public int getPartition(SelectorEntry selectorEntry, WebPage page, + int numReduces) { return partitioner.getPartition(selectorEntry.url, numReduces); } @@ -130,23 +132,24 @@ public class URLPartitioner implements C @Override public void setConf(Configuration conf) { - this.conf=conf; + this.conf = conf; partitioner.setConf(conf); } } - - public static class FetchEntryPartitioner - extends Partitioner<IntWritable, FetchEntry> implements Configurable { + + public static class FetchEntryPartitioner extends + Partitioner<IntWritable, FetchEntry> implements Configurable { private URLPartitioner partitioner = new URLPartitioner(); private Configuration conf; - + @Override - public int getPartition(IntWritable intWritable, FetchEntry fetchEntry, int numReduces) { + public int getPartition(IntWritable intWritable, FetchEntry fetchEntry, + int numReduces) { String key = fetchEntry.getKey(); String url = TableUtil.unreverseUrl(key); return partitioner.getPartition(url, numReduces); } - + @Override public Configuration getConf() { return conf; @@ -154,9 +157,9 @@ public class URLPartitioner implements C @Override public void setConf(Configuration conf) { - this.conf=conf; + this.conf = conf; partitioner.setConf(conf); } } - + } Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/UrlWithScore.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/UrlWithScore.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/UrlWithScore.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/UrlWithScore.java Fri Jan 9 06:34:33 2015 @@ -90,7 +90,7 @@ public final class UrlWithScore implemen public void setUrl(Text url) { this.url = url; } - + public void setUrl(String url) { this.url.set(url); } @@ -102,7 +102,7 @@ public final class UrlWithScore implemen public void setScore(FloatWritable score) { this.score = score; } - + public void setScore(float score) { this.score.set(score); } @@ -111,13 +111,12 @@ public final class UrlWithScore implemen public int compareTo(UrlWithScore other) { return comp.compare(this, other); } - + @Override public String toString() { return "UrlWithScore [url=" + url + ", score=" + score + "]"; } - /** * A partitioner by {url}. */ @@ -144,7 +143,7 @@ public final class UrlWithScore implemen if (cmp != 0) { return cmp; } - //reverse order + // reverse order return -o1.getScore().compareTo(o2.getScore()); } @@ -159,9 +158,9 @@ public final class UrlWithScore implemen if (cmp != 0) { return cmp; } - //reverse order - return -floatComp.compare(b1, s1 + deptLen1, l1 - deptLen1, - b2, s2 + deptLen2, l2 - deptLen2); + // reverse order + return -floatComp.compare(b1, s1 + deptLen1, l1 - deptLen1, b2, s2 + + deptLen2, l2 - deptLen2); } catch (IOException e) { throw new IllegalArgumentException(e); } Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/WebTableReader.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/WebTableReader.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/crawl/WebTableReader.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/WebTableReader.java Fri Jan 9 06:34:33 2015 @@ -59,7 +59,8 @@ import java.util.regex.Pattern; public class WebTableReader extends NutchTool implements Tool { - public static final Logger LOG = LoggerFactory.getLogger(WebTableReader.class); + public static final Logger LOG = LoggerFactory + .getLogger(WebTableReader.class); public static class WebTableStatMapper extends GoraMapper<String, WebPage, Text, LongWritable> { @@ -209,12 +210,12 @@ public class WebTableReader extends Nutc if (LOG.isInfoEnabled()) { LOG.info("WebTable statistics start"); } - + run(ToolUtil.toArgMap(Nutch.ARG_SORT, sort)); - + if (LOG.isInfoEnabled()) { LOG.info("Statistics for WebTable: "); - for (Entry<String,Object> e : results.entrySet()) { + for (Entry<String, Object> e : results.entrySet()) { LOG.info(e.getKey() + ":\t" + e.getValue()); } LOG.info("WebTable statistics: done"); @@ -223,9 +224,10 @@ public class WebTableReader extends Nutc /** Prints out the entry to the standard out **/ private void read(String key, boolean dumpContent, boolean dumpHeaders, - boolean dumpLinks, boolean dumpText) throws ClassNotFoundException, IOException, Exception { - DataStore<String, WebPage> datastore = StorageUtils.createWebStore(getConf(), - String.class, WebPage.class); + boolean dumpLinks, boolean dumpText) throws ClassNotFoundException, + IOException, Exception { + DataStore<String, WebPage> datastore = StorageUtils.createWebStore( + getConf(), String.class, WebPage.class); Query<String, WebPage> query = datastore.newQuery(); String reversedUrl = TableUtil.reverseUrl(key); @@ -245,7 +247,7 @@ public class WebTableReader extends Nutc String url = TableUtil.unreverseUrl(skey); System.out.println(getPageRepresentation(url, page, dumpContent, dumpHeaders, dumpLinks, dumpText)); - }catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); } } @@ -280,9 +282,10 @@ public class WebTableReader extends Nutc // checks whether the Key passes the regex String url = TableUtil.unreverseUrl(key.toString()); if (regex.matcher(url).matches()) { - context.write(new Text(url), - new Text(getPageRepresentation(key, value, dumpContent, dumpHeaders, - dumpLinks, dumpText))); + context.write( + new Text(url), + new Text(getPageRepresentation(key, value, dumpContent, + dumpHeaders, dumpLinks, dumpText))); } } @@ -292,8 +295,10 @@ public class WebTableReader extends Nutc throws IOException, InterruptedException { regex = Pattern.compile(context.getConfiguration().get(regexParamName, ".+")); - dumpContent = context.getConfiguration().getBoolean(contentParamName, false); - dumpHeaders = context.getConfiguration().getBoolean(headersParamName, false); + dumpContent = context.getConfiguration().getBoolean(contentParamName, + false); + dumpHeaders = context.getConfiguration().getBoolean(headersParamName, + false); dumpLinks = context.getConfiguration().getBoolean(linksParamName, false); dumpText = context.getConfiguration().getBoolean(textParamName, false); } @@ -317,10 +322,10 @@ public class WebTableReader extends Nutc cfg.setBoolean(WebTableRegexMapper.linksParamName, links); cfg.setBoolean(WebTableRegexMapper.textParamName, text); - DataStore<String, WebPage> store = StorageUtils.createWebStore(job - .getConfiguration(), String.class, WebPage.class); + DataStore<String, WebPage> store = StorageUtils.createWebStore( + job.getConfiguration(), String.class, WebPage.class); Query<String, WebPage> query = store.newQuery(); - //remove the __g__dirty field since it is not stored + // remove the __g__dirty field since it is not stored String[] fields = Arrays.copyOfRange(WebPage._ALL_FIELDS, 1, WebPage._ALL_FIELDS.length); query.setFields(fields); @@ -342,30 +347,37 @@ public class WebTableReader extends Nutc } private static String getPageRepresentation(String key, WebPage page, - boolean dumpContent, boolean dumpHeaders, boolean dumpLinks, boolean dumpText) { + boolean dumpContent, boolean dumpHeaders, boolean dumpLinks, + boolean dumpText) { StringBuffer sb = new StringBuffer(); sb.append("key:\t" + key).append("\n"); sb.append("baseUrl:\t" + page.getBaseUrl()).append("\n"); - sb.append("status:\t").append(page.getStatus()).append(" (").append( - CrawlStatus.getName(page.getStatus().byteValue())).append(")\n"); + sb.append("status:\t").append(page.getStatus()).append(" (") + .append(CrawlStatus.getName(page.getStatus().byteValue())) + .append(")\n"); sb.append("fetchTime:\t" + page.getFetchTime()).append("\n"); sb.append("prevFetchTime:\t" + page.getPrevFetchTime()).append("\n"); - sb.append("fetchInterval:\t" + page.getFetchInterval()).append("\n"); - sb.append("retriesSinceFetch:\t" + page.getRetriesSinceFetch()).append("\n"); + sb.append("fetchInterval:\t" + page.getFetchInterval()).append("\n"); + sb.append("retriesSinceFetch:\t" + page.getRetriesSinceFetch()) + .append("\n"); sb.append("modifiedTime:\t" + page.getModifiedTime()).append("\n"); sb.append("prevModifiedTime:\t" + page.getPrevModifiedTime()).append("\n"); - sb.append("protocolStatus:\t" + - ProtocolStatusUtils.toString(page.getProtocolStatus())).append("\n"); + sb.append( + "protocolStatus:\t" + + ProtocolStatusUtils.toString(page.getProtocolStatus())).append( + "\n"); ByteBuffer prevSig = page.getPrevSignature(); - if (prevSig != null) { - sb.append("prevSignature:\t" + StringUtil.toHexString(prevSig)).append("\n"); + if (prevSig != null) { + sb.append("prevSignature:\t" + StringUtil.toHexString(prevSig)).append( + "\n"); } ByteBuffer sig = page.getSignature(); if (sig != null) { sb.append("signature:\t" + StringUtil.toHexString(sig)).append("\n"); } - sb.append("parseStatus:\t" + - ParseStatusUtils.toString(page.getParseStatus())).append("\n"); + sb.append( + "parseStatus:\t" + ParseStatusUtils.toString(page.getParseStatus())) + .append("\n"); sb.append("title:\t" + page.getTitle()).append("\n"); sb.append("score:\t" + page.getScore()).append("\n"); @@ -439,22 +451,29 @@ public class WebTableReader extends Nutc System.exit(res); } - private static enum Op {READ, STAT, DUMP}; + private static enum Op { + READ, STAT, DUMP + }; public int run(String[] args) throws Exception { if (args.length < 1) { System.err .println("Usage: WebTableReader (-stats | -url [url] | -dump <out_dir> [-regex regex]) \n \t \t [-crawlId <id>] [-content] [-headers] [-links] [-text]"); - System.err.println(" -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t (default: storage.crawl.id)"); - System.err.println(" -stats [-sort] - print overall statistics to System.out"); + System.err + .println(" -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t (default: storage.crawl.id)"); + System.err + .println(" -stats [-sort] - print overall statistics to System.out"); System.err.println(" [-sort] - list status sorted by host"); - System.err.println(" -url <url> - print information on <url> to System.out"); - System.err.println(" -dump <out_dir> [-regex regex] - dump the webtable to a text file in \n \t \t <out_dir>"); + System.err + .println(" -url <url> - print information on <url> to System.out"); + System.err + .println(" -dump <out_dir> [-regex regex] - dump the webtable to a text file in \n \t \t <out_dir>"); System.err.println(" -content - dump also raw content"); System.err.println(" -headers - dump protocol headers"); System.err.println(" -links - dump links"); System.err.println(" -text - dump extracted text"); - System.err.println(" [-regex] - filter on the URL of the webtable entry"); + System.err + .println(" [-regex] - filter on the URL of the webtable entry"); return -1; } String param = null; @@ -470,8 +489,8 @@ public class WebTableReader extends Nutc if (args[i].equals("-url")) { param = args[++i]; op = Op.READ; - //read(param); - //return 0; + // read(param); + // return 0; } else if (args[i].equals("-stats")) { op = Op.STAT; } else if (args[i].equals("-sort")) { @@ -516,30 +535,32 @@ public class WebTableReader extends Nutc // for now handles only -stat @Override - public Map<String,Object> run(Map<String,Object> args) throws Exception { + public Map<String, Object> run(Map<String, Object> args) throws Exception { Path tmpFolder = new Path(getConf().get("mapred.temp.dir", ".") + "stat_tmp" + System.currentTimeMillis()); numJobs = 1; currentJob = new NutchJob(getConf(), "db_stats"); - currentJob.getConfiguration().setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); - - Boolean sort = (Boolean)args.get(Nutch.ARG_SORT); - if (sort == null) sort = Boolean.FALSE; + currentJob.getConfiguration().setBoolean( + "mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + + Boolean sort = (Boolean) args.get(Nutch.ARG_SORT); + if (sort == null) + sort = Boolean.FALSE; currentJob.getConfiguration().setBoolean("db.reader.stats.sort", sort); - DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob - .getConfiguration(), String.class, WebPage.class); + DataStore<String, WebPage> store = StorageUtils.createWebStore( + currentJob.getConfiguration(), String.class, WebPage.class); Query<String, WebPage> query = store.newQuery(); - //remove the __g__dirty field since it is not stored + // remove the __g__dirty field since it is not stored String[] fields = Arrays.copyOfRange(WebPage._ALL_FIELDS, 1, - WebPage._ALL_FIELDS.length); + WebPage._ALL_FIELDS.length); query.setFields(fields); - GoraMapper.initMapperJob(currentJob, query, store, Text.class, LongWritable.class, - WebTableStatMapper.class, null, true); + GoraMapper.initMapperJob(currentJob, query, store, Text.class, + LongWritable.class, WebTableStatMapper.class, null, true); currentJob.setCombinerClass(WebTableStatCombiner.class); currentJob.setReducerClass(WebTableStatReducer.class); @@ -596,7 +617,8 @@ public class WebTableReader extends Nutc } LongWritable totalCnt = stats.get("T"); - if (totalCnt==null)totalCnt=new LongWritable(0); + if (totalCnt == null) + totalCnt = new LongWritable(0); stats.remove("T"); results.put("TOTAL urls", totalCnt.get()); for (Map.Entry<String, LongWritable> entry : stats.entrySet()) { @@ -615,14 +637,15 @@ public class WebTableReader extends Nutc if (st.length > 2) results.put(st[2], val.get()); else - results.put(st[0] + " " + code + " (" - + CrawlStatus.getName((byte) code) + ")", val.get()); + results.put( + st[0] + " " + code + " (" + CrawlStatus.getName((byte) code) + + ")", val.get()); } else results.put(k, val.get()); } // removing the tmp folder fileSystem.delete(tmpFolder, true); - + return results; } } Modified: nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetchEntry.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetchEntry.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetchEntry.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetchEntry.java Fri Jan 9 06:34:33 2015 @@ -66,6 +66,5 @@ public class FetchEntry extends Configur public String toString() { return "FetchEntry [key=" + key + ", page=" + page + "]"; } - - + } Modified: nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java Fri Jan 9 06:34:33 2015 @@ -53,7 +53,7 @@ import org.apache.gora.mapreduce.GoraMap /** * Multi-threaded fetcher. - * + * */ public class FetcherJob extends NutchTool implements Tool { @@ -80,8 +80,8 @@ public class FetcherJob extends NutchToo * Mapper class for Fetcher. * </p> * <p> - * This class reads the random integer written by {@link GeneratorJob} as its key - * while outputting the actual key and value arguments through a + * This class reads the random integer written by {@link GeneratorJob} as its + * key while outputting the actual key and value arguments through a * {@link FetchEntry} instance. * </p> * <p> @@ -92,8 +92,8 @@ public class FetcherJob extends NutchToo * from other hosts as well. * </p> */ - public static class FetcherMapper - extends GoraMapper<String, WebPage, IntWritable, FetchEntry> { + public static class FetcherMapper extends + GoraMapper<String, WebPage, IntWritable, FetchEntry> { private boolean shouldContinue; @@ -105,7 +105,8 @@ public class FetcherJob extends NutchToo protected void setup(Context context) { Configuration conf = context.getConfiguration(); shouldContinue = conf.getBoolean(RESUME_KEY, false); - batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR)); + batchId = new Utf8( + conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR)); } @Override @@ -120,12 +121,13 @@ public class FetcherJob extends NutchToo } if (shouldContinue && Mark.FETCH_MARK.checkMark(page) != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already fetched"); + LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + + "; already fetched"); } return; } - context.write(new IntWritable(random.nextInt(65536)), new FetchEntry(context - .getConfiguration(), key, page)); + context.write(new IntWritable(random.nextInt(65536)), new FetchEntry( + context.getConfiguration(), key, page)); } } @@ -145,20 +147,21 @@ public class FetcherJob extends NutchToo ParserJob parserJob = new ParserJob(); fields.addAll(parserJob.getFields(job)); } - ProtocolFactory protocolFactory = new ProtocolFactory(job.getConfiguration()); + ProtocolFactory protocolFactory = new ProtocolFactory( + job.getConfiguration()); fields.addAll(protocolFactory.getFields()); return fields; } @Override - public Map<String,Object> run(Map<String,Object> args) throws Exception { + public Map<String, Object> run(Map<String, Object> args) throws Exception { checkConfiguration(); - String batchId = (String)args.get(Nutch.ARG_BATCH); - Integer threads = (Integer)args.get(Nutch.ARG_THREADS); - Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME); - Integer numTasks = (Integer)args.get(Nutch.ARG_NUMTASKS); - + String batchId = (String) args.get(Nutch.ARG_BATCH); + Integer threads = (Integer) args.get(Nutch.ARG_THREADS); + Boolean shouldResume = (Boolean) args.get(Nutch.ARG_RESUME); + Integer numTasks = (Integer) args.get(Nutch.ARG_NUMTASKS); + if (threads != null && threads > 0) { getConf().setInt(THREADS_KEY, threads); } @@ -169,7 +172,7 @@ public class FetcherJob extends NutchToo if (shouldResume != null) { getConf().setBoolean(RESUME_KEY, shouldResume); } - + LOG.info("FetcherJob: threads: " + getConf().getInt(THREADS_KEY, 10)); LOG.info("FetcherJob: parsing: " + getConf().getBoolean(PARSE_KEY, false)); LOG.info("FetcherJob: resuming: " + getConf().getBoolean(RESUME_KEY, false)); @@ -182,13 +185,14 @@ public class FetcherJob extends NutchToo timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); getConf().setLong("fetcher.timelimit", timelimit); } - LOG.info("FetcherJob : timelimit set for : " + getConf().getLong("fetcher.timelimit", -1)); + LOG.info("FetcherJob : timelimit set for : " + + getConf().getLong("fetcher.timelimit", -1)); numJobs = 1; currentJob = new NutchJob(getConf(), "fetch"); - + // for politeness, don't permit parallel execution of a single task currentJob.setReduceSpeculativeExecution(false); - + Collection<WebPage.Field> fields = getFields(currentJob); MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId); StorageUtils.initMapperJob(currentJob, fields, IntWritable.class, @@ -196,8 +200,8 @@ public class FetcherJob extends NutchToo batchIdFilter, false); StorageUtils.initReducerJob(currentJob, FetcherReducer.class); if (numTasks == null || numTasks < 1) { - currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks", - currentJob.getNumReduceTasks())); + currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt( + "mapred.map.tasks", currentJob.getNumReduceTasks())); } else { currentJob.setNumReduceTasks(numTasks); } @@ -219,19 +223,24 @@ public class FetcherJob extends NutchToo return filter; } - /** + /** * Run fetcher. - * @param batchId batchId (obtained from Generator) or null to fetch all generated fetchlists - * @param threads number of threads per map task + * + * @param batchId + * batchId (obtained from Generator) or null to fetch all generated + * fetchlists + * @param threads + * number of threads per map task * @param shouldResume - * @param numTasks number of fetching tasks (reducers). If set to < 1 then use the default, - * which is mapred.map.tasks. + * @param numTasks + * number of fetching tasks (reducers). If set to < 1 then use the + * default, which is mapred.map.tasks. * @return 0 on success * @throws Exception */ - public int fetch(String batchId, int threads, boolean shouldResume, int numTasks) - throws Exception { - + public int fetch(String batchId, int threads, boolean shouldResume, + int numTasks) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = System.currentTimeMillis(); LOG.info("FetcherJob: starting at " + sdf.format(start)); @@ -242,15 +251,13 @@ public class FetcherJob extends NutchToo LOG.info("FetcherJob: batchId: " + batchId); } - run(ToolUtil.toArgMap( - Nutch.ARG_BATCH, batchId, - Nutch.ARG_THREADS, threads, - Nutch.ARG_RESUME, shouldResume, - Nutch.ARG_NUMTASKS, numTasks)); - + run(ToolUtil.toArgMap(Nutch.ARG_BATCH, batchId, Nutch.ARG_THREADS, threads, + Nutch.ARG_RESUME, shouldResume, Nutch.ARG_NUMTASKS, numTasks)); + long finish = System.currentTimeMillis(); - LOG.info("FetcherJob: finished at " + sdf.format(finish) + ", time elapsed: " + TimingUtil.elapsedTime(start, finish)); - + LOG.info("FetcherJob: finished at " + sdf.format(finish) + + ", time elapsed: " + TimingUtil.elapsedTime(start, finish)); + return 0; } @@ -273,13 +280,13 @@ public class FetcherJob extends NutchToo boolean shouldResume = false; String batchId; - String usage = "Usage: FetcherJob (<batchId> | -all) [-crawlId <id>] " + - "[-threads N] \n \t \t [-resume] [-numTasks N]\n" + - " <batchId> - crawl identifier returned by Generator, or -all for all \n \t \t generated batchId-s\n" + - " -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t (default: storage.crawl.id)\n" + - " -threads N - number of fetching threads per task\n" + - " -resume - resume interrupted job\n" + - " -numTasks N - if N > 0 then use this many reduce tasks for fetching \n \t \t (default: mapred.map.tasks)"; + String usage = "Usage: FetcherJob (<batchId> | -all) [-crawlId <id>] " + + "[-threads N] \n \t \t [-resume] [-numTasks N]\n" + + " <batchId> - crawl identifier returned by Generator, or -all for all \n \t \t generated batchId-s\n" + + " -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t (default: storage.crawl.id)\n" + + " -threads N - number of fetching threads per task\n" + + " -resume - resume interrupted job\n" + + " -numTasks N - if N > 0 then use this many reduce tasks for fetching \n \t \t (default: mapred.map.tasks)"; if (args.length == 0) { System.err.println(usage); @@ -303,17 +310,19 @@ public class FetcherJob extends NutchToo } else if ("-crawlId".equals(args[i])) { getConf().set(Nutch.CRAWL_ID_KEY, args[++i]); } else { - throw new IllegalArgumentException("arg " +args[i]+ " not recognized"); + throw new IllegalArgumentException("arg " + args[i] + " not recognized"); } } - int fetchcode = fetch(batchId, threads, shouldResume, numTasks); // run the Fetcher + int fetchcode = fetch(batchId, threads, shouldResume, numTasks); // run the + // Fetcher return fetchcode; } public static void main(String[] args) throws Exception { - int res = ToolRunner.run(NutchConfiguration.create(), new FetcherJob(), args); + int res = ToolRunner.run(NutchConfiguration.create(), new FetcherJob(), + args); System.exit(res); } }
