Hi nutch-dev,

I am looking at the Injector code in trunk and I see that we currently launch two map-reduce jobs to do the inject:

1. sort job: read the urls from the seeds file and emit CrawlDatum objects.
2. merge job: read CrawlDatum objects from both the crawldb and the output of the sort job, then merge and emit the final CrawlDatum objects.
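To make the comparison concrete, in JobConf terms the current two-pass flow looks roughly like this (a simplified sketch from memory, not the exact trunk code; the method name and temp-dir handling are just illustrative, and imports are omitted):

  // Sketch of the current two-job inject (old mapred API).
  public void injectTwoPass(Configuration conf, Path crawlDb, Path urlDir)
      throws IOException {
    Path tempDir = new Path(conf.get("mapred.temp.dir", ".") + "/inject-temp-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    // Job 1 ("sort"): read the seed file, emit <url, CrawlDatum> into tempDir.
    JobConf sortJob = new NutchJob(conf);
    sortJob.setJobName("inject " + urlDir);
    FileInputFormat.addInputPath(sortJob, urlDir);
    sortJob.setMapperClass(InjectMapper.class);           // normalize/filter seed urls
    FileOutputFormat.setOutputPath(sortJob, tempDir);
    sortJob.setOutputFormat(SequenceFileOutputFormat.class);
    sortJob.setOutputKeyClass(Text.class);
    sortJob.setOutputValueClass(CrawlDatum.class);
    JobClient.runJob(sortJob);

    // Job 2 ("merge"): read the existing crawldb plus tempDir, merge duplicates
    // in the reducer, then install the result as the new crawldb.
    JobConf mergeJob = CrawlDb.createJob(conf, crawlDb);  // adds crawldb/current as input
    FileInputFormat.addInputPath(mergeJob, tempDir);
    mergeJob.setReducerClass(InjectReducer.class);        // resolve existing vs. injected
    JobClient.runJob(mergeJob);
    CrawlDb.install(mergeJob, crawlDb);

    FileSystem.get(conf).delete(tempDir, true);           // clean up the temp dir
  }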
I realized that by using MultipleInputs we can read CrawlDatum objects from the crawldb and urls from the seeds file simultaneously, and perform the inject in a single map-reduce job. PFA Injector2.java, which is an implementation of this approach. I did some basic testing on it and so far I have not encountered any problems. I am not sure why Injector was not written this way, since it is more efficient than what is currently in trunk (maybe MultipleInputs was added to Hadoop later). Wondering if I am wrong somewhere in my understanding. Any comments on this?

Thanks,
Tejas
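P.S. For quick reference, the heart of the attached file is just the MultipleInputs setup plus a type check in the mapper. Condensed from the inject() and map() methods of Injector2.java (full file follows):

  // Register both inputs on the same job, each with its own InputFormat:
  MultipleInputs.addInputPath(injectJob, new Path(crawlDb, CrawlDb.CURRENT_NAME),
      SequenceFileInputFormat.class);
  MultipleInputs.addInputPath(injectJob, urlDir, KeyValueTextInputFormat.class);
  FileOutputFormat.setOutputPath(injectJob, tempCrawlDb);
  injectJob.setMapperClass(InjectMapper.class);
  injectJob.setReducerClass(InjectReducer.class);

  // Inside InjectMapper.map(), dispatch on the runtime type of the value:
  if (value instanceof Text) {
    // seed-file line: normalize, filter, emit a fresh CrawlDatum with STATUS_INJECTED
  } else if (value instanceof CrawlDatum) {
    // existing crawldb entry: filter/normalize the key and pass the datum through
  }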
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.crawl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/**
 * This class takes a flat file of URLs and adds them to the list of pages to be
 * crawled. Useful for bootstrapping the system.
 * The URL files contain one URL per line, optionally followed by custom metadata
 * separated by tabs with the metadata key separated from the corresponding value by '='. <br>
 * Note that some metadata keys are reserved: <br>
 * - <i>nutch.score</i>: allows to set a custom score for a specific URL <br>
 * - <i>nutch.fetchInterval</i>: allows to set a custom fetch interval for a specific URL <br>
 * - <i>nutch.fetchInterval.fixed</i>: allows to set a custom fetch interval for a specific URL
 *   that is not changed by AdaptiveFetchSchedule <br>
 * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
 **/
public class Injector2 extends Configured implements Tool {
  public static final Logger LOG = LoggerFactory.getLogger(Injector2.class);

  /** metadata key reserved for setting a custom score for a specific URL */
  public static String nutchScoreMDName = "nutch.score";
  /** metadata key reserved for setting a custom fetchInterval for a specific URL */
  public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
  /** metadata key reserved for setting a fixed custom fetchInterval for a specific URL */
  public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed";

  /** Normalize and filter injected urls. */
  public static class InjectMapper implements Mapper<Text, Writable, Text, CrawlDatum> {
    public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope";
    public static final String TAB_CHARACTER = "\t";
    public static final String EQUAL_CHARACTER = "=";

    private URLNormalizers urlNormalizers;
    private int interval;
    private float scoreInjected;
    private JobConf jobConf;
    private URLFilters filters;
    private ScoringFilters scfilters;
    private long curTime;
    private boolean url404Purging;
    private String scope;

    public void configure(JobConf job) {
      this.jobConf = job;
      scope = job.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_CRAWLDB);
      urlNormalizers = new URLNormalizers(job, scope);
      interval = jobConf.getInt("db.fetch.interval.default", 2592000);
      filters = new URLFilters(jobConf);
      scfilters = new ScoringFilters(jobConf);
      scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
      curTime = job.getLong("injector.current.time", System.currentTimeMillis());
      url404Purging = job.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false);
    }

    public void close() {}

    private String filterNormalize(String url) {
      if (url != null) {
        try {
          url = urlNormalizers.normalize(url, scope); // normalize the url
          url = filters.filter(url);                  // filter the url
        } catch (Exception e) {
          LOG.warn("Skipping " + url + ":" + e);
          url = null;
        }
      }
      return url;
    }

    private String processMetaData(String url, CrawlDatum datum) {
      // if the line contains tabs: metadata must be name=value pairs separated by TAB_CHARACTER
      if (url.contains(TAB_CHARACTER)) {
        String[] splits = url.split(TAB_CHARACTER);
        url = splits[0];
        for (int i = 1; i < splits.length; i++) {
          // find separation between name and value
          int indexEquals = splits[i].indexOf(EQUAL_CHARACTER);
          if (indexEquals == -1) // skip anything without an EQUAL_CHARACTER
            continue;
          String metaname = splits[i].substring(0, indexEquals);
          String metavalue = splits[i].substring(indexEquals + 1);
          try {
            if (metaname.equals(nutchScoreMDName))
              datum.setScore(Float.parseFloat(metavalue));
            else if (metaname.equals(nutchFetchIntervalMDName))
              datum.setFetchInterval(Integer.parseInt(metavalue));
            else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
              int fixedInterval = Integer.parseInt(metavalue);
              if (fixedInterval > -1) {
                // Set writable using float.
                // Float is used by AdaptiveFetchSchedule.
                datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
                    new FloatWritable(fixedInterval));
                datum.setFetchInterval(fixedInterval);
              }
            } else
              datum.getMetaData().put(new Text(metaname), new Text(metavalue));
          } catch (NumberFormatException nfe) {
            // do nothing
          }
        }
      }
      return url;
    }

    public void map(Text key, Writable value,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
      // if it is a url from the seed list
      if (value instanceof Text) {
        String url = key.toString().trim();
        if (url.length() == 0 || url.startsWith("#")) // skip empty lines and comments starting with '#'
          return;
        url = filterNormalize(url);
        if (url == null) {
          reporter.getCounter("injector", "urls_filtered").increment(1);
        } else { // if it passes
          CrawlDatum datum = new CrawlDatum();
          datum.setStatus(CrawlDatum.STATUS_INJECTED);
          datum.setFetchTime(curTime);
          datum.setScore(scoreInjected);
          datum.setFetchInterval(interval);
          // NOTE: KeyValueTextInputFormat splits each seed line on the first tab,
          // so any per-url metadata ("name=value" pairs) arrives in the value
          // rather than in the key; re-attach it here so processMetaData sees it.
          String metadata = ((Text) value).toString().trim();
          if (metadata.length() > 0) {
            url = url + TAB_CHARACTER + metadata;
          }
          url = processMetaData(url, datum);
          key.set(url);
          try {
            scfilters.injectedScore(key, datum);
          } catch (ScoringFilterException e) {
            if (LOG.isWarnEnabled()) {
              LOG.warn("Cannot filter injected score for url " + url
                  + ", using default (" + e.getMessage() + ")");
            }
          }
          reporter.getCounter("injector", "urls_injected").increment(1);
          // collect it
          output.collect(key, datum);
        }
      }
      // if it is a CrawlDatum from the input crawldb
      else if (value instanceof CrawlDatum) {
        CrawlDatum datum = (CrawlDatum) value;
        // remove 404 urls
        if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus())
          return;
        String url = key.toString();
        url = filterNormalize(url);
        if (url != null) {
          key.set(url);
          output.collect(key, datum);
        }
      }
    }
  }

  /** Combine multiple new entries for a url. */
  public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
    private int interval;
    private float scoreInjected;
    private boolean overwrite = false;
    private boolean update = false;

    public void configure(JobConf job) {
      interval = job.getInt("db.fetch.interval.default", 2592000);
      scoreInjected = job.getFloat("db.score.injected", 1.0f);
      overwrite = job.getBoolean("db.injector.overwrite", false);
      update = job.getBoolean("db.injector.update", false);
      LOG.info("Injector: overwrite: " + overwrite);
      LOG.info("Injector: update: " + update);
    }

    public void close() {}

    private CrawlDatum old = new CrawlDatum();
    private CrawlDatum injected = new CrawlDatum();

    public void reduce(Text key, Iterator<CrawlDatum> values,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
      boolean oldSet = false;
      boolean injectedSet = false;
      while (values.hasNext()) {
        CrawlDatum val = values.next();
        if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
          injected.set(val);
          injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
          injectedSet = true;
        } else {
          old.set(val);
          oldSet = true;
        }
      }

      CrawlDatum res = null;
      // Old default behaviour
      if (injectedSet && !oldSet) {
        res = injected;
      } else {
        res = old;
      }

      /**
       * Whether to overwrite, ignore or update existing records
       * @see https://issues.apache.org/jira/browse/NUTCH-1405
       */
      // Injected record already exists and update but not overwrite
      if (injectedSet && oldSet && update && !overwrite) {
        res = old;
        old.putAllMetaData(injected);
        old.setScore(injected.getScore() != scoreInjected
            ? injected.getScore() : old.getScore());
        old.setFetchInterval(injected.getFetchInterval() != interval
            ? injected.getFetchInterval() : old.getFetchInterval());
      }

      // Injected record already exists and overwrite
      if (injectedSet && oldSet && overwrite) {
        res = injected;
      }

      output.collect(key, res);
    }
  }

  public Injector2() {}

  public Injector2(Configuration conf) {
    setConf(conf);
  }

  public void inject(Path crawlDb, Path urlDir) throws IOException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    if (LOG.isInfoEnabled()) {
      LOG.info("Injector: starting at " + sdf.format(start));
      LOG.info("Injector: crawlDb: " + crawlDb);
      LOG.info("Injector: urlDir: " + urlDir);
    }

    if (LOG.isInfoEnabled())
      LOG.info("Injector: Converting injected urls to crawl db entries.");

    Path tempCrawlDb = new Path(crawlDb,
        "crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf injectJob = new NutchJob(getConf());
    injectJob.setJobName("inject " + urlDir);
    // TODO: lock the crawldb.. is it already taken care of ?
    MultipleInputs.addInputPath(injectJob, new Path(crawlDb, CrawlDb.CURRENT_NAME),
        SequenceFileInputFormat.class);
    MultipleInputs.addInputPath(injectJob, urlDir, KeyValueTextInputFormat.class);
    FileOutputFormat.setOutputPath(injectJob, tempCrawlDb);
    injectJob.setMapperClass(InjectMapper.class);
    injectJob.setMapOutputKeyClass(Text.class);
    injectJob.setMapOutputValueClass(CrawlDatum.class);
    injectJob.setLong("injector.current.time", System.currentTimeMillis());
    injectJob.setReducerClass(InjectReducer.class);
    injectJob.setOutputFormat(SequenceFileOutputFormat.class);
    injectJob.setOutputKeyClass(Text.class);
    injectJob.setOutputValueClass(CrawlDatum.class);

    RunningJob mapJob = JobClient.runJob(injectJob);
    CrawlDb.install(injectJob, crawlDb);

    FileSystem fs = FileSystem.get(getConf());
    fs.delete(tempCrawlDb, true); // clean up

    long urlsInjected = mapJob.getCounters()
        .findCounter("injector", "urls_injected").getValue();
    long urlsFiltered = mapJob.getCounters()
        .findCounter("injector", "urls_filtered").getValue();
    LOG.info("Injector: total number of urls rejected by filters: " + urlsFiltered);
    LOG.info("Injector: total number of urls injected after normalization and filtering: "
        + urlsInjected);

    long end = System.currentTimeMillis();
    LOG.info("Injector: finished at " + sdf.format(end)
        + ", elapsed: " + TimingUtil.elapsedTime(start, end));
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(NutchConfiguration.create(), new Injector2(), args);
    System.exit(res);
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: Injector2 <crawldb> <url_dir>");
      return -1;
    }
    try {
      inject(new Path(args[0]), new Path(args[1]));
      return 0;
    } catch (Exception e) {
      LOG.error("Injector: " + StringUtils.stringifyException(e));
      return -1;
    }
  }
}
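(To try it, the class just needs to be compiled into the Nutch job jar; it can then be run the same way as the stock injector, e.g.

  bin/nutch org.apache.nutch.crawl.Injector2 <crawldb> <url_dir>

since bin/nutch treats an unrecognized command as a fully qualified class name.)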

