Hi nutch-dev,

I am looking at the Injector code in trunk and I see that we currently
launch two map-reduce jobs to perform the inject:
1. sort job: read the urls from the seeds file and emit CrawlDatum objects.
2. merge job: read CrawlDatum objects from both the existing crawldb and the
output of the sort job, merge them, and emit the final CrawlDatum objects.

I realized that by using MultipleInputs, we can read CrawlDatum objects
from the crawldb and urls from the seeds file simultaneously and perform
the inject in a single map-reduce job. PFA Injector2.java, which is an
implementation of this approach; the essential job wiring is excerpted
below. I did some basic testing on it and so far I have not encountered
any problems.
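
For reference, here is the core of the job setup, condensed from the attached
file (both input paths feed the same InjectMapper, which branches on the
runtime type of the value it receives):

    MultipleInputs.addInputPath(injectJob, new Path(crawlDb, CrawlDb.CURRENT_NAME),
        SequenceFileInputFormat.class);
    MultipleInputs.addInputPath(injectJob, urlDir, KeyValueTextInputFormat.class);

    injectJob.setMapperClass(InjectMapper.class);
    injectJob.setReducerClass(InjectReducer.class);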

I am not sure why Injector was not written this way, since it is more
efficient than the version currently in trunk (maybe MultipleInputs was
added to Hadoop only later). I am wondering if I am wrong somewhere in my
understanding. Any comments on this?

Thanks,
Tejas
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nutch.crawl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** This class takes a flat file of URLs and adds them to the database of pages to be
 * crawled.  Useful for bootstrapping the system. 
 * The URL files contain one URL per line, optionally followed by custom metadata 
 * separated by tabs, with each metadata key separated from its value by '='. <br>
 * Note that some metadata keys are reserved: <br>
 * - <i>nutch.score</i> : allows setting a custom score for a specific URL <br>
 * - <i>nutch.fetchInterval</i> : allows setting a custom fetch interval for a specific URL <br>
 * - <i>nutch.fetchInterval.fixed</i> : allows setting a custom fetch interval for a specific URL that is not changed by AdaptiveFetchSchedule <br>
 * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
 **/
public class Injector2 extends Configured implements Tool {
  public static final Logger LOG = LoggerFactory.getLogger(Injector2.class);

  /** metadata key reserved for setting a custom score for a specific URL */
  public static String nutchScoreMDName = "nutch.score";
  /** metadata key reserved for setting a custom fetchInterval for a specific URL */
  public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
  /** metadata key reserved for setting a fixed custom fetchInterval for a specific URL */
  public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed";

  /** Normalize and filter injected urls. */
  public static class InjectMapper implements Mapper<Text, Writable, Text, CrawlDatum> {
    public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope";
    public static final String TAB_CHARACTER = "\t";
    public static final String EQUAL_CHARACTER = "=";

    private URLNormalizers urlNormalizers;
    private int interval;
    private float scoreInjected;
    private JobConf jobConf;
    private URLFilters filters;
    private ScoringFilters scfilters;
    private long curTime;
    private boolean url404Purging;
    private String scope;

    public void configure(JobConf job) {
      this.jobConf = job;
      scope = job.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_CRAWLDB);
      urlNormalizers = new URLNormalizers(job, scope);
      interval = jobConf.getInt("db.fetch.interval.default", 2592000);
      filters = new URLFilters(jobConf);
      scfilters = new ScoringFilters(jobConf);
      scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
      curTime = job.getLong("injector.current.time", System.currentTimeMillis());
      url404Purging = job.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false);
    }

    public void close() {}

    private String filterNormalize(String url) {
      if (url != null) {
        try {
          url = urlNormalizers.normalize(url, scope); // normalize the url
          url = filters.filter(url);                  // filter the url
        } catch (Exception e) {
          LOG.warn("Skipping " + url + ":" + e);
          url = null;
        }
      }
      return url;
    }

    private String processMetaData(String url, CrawlDatum datum) {
      // metadata, if present, must be given as name=value pairs separated by TAB_CHARACTER

      if (url.contains(TAB_CHARACTER)) {
        String[] splits = url.split(TAB_CHARACTER);
        url = splits[0];

        for (int i = 1; i < splits.length; i++) {
          // find separation between name and value
          int indexEquals = splits[i].indexOf(EQUAL_CHARACTER);
          if (indexEquals == -1) // skip anything without an EQUAL_CHARACTER
            continue;

          String metaname = splits[i].substring(0, indexEquals);
          String metavalue = splits[i].substring(indexEquals + 1);

          try {
            if (metaname.equals(nutchScoreMDName))
              datum.setScore(Float.parseFloat(metavalue));
            else if (metaname.equals(nutchFetchIntervalMDName))
              datum.setFetchInterval(Integer.parseInt(metavalue));
            else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
              int fixedInterval = Integer.parseInt(metavalue);
              if (fixedInterval > -1) {
                // Set writable using float. Float is used by AdaptiveFetchSchedule
                datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, new FloatWritable(fixedInterval));
                datum.setFetchInterval(fixedInterval);
              }
            }
            else
              datum.getMetaData().put(new Text(metaname), new Text(metavalue));
          } catch (NumberFormatException nfe) {
            // do nothing
          }
        }
      }
      return url;
    }

    public void map(Text key, Writable value,
                    OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
      // if it's a url from the seed list
      if(value instanceof Text) {
        String url = key.toString().trim();

        if (url.length() == 0 || url.startsWith("#"))  // remove empty string or one starting with '#'
          return;

        url = filterNormalize(url);

        if (url == null) {
          reporter.getCounter("injector", "urls_filtered").increment(1);
        } else {                                   // if it passes
          CrawlDatum datum = new CrawlDatum();
          datum.setStatus(CrawlDatum.STATUS_INJECTED);
          datum.setFetchTime(curTime);
          datum.setScore(scoreInjected);
          datum.setFetchInterval(interval);

          // KeyValueTextInputFormat splits each seed line at the first tab, so any
          // name=value metadata arrives in the value rather than in the key
          String metadata = value.toString().trim();
          if (metadata.length() > 0)
            processMetaData(url + TAB_CHARACTER + metadata, datum);

          key.set(url);                           // collect it

          try {
            scfilters.injectedScore(key, datum);
          } catch (ScoringFilterException e) {
            if (LOG.isWarnEnabled()) {
              LOG.warn("Cannot filter injected score for url " + url
                  + ", using default (" + e.getMessage() + ")");
            }
          }
          reporter.getCounter("injector", "urls_injected").increment(1);
          output.collect(key, datum);
        }
      }
      // if it's a CrawlDatum from the input crawldb
      else if(value instanceof CrawlDatum) {
        CrawlDatum datum = (CrawlDatum) value;

        // remove 404 urls
        if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus())
          return;

        String url = key.toString();
        url = filterNormalize(url);
        if(url != null) {
          key.set(url);
          output.collect(key, datum);
        }
      }
    }
  }

  /** Combine multiple new entries for a url. */
  public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
    private int interval;
    private float scoreInjected;
    private boolean overwrite = false;
    private boolean update = false;

    public void configure(JobConf job) {
      interval = job.getInt("db.fetch.interval.default", 2592000);
      scoreInjected = job.getFloat("db.score.injected", 1.0f);
      overwrite = job.getBoolean("db.injector.overwrite", false);
      update = job.getBoolean("db.injector.update", false);
      LOG.info("Injector: overwrite: " + overwrite);
      LOG.info("Injector: update: " + update);
    }

    public void close() {}

    private CrawlDatum old = new CrawlDatum();
    private CrawlDatum injected = new CrawlDatum();

    public void reduce(Text key, Iterator<CrawlDatum> values,
                       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
      boolean oldSet = false;
      boolean injectedSet = false;
      while (values.hasNext()) {
        CrawlDatum val = values.next();
        if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
          injected.set(val);
          injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
          injectedSet = true;
        } else {
          old.set(val);
          oldSet = true;
        }
      }

      CrawlDatum res = null;

      // Old default behaviour
      if (injectedSet && !oldSet) {
        res = injected;
      } else {
        res = old;
      }

      /*
       * Whether to overwrite, ignore or update existing records,
       * see https://issues.apache.org/jira/browse/NUTCH-1405
       */
      // Injected record already exists and update but not overwrite
      if (injectedSet && oldSet && update && !overwrite) {
        res = old;
        old.putAllMetaData(injected);
        old.setScore(injected.getScore() != scoreInjected ? injected.getScore() : old.getScore());
        old.setFetchInterval(injected.getFetchInterval() != interval ? injected.getFetchInterval() : old.getFetchInterval());
      }

      // Injected record already exists and overwrite
      if (injectedSet && oldSet && overwrite) {
        res = injected;
      }

      output.collect(key, res);
    }
  }

  public Injector2() {}

  public Injector2(Configuration conf) {
    setConf(conf);
  }

  public void inject(Path crawlDb, Path urlDir) throws IOException {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    if (LOG.isInfoEnabled()) {
      LOG.info("Injector: starting at " + sdf.format(start));
      LOG.info("Injector: crawlDb: " + crawlDb);
      LOG.info("Injector: urlDir: " + urlDir);
    }

    if (LOG.isInfoEnabled())
      LOG.info("Injector: Converting injected urls to crawl db entries.");

    Path tempCrawlDb = new Path(crawlDb, "crawldb-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf injectJob = new NutchJob(getConf());
    injectJob.setJobName("inject " + urlDir);

    // TODO: lock the crawldb.. is it already taken care of ?
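    // both inputs feed the same InjectMapper, which distinguishes crawldb entries
    // from seed urls by the runtime type of the value (CrawlDatum vs Text)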
    MultipleInputs.addInputPath(injectJob, new Path(crawlDb, CrawlDb.CURRENT_NAME),
        SequenceFileInputFormat.class);
    MultipleInputs.addInputPath(injectJob, urlDir, KeyValueTextInputFormat.class);

    FileOutputFormat.setOutputPath(injectJob, tempCrawlDb);

    injectJob.setMapperClass(InjectMapper.class);
    injectJob.setMapOutputKeyClass(Text.class);
    injectJob.setMapOutputValueClass(CrawlDatum.class);
    injectJob.setLong("injector.current.time", System.currentTimeMillis());

    injectJob.setReducerClass(InjectReducer.class);
    injectJob.setOutputFormat(SequenceFileOutputFormat.class);
    injectJob.setOutputKeyClass(Text.class);
    injectJob.setOutputValueClass(CrawlDatum.class);

    RunningJob mapJob = JobClient.runJob(injectJob);
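    // install the temporary job output as the new "current" crawldb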
    CrawlDb.install(injectJob, crawlDb);
    FileSystem fs = FileSystem.get(getConf());
    fs.delete(tempCrawlDb, true);         // clean up

    long urlsInjected = mapJob.getCounters().findCounter("injector", "urls_injected").getValue();
    long urlsFiltered = mapJob.getCounters().findCounter("injector", "urls_filtered").getValue();
    LOG.info("Injector: total number of urls rejected by filters: " + urlsFiltered);
    LOG.info("Injector: total number of urls injected after normalization and filtering: "
        + urlsInjected);
    long end = System.currentTimeMillis();
    LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(NutchConfiguration.create(), new Injector2(), args);
    System.exit(res);
  }

  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: Injector2 <crawldb> <url_dir>");
      return -1;
    }
    try {
      inject(new Path(args[0]), new Path(args[1]));
      return 0;
    } catch (Exception e) {
      LOG.error("Injector: " + StringUtils.stringifyException(e));
      return -1;
    }
  }
}
