http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java
new file mode 100644
index 0000000..1537cdc
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDb.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+// Commons Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.*;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * This class takes the output of the fetcher and updates the crawldb
+ * accordingly.
+ */
+public class CrawlDb extends NutchTool implements Tool {
+  public static final Logger LOG = LoggerFactory.getLogger(CrawlDb.class);
+
+  public static final String CRAWLDB_ADDITIONS_ALLOWED = 
"db.update.additions.allowed";
+
+  public static final String CRAWLDB_PURGE_404 = "db.update.purge.404";
+
+  public static final String CURRENT_NAME = "current";
+
+  public static final String LOCK_NAME = ".locked";
+
+  public CrawlDb() {
+  }
+
+  public CrawlDb(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void update(Path crawlDb, Path[] segments, boolean normalize,
+      boolean filter) throws IOException {
+    boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED,
+        true);
+    update(crawlDb, segments, normalize, filter, additionsAllowed, false);
+  }
+
+  public void update(Path crawlDb, Path[] segments, boolean normalize,
+      boolean filter, boolean additionsAllowed, boolean force)
+      throws IOException {
+    FileSystem fs = FileSystem.get(getConf());
+    Path lock = new Path(crawlDb, LOCK_NAME);
+    LockUtil.createLockFile(fs, lock, force);
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+
+    JobConf job = CrawlDb.createJob(getConf(), crawlDb);
+    job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);
+    job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
+    job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);
+
+    boolean url404Purging = job.getBoolean(CRAWLDB_PURGE_404, false);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb update: starting at " + sdf.format(start));
+      LOG.info("CrawlDb update: db: " + crawlDb);
+      LOG.info("CrawlDb update: segments: " + Arrays.asList(segments));
+      LOG.info("CrawlDb update: additions allowed: " + additionsAllowed);
+      LOG.info("CrawlDb update: URL normalizing: " + normalize);
+      LOG.info("CrawlDb update: URL filtering: " + filter);
+      LOG.info("CrawlDb update: 404 purging: " + url404Purging);
+    }
+
+    for (int i = 0; i < segments.length; i++) {
+      Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
+      Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME);
+      if (fs.exists(fetch) && fs.exists(parse)) {
+        FileInputFormat.addInputPath(job, fetch);
+        FileInputFormat.addInputPath(job, parse);
+      } else {
+        LOG.info(" - skipping invalid segment " + segments[i]);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb update: Merging segment data into db.");
+    }
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      Path outPath = FileOutputFormat.getOutputPath(job);
+      if (fs.exists(outPath))
+        fs.delete(outPath, true);
+      throw e;
+    }
+
+    CrawlDb.install(job, crawlDb);
+    long end = System.currentTimeMillis();
+    LOG.info("CrawlDb update: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  /*
+   * Configure a new CrawlDb in a temp folder at crawlDb/<rand>
+   */
+  public static JobConf createJob(Configuration config, Path crawlDb)
+      throws IOException {
+    Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random()
+        .nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("crawldb " + crawlDb);
+
+    Path current = new Path(crawlDb, CURRENT_NAME);
+    if (FileSystem.get(job).exists(current)) {
+      FileInputFormat.addInputPath(job, current);
+    }
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(CrawlDbFilter.class);
+    job.setReducerClass(CrawlDbReducer.class);
+
+    FileOutputFormat.setOutputPath(job, newCrawlDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+
+    // https://issues.apache.org/jira/browse/NUTCH-1110
+    job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    return job;
+  }
+
+  public static void install(JobConf job, Path crawlDb) throws IOException {
+    boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
+
+    Path newCrawlDb = FileOutputFormat.getOutputPath(job);
+    FileSystem fs = new JobClient(job).getFs();
+    Path old = new Path(crawlDb, "old");
+    Path current = new Path(crawlDb, CURRENT_NAME);
+    if (fs.exists(current)) {
+      if (fs.exists(old))
+        fs.delete(old, true);
+      fs.rename(current, old);
+    }
+    fs.mkdirs(crawlDb);
+    fs.rename(newCrawlDb, current);
+    if (!preserveBackup && fs.exists(old))
+      fs.delete(old, true);
+    Path lock = new Path(crawlDb, LOCK_NAME);
+    LockUtil.removeLockFile(fs, lock);
+  }
+
+  public static void install(Job job, Path crawlDb) throws IOException {
+    Configuration conf = job.getConfiguration();
+    boolean preserveBackup = conf.getBoolean("db.preserve.backup", true);
+    FileSystem fs = FileSystem.get(conf);
+    Path old = new Path(crawlDb, "old");
+    Path current = new Path(crawlDb, CURRENT_NAME);
+    Path tempCrawlDb = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+        .getOutputPath(job);
+    FSUtils.replace(fs, old, current, true);
+    FSUtils.replace(fs, current, tempCrawlDb, true);
+    Path lock = new Path(crawlDb, LOCK_NAME);
+    LockUtil.removeLockFile(fs, lock);
+    if (!preserveBackup && fs.exists(old)) {
+      fs.delete(old, true);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err
+          .println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> 
...) [-force] [-normalize] [-filter] [-noAdditions]");
+      System.err.println("\tcrawldb\tCrawlDb to update");
+      System.err
+          .println("\t-dir segments\tparent directory containing all segments 
to update from");
+      System.err
+          .println("\tseg1 seg2 ...\tlist of segment names to update from");
+      System.err
+          .println("\t-force\tforce update even if CrawlDb appears to be 
locked (CAUTION advised)");
+      System.err
+          .println("\t-normalize\tuse URLNormalizer on urls in CrawlDb and 
segment (usually not needed)");
+      System.err
+          .println("\t-filter\tuse URLFilters on urls in CrawlDb and segment");
+      System.err
+          .println("\t-noAdditions\tonly update already existing URLs, don't 
add any newly discovered URLs");
+
+      return -1;
+    }
+    boolean normalize = getConf().getBoolean(CrawlDbFilter.URL_NORMALIZING,
+        false);
+    boolean filter = getConf().getBoolean(CrawlDbFilter.URL_FILTERING, false);
+    boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED,
+        true);
+    boolean force = false;
+    final FileSystem fs = FileSystem.get(getConf());
+    HashSet<Path> dirs = new HashSet<Path>();
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-normalize")) {
+        normalize = true;
+      } else if (args[i].equals("-filter")) {
+        filter = true;
+      } else if (args[i].equals("-force")) {
+        force = true;
+      } else if (args[i].equals("-noAdditions")) {
+        additionsAllowed = false;
+      } else if (args[i].equals("-dir")) {
+        FileStatus[] paths = fs.listStatus(new Path(args[++i]),
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+        dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
+      } else {
+        dirs.add(new Path(args[i]));
+      }
+    }
+    try {
+      update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]), normalize,
+          filter, additionsAllowed, force);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("CrawlDb update: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  /*
+   * Used for Nutch REST service
+   */
+  @Override
+  public Map<String, Object> run(Map<String, Object> args, String crawlId) 
throws Exception {
+
+    Map<String, Object> results = new HashMap<String, Object>();
+
+    boolean normalize = getConf().getBoolean(CrawlDbFilter.URL_NORMALIZING,
+        false);
+    boolean filter = getConf().getBoolean(CrawlDbFilter.URL_FILTERING, false);
+    boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED,
+        true);
+    boolean force = false;
+    HashSet<Path> dirs = new HashSet<Path>();
+
+    if (args.containsKey("normalize")) {
+      normalize = true;
+    } 
+    if (args.containsKey("filter")) {
+      filter = true;
+    } 
+    if (args.containsKey("force")) {
+      force = true;
+    } 
+    if (args.containsKey("noAdditions")) {
+      additionsAllowed = false;
+    }
+
+    Path crawlDb;
+    if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+      Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
+      if(crawldbPath instanceof Path) {
+        crawlDb = (Path) crawldbPath;
+      }
+      else {
+        crawlDb = new Path(crawldbPath.toString());
+      }
+    }
+    else {
+      crawlDb = new Path(crawlId+"/crawldb");
+    }
+
+    Path segmentsDir;
+    final FileSystem fs = FileSystem.get(getConf());
+    if(args.containsKey(Nutch.ARG_SEGMENTDIR)) {
+      Object segDir = args.get(Nutch.ARG_SEGMENTDIR);
+      if(segDir instanceof Path) {
+        segmentsDir = (Path) segDir;
+      }
+      else {
+        segmentsDir = new Path(segDir.toString());
+      }
+      FileStatus[] paths = fs.listStatus(segmentsDir,
+          HadoopFSUtil.getPassDirectoriesFilter(fs));
+      dirs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
+    }
+
+    else if(args.containsKey(Nutch.ARG_SEGMENT)) {
+      Object segments = args.get(Nutch.ARG_SEGMENT);
+      ArrayList<String> segmentList = new ArrayList<String>();
+      if(segments instanceof ArrayList) {
+        segmentList = (ArrayList<String>)segments;
+      }
+      for(String segment: segmentList) {
+        dirs.add(new Path(segment));
+      }
+    }
+    else {
+      String segment_dir = crawlId+"/segments";
+      File dir = new File(segment_dir);
+      File[] segmentsList = dir.listFiles();  
+      Arrays.sort(segmentsList, new Comparator<File>(){
+        @Override
+        public int compare(File f1, File f2) {
+          if(f1.lastModified()>f2.lastModified())
+            return -1;
+          else
+            return 0;
+        }      
+      });
+      dirs.add(new Path(segmentsList[0].getPath()));
+    }
+    try {
+      update(crawlDb, dirs.toArray(new Path[dirs.size()]), normalize,
+          filter, additionsAllowed, force);
+      results.put(Nutch.VAL_RESULT, Integer.toString(0));
+      return results;
+    } catch (Exception e) {
+      LOG.error("CrawlDb update: " + StringUtils.stringifyException(e));
+      results.put(Nutch.VAL_RESULT, Integer.toString(-1));
+      return results;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java
new file mode 100644
index 0000000..de4c37b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbFilter.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+
+/**
+ * This class provides a way to separate the URL normalization and filtering
+ * steps from the rest of CrawlDb manipulation code.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class CrawlDbFilter implements
+    Mapper<Text, CrawlDatum, Text, CrawlDatum> {
+  public static final String URL_FILTERING = "crawldb.url.filters";
+
+  public static final String URL_NORMALIZING = "crawldb.url.normalizers";
+
+  public static final String URL_NORMALIZING_SCOPE = 
"crawldb.url.normalizers.scope";
+
+  private boolean urlFiltering;
+
+  private boolean urlNormalizers;
+
+  private boolean url404Purging;
+
+  private URLFilters filters;
+
+  private URLNormalizers normalizers;
+
+  private String scope;
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(CrawlDbFilter.class);
+
+  public void configure(JobConf job) {
+    urlFiltering = job.getBoolean(URL_FILTERING, false);
+    urlNormalizers = job.getBoolean(URL_NORMALIZING, false);
+    url404Purging = job.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false);
+
+    if (urlFiltering) {
+      filters = new URLFilters(job);
+    }
+    if (urlNormalizers) {
+      scope = job.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_CRAWLDB);
+      normalizers = new URLNormalizers(job, scope);
+    }
+  }
+
+  public void close() {
+  }
+
+  private Text newKey = new Text();
+
+  public void map(Text key, CrawlDatum value,
+      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+      throws IOException {
+
+    String url = key.toString();
+
+    // https://issues.apache.org/jira/browse/NUTCH-1101 check status first,
+    // cheaper than normalizing or filtering
+    if (url404Purging && CrawlDatum.STATUS_DB_GONE == value.getStatus()) {
+      url = null;
+    }
+    if (url != null && urlNormalizers) {
+      try {
+        url = normalizers.normalize(url, scope); // normalize the url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e);
+        url = null;
+      }
+    }
+    if (url != null && urlFiltering) {
+      try {
+        url = filters.filter(url); // filter the url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e);
+        url = null;
+      }
+    }
+    if (url != null) { // if it passes
+      newKey.set(url); // collect it
+      output.collect(newKey, value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java
new file mode 100644
index 0000000..cd775d8
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbMerger.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Map.Entry;
+
+// Commons Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.conf.*;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
/**
 * This tool merges several CrawlDb-s into one, optionally filtering URLs
 * through the current URLFilters, to skip prohibited pages.
 *
 * <p>
 * It's possible to use this tool just for filtering - in that case only one
 * CrawlDb should be specified in arguments.
 * </p>
 * <p>
 * If more than one CrawlDb contains information about the same URL, only the
 * most recent version is retained, as determined by the value of
 * {@link org.apache.nutch.crawl.CrawlDatum#getFetchTime()}. However, all
 * metadata information from all versions is accumulated, with newer values
 * taking precedence over older values.
 *
 * @author Andrzej Bialecki
 */
public class CrawlDbMerger extends Configured implements Tool {
  private static final Logger LOG = LoggerFactory
      .getLogger(CrawlDbMerger.class);

  /** Reducer that keeps the most recent datum per URL and merges metadata. */
  public static class Merger extends MapReduceBase implements
      Reducer<Text, CrawlDatum, Text, CrawlDatum> {
    // Accumulated metadata for the current key; newer entries win.
    private org.apache.hadoop.io.MapWritable meta;
    // Reused across reduce() calls to avoid per-key allocation; holds the
    // most recent datum seen so far for the current key.
    private CrawlDatum res = new CrawlDatum();
    private FetchSchedule schedule;

    public void close() throws IOException {
    }

    public void configure(JobConf conf) {
      schedule = FetchScheduleFactory.getFetchSchedule(conf);
    }

    public void reduce(Text key, Iterator<CrawlDatum> values,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {
      long resTime = 0L;
      boolean resSet = false;
      meta = new org.apache.hadoop.io.MapWritable();
      while (values.hasNext()) {
        CrawlDatum val = values.next();
        if (!resSet) {
          // First value: seed the result and copy its metadata wholesale.
          res.set(val);
          resSet = true;
          resTime = schedule.calculateLastFetchTime(res);
          for (Entry<Writable, Writable> e : res.getMetaData().entrySet()) {
            meta.put(e.getKey(), e.getValue());
          }
          continue;
        }
        // compute last fetch time, and pick the latest
        long valTime = schedule.calculateLastFetchTime(val);
        if (valTime > resTime) {
          // collect all metadata, newer values override older values
          for (Entry<Writable, Writable> e : val.getMetaData().entrySet()) {
            meta.put(e.getKey(), e.getValue());
          }
          res.set(val);
          resTime = valTime;
        } else {
          // insert older metadata before newer
          // (copy the accumulated newer entries INTO the older datum's map so
          // they override its entries, then re-alias `meta` to that merged map)
          for (Entry<Writable, Writable> e : meta.entrySet()) {
            val.getMetaData().put(e.getKey(), e.getValue());
          }
          meta = val.getMetaData();
        }
      }
      // Attach the fully merged metadata to the winning datum and emit it.
      res.setMetaData(meta);
      output.collect(key, res);
    }
  }

  public CrawlDbMerger() {

  }

  public CrawlDbMerger(Configuration conf) {
    setConf(conf);
  }

  /**
   * Runs the merge job over the given CrawlDb-s and installs the result as
   * {@code output/current}. Any pre-existing content at {@code output} is
   * deleted first.
   */
  public void merge(Path output, Path[] dbs, boolean normalize, boolean filter)
      throws Exception {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("CrawlDb merge: starting at " + sdf.format(start));

    JobConf job = createMergeJob(getConf(), output, normalize, filter);
    for (int i = 0; i < dbs.length; i++) {
      if (LOG.isInfoEnabled()) {
        LOG.info("Adding " + dbs[i]);
      }
      FileInputFormat.addInputPath(job, new Path(dbs[i], CrawlDb.CURRENT_NAME));
    }
    JobClient.runJob(job);
    FileSystem fs = FileSystem.get(getConf());
    // Replace any previous output, then move the job's temp dir into place.
    if (fs.exists(output))
      fs.delete(output, true);
    fs.mkdirs(output);
    fs.rename(FileOutputFormat.getOutputPath(job), new Path(output,
        CrawlDb.CURRENT_NAME));
    long end = System.currentTimeMillis();
    LOG.info("CrawlDb merge: finished at " + sdf.format(end) + ", elapsed: "
        + TimingUtil.elapsedTime(start, end));
  }

  /**
   * Builds the merge job: CrawlDbFilter as mapper (optional normalize/filter)
   * and Merger as reducer, writing to a random temp dir that the caller
   * renames into place.
   */
  public static JobConf createMergeJob(Configuration conf, Path output,
      boolean normalize, boolean filter) {
    Path newCrawlDb = new Path("crawldb-merge-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    JobConf job = new NutchJob(conf);
    job.setJobName("crawldb merge " + output);

    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(CrawlDbFilter.class);
    job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
    job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);
    job.setReducerClass(Merger.class);

    FileOutputFormat.setOutputPath(job, newCrawlDb);
    job.setOutputFormat(MapFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);

    return job;
  }

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDbMerger(),
        args);
    System.exit(res);
  }

  /** Command-line entry point; see the usage message for options. */
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err
          .println("Usage: CrawlDbMerger <output_crawldb> <crawldb1> [<crawldb2> <crawldb3> ...] [-normalize] [-filter]");
      System.err.println("\toutput_crawldb\toutput CrawlDb");
      System.err
          .println("\tcrawldb1 ...\tinput CrawlDb-s (single input CrawlDb is ok)");
      System.err
          .println("\t-normalize\tuse URLNormalizer on urls in the crawldb(s) (usually not needed)");
      System.err.println("\t-filter\tuse URLFilters on urls in the crawldb(s)");
      return -1;
    }
    Path output = new Path(args[0]);
    ArrayList<Path> dbs = new ArrayList<Path>();
    boolean filter = false;
    boolean normalize = false;
    FileSystem fs = FileSystem.get(getConf());
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-filter")) {
        filter = true;
        continue;
      } else if (args[i].equals("-normalize")) {
        normalize = true;
        continue;
      }
      // Silently skip input dbs that do not exist.
      final Path dbPath = new Path(args[i]);
      if (fs.exists(dbPath))
        dbs.add(dbPath);
    }
    try {
      merge(output, dbs.toArray(new Path[dbs.size()]), normalize, filter);
      return 0;
    } catch (Exception e) {
      LOG.error("CrawlDb merge: " + StringUtils.stringifyException(e));
      return -1;
    }
  }
}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java
new file mode 100644
index 0000000..5db5f95
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -0,0 +1,887 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.Closeable;
+import java.net.URL;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.TreeMap;
+
+
+// Commons Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.JexlUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.lang.time.DateUtils;
+
+/**
+ * Read utility for the CrawlDB.
+ * 
+ * @author Andrzej Bialecki
+ * 
+ */
+public class CrawlDbReader extends Configured implements Closeable, Tool {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(CrawlDbReader.class);
+
+  private MapFile.Reader[] readers = null;
+
+  private void openReaders(String crawlDb, JobConf config)
+      throws IOException {
+    if (readers != null)
+      return;
+    FileSystem fs = FileSystem.get(config);
+    readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
+        CrawlDb.CURRENT_NAME), config);
+  }
+
+  private void closeReaders() {
+    if (readers == null)
+      return;
+    for (int i = 0; i < readers.length; i++) {
+      try {
+        readers[i].close();
+      } catch (Exception e) {
+
+      }
+    }
+  }
+
+  public static class CrawlDatumCsvOutputFormat extends
+      FileOutputFormat<Text, CrawlDatum> {
+    protected static class LineRecordWriter implements
+        RecordWriter<Text, CrawlDatum> {
+      private DataOutputStream out;
+
+      public LineRecordWriter(DataOutputStream out) {
+        this.out = out;
+        try {
+          out.writeBytes("Url,Status code,Status name,Fetch Time,Modified 
Time,Retries since fetch,Retry interval seconds,Retry interval 
days,Score,Signature,Metadata\n");
+        } catch (IOException e) {
+        }
+      }
+
+      public synchronized void write(Text key, CrawlDatum value)
+          throws IOException {
+        out.writeByte('"');
+        out.writeBytes(key.toString());
+        out.writeByte('"');
+        out.writeByte(',');
+        out.writeBytes(Integer.toString(value.getStatus()));
+        out.writeByte(',');
+        out.writeByte('"');
+        out.writeBytes(CrawlDatum.getStatusName(value.getStatus()));
+        out.writeByte('"');
+        out.writeByte(',');
+        out.writeBytes(new Date(value.getFetchTime()).toString());
+        out.writeByte(',');
+        out.writeBytes(new Date(value.getModifiedTime()).toString());
+        out.writeByte(',');
+        out.writeBytes(Integer.toString(value.getRetriesSinceFetch()));
+        out.writeByte(',');
+        out.writeBytes(Float.toString(value.getFetchInterval()));
+        out.writeByte(',');
+        out.writeBytes(Float.toString((value.getFetchInterval() / 
FetchSchedule.SECONDS_PER_DAY)));
+        out.writeByte(',');
+        out.writeBytes(Float.toString(value.getScore()));
+        out.writeByte(',');
+        out.writeByte('"');
+        out.writeBytes(value.getSignature() != null ? StringUtil
+            .toHexString(value.getSignature()) : "null");
+        out.writeByte('"');
+        out.writeByte(',');
+        out.writeByte('"');
+        if (value.getMetaData() != null) {
+          for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) {
+            out.writeBytes(e.getKey().toString());
+            out.writeByte(':');
+            out.writeBytes(e.getValue().toString());
+            out.writeBytes("|||");
+          }
+        }
+        out.writeByte('"');
+
+        out.writeByte('\n');
+      }
+
+      public synchronized void close(Reporter reporter) throws IOException {
+        out.close();
+      }
+    }
+
+    public RecordWriter<Text, CrawlDatum> getRecordWriter(FileSystem fs,
+        JobConf job, String name, Progressable progress) throws IOException {
+      Path dir = FileOutputFormat.getOutputPath(job);
+      DataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+      return new LineRecordWriter(fileOut);
+    }
+  }
+
+  public static class CrawlDbStatMapper implements
+      Mapper<Text, CrawlDatum, Text, LongWritable> {
+    LongWritable COUNT_1 = new LongWritable(1);
+    private boolean sort = false;
+
+    public void configure(JobConf job) {
+      sort = job.getBoolean("db.reader.stats.sort", false);
+    }
+
+    public void close() {
+    }
+
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      output.collect(new Text("T"), COUNT_1);
+      output.collect(new Text("status " + value.getStatus()), COUNT_1);
+      output
+          .collect(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+      output.collect(new Text("sc"), new LongWritable(
+          (long) (value.getScore() * 1000.0)));
+      // fetch time (in minutes to prevent from overflows when summing up)
+      output.collect(new Text("ft"),
+          new LongWritable(value.getFetchTime() / (1000 * 60)));
+      // fetch interval (in seconds)
+      output.collect(new Text("fi"),
+          new LongWritable(value.getFetchInterval()));
+      if (sort) {
+        URL u = new URL(key.toString());
+        String host = u.getHost();
+        output.collect(new Text("status " + value.getStatus() + " " + host),
+            COUNT_1);
+      }
+    }
+  }
+
+  public static class CrawlDbStatCombiner implements
+      Reducer<Text, LongWritable, Text, LongWritable> {
+    LongWritable val = new LongWritable();
+
+    public CrawlDbStatCombiner() {
+    }
+
+    public void configure(JobConf job) {
+    }
+
+    public void close() {
+    }
+
+    private void reduceMinMaxTotal(String keyPrefix, Iterator<LongWritable> 
values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      long total = 0;
+      long min = Long.MAX_VALUE;
+      long max = Long.MIN_VALUE;
+      while (values.hasNext()) {
+        LongWritable cnt = values.next();
+        if (cnt.get() < min)
+          min = cnt.get();
+        if (cnt.get() > max)
+          max = cnt.get();
+        total += cnt.get();
+      }
+      output.collect(new Text(keyPrefix+"n"), new LongWritable(min));
+      output.collect(new Text(keyPrefix+"x"), new LongWritable(max));
+      output.collect(new Text(keyPrefix+"t"), new LongWritable(total));
+    }
+    
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      val.set(0L);
+      String k = key.toString();
+      if (k.equals("sc") || k.equals("ft") || k.equals("fi")) {
+        reduceMinMaxTotal(k, values, output, reporter);
+      } else {
+        while (values.hasNext()) {
+          LongWritable cnt = values.next();
+          val.set(val.get() + cnt.get());
+        }
+        output.collect(key, val);
+      }
+    }
+  }
+
+  public static class CrawlDbStatReducer implements
+      Reducer<Text, LongWritable, Text, LongWritable> {
+    public void configure(JobConf job) {
+    }
+
+    public void close() {
+    }
+
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+
+      String k = key.toString();
+      if (k.equals("T")) {
+        // sum all values for this key
+        long sum = 0;
+        while (values.hasNext()) {
+          sum += values.next().get();
+        }
+        // output sum
+        output.collect(key, new LongWritable(sum));
+      } else if (k.startsWith("status") || k.startsWith("retry")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scx") || k.equals("ftx") || k.equals("fix")) {
+        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
+        while (values.hasNext()) {
+          LongWritable val = values.next();
+          if (cnt.get() < val.get())
+            cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("scn") || k.equals("ftn") || k.equals("fin")) {
+        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
+        while (values.hasNext()) {
+          LongWritable val = values.next();
+          if (cnt.get() > val.get())
+            cnt.set(val.get());
+        }
+        output.collect(key, cnt);
+      } else if (k.equals("sct") || k.equals("ftt") || k.equals("fit")) {
+        LongWritable cnt = new LongWritable();
+        while (values.hasNext()) {
+          LongWritable val = values.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        output.collect(key, cnt);
+      }
+    }
+  }
+
+  public static class CrawlDbTopNMapper implements
+      Mapper<Text, CrawlDatum, FloatWritable, Text> {
+    private static final FloatWritable fw = new FloatWritable();
+    private float min = 0.0f;
+
+    public void configure(JobConf job) {
+      min = job.getFloat("db.reader.topn.min", 0.0f);
+    }
+
+    public void close() {
+    }
+
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<FloatWritable, Text> output, Reporter reporter)
+        throws IOException {
+      if (value.getScore() < min)
+        return; // don't collect low-scoring records
+      fw.set(-value.getScore()); // reverse sorting order
+      output.collect(fw, key); // invert mapping: score -> url
+    }
+  }
+
+  public static class CrawlDbTopNReducer implements
+      Reducer<FloatWritable, Text, FloatWritable, Text> {
+    private long topN;
+    private long count = 0L;
+
+    public void reduce(FloatWritable key, Iterator<Text> values,
+        OutputCollector<FloatWritable, Text> output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext() && count < topN) {
+        key.set(-key.get());
+        output.collect(key, values.next());
+        count++;
+      }
+    }
+
+    public void configure(JobConf job) {
+      topN = job.getLong("db.reader.topn", 100) / job.getNumReduceTasks();
+    }
+
+    public void close() {
+    }
+  }
+
  /** Releases any MapFile readers opened by {@code openReaders}. */
  public void close() {
    closeReaders();
  }
+
  /**
   * Runs the statistics MapReduce job over the crawldb and merges the
   * per-reducer partial outputs into a single sorted map of counters.
   * Keys follow the scheme produced by {@link CrawlDbStatMapper} /
   * {@link CrawlDbStatCombiner}: plain counters plus "*n" (min), "*x" (max)
   * and "*t" (total) variants for the numeric statistics.
   *
   * @param crawlDb crawldb directory (the job reads its "current" subdir)
   * @param config  base Hadoop configuration
   * @param sort    also collect per-host status counts
   * @return sorted counter map; the temporary job output is deleted
   */
  private TreeMap<String, LongWritable> processStatJobHelper(String crawlDb, Configuration config, boolean sort) throws IOException{
         // temporary output folder created inside the crawldb directory
         Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());

         JobConf job = new NutchJob(config);
         job.setJobName("stats " + crawlDb);
         job.setBoolean("db.reader.stats.sort", sort);

         FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
         job.setInputFormat(SequenceFileInputFormat.class);

         job.setMapperClass(CrawlDbStatMapper.class);
         job.setCombinerClass(CrawlDbStatCombiner.class);
         job.setReducerClass(CrawlDbStatReducer.class);

         FileOutputFormat.setOutputPath(job, tmpFolder);
         job.setOutputFormat(SequenceFileOutputFormat.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(LongWritable.class);

         // https://issues.apache.org/jira/browse/NUTCH-1029
         job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

         JobClient.runJob(job);

         // reading the result
         FileSystem fileSystem = FileSystem.get(config);
         SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(config,
                         tmpFolder);

         Text key = new Text();
         LongWritable value = new LongWritable();

         TreeMap<String, LongWritable> stats = new TreeMap<String, LongWritable>();
         for (int i = 0; i < readers.length; i++) {
                 SequenceFile.Reader reader = readers[i];
                 while (reader.next(key, value)) {
                         String k = key.toString();
                         LongWritable val = stats.get(k);
                         if (val == null) {
                                 val = new LongWritable();
                                 // identity elements: maxima start at MIN_VALUE, minima at MAX_VALUE
                                 if (k.equals("scx") || k.equals("ftx") || k.equals("fix"))
                                         val.set(Long.MIN_VALUE);
                                 if (k.equals("scn") || k.equals("ftn") || k.equals("fin"))
                                         val.set(Long.MAX_VALUE);
                                 stats.put(k, val);
                         }
                         // merge partial aggregates: max for "*x", min for "*n", sum otherwise
                         if (k.equals("scx") || k.equals("ftx") || k.equals("fix")) {
                                 if (val.get() < value.get())
                                         val.set(value.get());
                         } else if (k.equals("scn") || k.equals("ftn") || k.equals("fin")) {
                                 if (val.get() > value.get())
                                         val.set(value.get());
                         } else {
                                 val.set(val.get() + value.get());
                         }
                 }
                 reader.close();
         }
         // removing the tmp folder
         fileSystem.delete(tmpFolder, true);
         return stats;
  }
+  
+  public void processStatJob(String crawlDb, Configuration config, boolean 
sort)
+      throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb statistics start: " + crawlDb);
+    }
+    TreeMap<String, LongWritable> stats = processStatJobHelper(crawlDb, 
config, sort);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Statistics for CrawlDb: " + crawlDb);
+      LongWritable totalCnt = stats.get("T");
+      stats.remove("T");
+      LOG.info("TOTAL urls:\t" + totalCnt.get());
+      for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
+        String k = entry.getKey();
+        LongWritable val = entry.getValue();
+        if (k.equals("scn")) {
+          LOG.info("min score:\t" + (val.get() / 1000.0f));
+        } else if (k.equals("scx")) {
+          LOG.info("max score:\t" + (val.get() / 1000.0f));
+        } else if (k.equals("sct")) {
+          LOG.info("avg score:\t"
+              + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
+        } else if (k.equals("ftn")) {
+          LOG.info("earliest fetch time:\t" + new Date(1000 * 60 * val.get()));
+        } else if (k.equals("ftx")) {
+          LOG.info("latest fetch time:\t" + new Date(1000 * 60 * val.get()));
+        } else if (k.equals("ftt")) {
+          LOG.info("avg of fetch times:\t"
+              + new Date(1000 * 60 * (val.get() / totalCnt.get())));
+        } else if (k.equals("fin")) {
+          LOG.info("shortest fetch interval:\t{}",
+              TimingUtil.secondsToDaysHMS(val.get()));
+        } else if (k.equals("fix")) {
+          LOG.info("longest fetch interval:\t{}",
+              TimingUtil.secondsToDaysHMS(val.get()));
+        } else if (k.equals("fit")) {
+          LOG.info("avg fetch interval:\t{}",
+              TimingUtil.secondsToDaysHMS(val.get() / totalCnt.get()));
+        } else if (k.startsWith("status")) {
+          String[] st = k.split(" ");
+          int code = Integer.parseInt(st[1]);
+          if (st.length > 2)
+            LOG.info("   " + st[2] + " :\t" + val);
+          else
+            LOG.info(st[0] + " " + code + " ("
+                + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
+        } else
+          LOG.info(k + ":\t" + val);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("CrawlDb statistics: done");
+    }
+
+  }
+
  /**
   * Looks up a single URL in the crawldb.
   *
   * @return the stored CrawlDatum, or null if the URL is not in the db
   */
  public CrawlDatum get(String crawlDb, String url, JobConf config)
      throws IOException {
    Text key = new Text(url);
    CrawlDatum val = new CrawlDatum();
    openReaders(crawlDb, config);
    // getEntry selects the right reader via the same hash partitioner that
    // was used when the db was written
    CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
        new HashPartitioner<Text, CrawlDatum>(), key, val);
    return res;
  }
+
+  public void readUrl(String crawlDb, String url, JobConf config)
+      throws IOException {
+    CrawlDatum res = get(crawlDb, url, config);
+    System.out.println("URL: " + url);
+    if (res != null) {
+      System.out.println(res);
+    } else {
+      System.out.println("not found");
+    }
+  }
+
  /**
   * Dumps the crawldb to {@code output} via a MapReduce job. The optional
   * filters (URL regex, status name, minimum retry count, Jexl expression)
   * are handed to {@link CrawlDbDumpMapper} through the job configuration.
   *
   * @param format "csv", "crawldb", or anything else for plain text
   * @param regex  filter: URL must match this regex (nullable)
   * @param status filter: CrawlDatum status name (nullable)
   * @param retry  filter: minimum retries-since-fetch (nullable)
   * @param expr   filter: Jexl expression over the datum (nullable)
   */
  public void processDumpJob(String crawlDb, String output,
      JobConf config, String format, String regex, String status,
      Integer retry, String expr) throws IOException {
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb dump: starting");
      LOG.info("CrawlDb db: " + crawlDb);
    }

    Path outFolder = new Path(output);

    JobConf job = new NutchJob(config);
    job.setJobName("dump " + crawlDb);

    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    FileOutputFormat.setOutputPath(job, outFolder);

    // choose the output representation; default is plain text
    if (format.equals("csv")) {
      job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
    } else if (format.equals("crawldb")) {
      job.setOutputFormat(MapFileOutputFormat.class);
    } else {
      job.setOutputFormat(TextOutputFormat.class);
    }

    // only set filter properties that were actually requested; the mapper
    // treats absent properties as "no filtering"
    if (status != null)
      job.set("status", status);
    if (regex != null)
      job.set("regex", regex);
    if (retry != null)
      job.setInt("retry", retry);
    if (expr != null) {
      job.set("expr", expr);
      LOG.info("CrawlDb db: expr: " + expr);
    }

    job.setMapperClass(CrawlDbDumpMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);

    JobClient.runJob(job);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb dump: done");
    }
  }
+
  /**
   * Map-side filter for the dump job: passes an entry through only when it
   * satisfies every configured constraint (all optional, ANDed together):
   * minimum "retry" count, "status" name, URL "regex", and a Jexl "expr"
   * evaluated against the CrawlDatum.
   */
  public static class CrawlDbDumpMapper implements
      Mapper<Text, CrawlDatum, Text, CrawlDatum> {
    Pattern pattern = null;
    Matcher matcher = null;
    String status = null;
    Integer retry = null;
    Expression expr = null;

    public void configure(JobConf job) {
      if (job.get("regex", null) != null) {
        pattern = Pattern.compile(job.get("regex"));
      }
      status = job.get("status", null);
      // -1 means "no retry filter" (see the check in map())
      retry = job.getInt("retry", -1);
      
      if (job.get("expr", null) != null) {
        expr = JexlUtil.parseExpression(job.get("expr", null));
      }
    }

    public void close() {
    }

    public void map(Text key, CrawlDatum value,
        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
        throws IOException {

      // check retry
      if (retry != -1) {
        if (value.getRetriesSinceFetch() < retry) {
          return;
        }
      }

      // check status
      if (status != null
          && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value
              .getStatus())))
        return;

      // check regex (must match the full URL, not just a substring)
      if (pattern != null) {
        matcher = pattern.matcher(key.toString());
        if (!matcher.matches()) {
          return;
        }
      }
      
      // check expr
      if (expr != null) {
        if (!value.evaluate(expr)) {
          return;
        }
      }

      // all filters passed
      output.collect(key, value);
    }
  }
+
  /**
   * Writes the top {@code topN} URLs by score to {@code output} using two
   * chained jobs: the first maps entries to (-score, url) pairs sorted by
   * the shuffle, the second truncates to topN and restores the score sign.
   *
   * @param topN   number of top-scoring URLs to keep
   * @param min    skip records with scores below this value
   * @param output destination directory for the (score, url) text output
   */
  public void processTopNJob(String crawlDb, long topN, float min,
      String output, JobConf config) throws IOException {

    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
      LOG.info("CrawlDb db: " + crawlDb);
    }

    Path outFolder = new Path(output);
    // randomized temp dir for the intermediate sorted (score, url) pairs
    Path tempDir = new Path(config.get("mapred.temp.dir", ".")
        + "/readdb-topN-temp-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    // phase 1: emit (-score, url), letting the shuffle sort by score
    JobConf job = new NutchJob(config);
    job.setJobName("topN prepare " + crawlDb);
    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(CrawlDbTopNMapper.class);
    job.setReducerClass(IdentityReducer.class);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(Text.class);

    job.setFloat("db.reader.topn.min", min);
    JobClient.runJob(job);

    // phase 2: keep only the first topN records
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: collecting topN scores.");
    }
    job = new NutchJob(config);
    job.setJobName("topN collect " + crawlDb);
    job.setLong("db.reader.topn", topN);

    FileInputFormat.addInputPath(job, tempDir);
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(CrawlDbTopNReducer.class);

    FileOutputFormat.setOutputPath(job, outFolder);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputValueClass(Text.class);

    job.setNumReduceTasks(1); // create a single file.

    JobClient.runJob(job);
    FileSystem fs = FileSystem.get(config);
    fs.delete(tempDir, true);
    if (LOG.isInfoEnabled()) {
      LOG.info("CrawlDb topN: done");
    }

  }
+
  /**
   * Command-line dispatcher: parses {@code <crawldb>} plus one or more
   * action flags (-stats | -dump | -url | -topN) and runs the matching job.
   *
   * @return 0 on success, -1 on usage error or unknown argument
   */
  public int run(String[] args) throws IOException {
    @SuppressWarnings("resource")
    CrawlDbReader dbr = new CrawlDbReader();

    if (args.length < 2) {
      System.err
          .println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
      System.err
          .println("\t<crawldb>\tdirectory name where crawldb is located");
      System.err
          .println("\t-stats [-sort] \tprint overall statistics to System.out");
      System.err.println("\t\t[-sort]\tlist status sorted by host");
      System.err
          .println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>");
      System.err.println("\t\t[-format csv]\tdump in Csv format");
      System.err
          .println("\t\t[-format normal]\tdump in standard format (default option)");
      System.err.println("\t\t[-format crawldb]\tdump as CrawlDB");
      System.err.println("\t\t[-regex <expr>]\tfilter records with expression");
      System.err.println("\t\t[-retry <num>]\tminimum retry count");
      System.err
          .println("\t\t[-status <status>]\tfilter records by CrawlDatum status");
      System.err.println("\t\t[-expr <expr>]\tJexl expression to evaluate for this record");
      System.err
          .println("\t-url <url>\tprint information on <url> to System.out");
      System.err
          .println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
      System.err
          .println("\t\t[<min>]\tskip records with scores below this value.");
      System.err.println("\t\t\tThis can significantly improve performance.");
      return -1;
    }
    String param = null;
    String crawlDb = args[0];
    JobConf job = new NutchJob(getConf());
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-stats")) {
        boolean toSort = false;
        if (i < args.length - 1 && "-sort".equals(args[i + 1])) {
          toSort = true;
          i++;
        }
        dbr.processStatJob(crawlDb, job, toSort);
      } else if (args[i].equals("-dump")) {
        param = args[++i];
        String format = "normal";
        String regex = null;
        Integer retry = null;
        String status = null;
        String expr = null;
        // Scan the rest of args for -dump sub-options; `i` is advanced past
        // each consumed flag/value pair so the outer loop skips them.
        // NOTE(review): assumes well-formed input — a trailing flag without
        // its value throws ArrayIndexOutOfBoundsException; confirm whether
        // nicer error handling is wanted here.
        for (int j = i + 1; j < args.length; j++) {
          if (args[j].equals("-format")) {
            format = args[++j];
            i = i + 2;
          }
          if (args[j].equals("-regex")) {
            regex = args[++j];
            i = i + 2;
          }
          if (args[j].equals("-retry")) {
            retry = Integer.parseInt(args[++j]);
            i = i + 2;
          }
          if (args[j].equals("-status")) {
            status = args[++j];
            i = i + 2;
          }
          if (args[j].equals("-expr")) {
            expr = args[++j];
            i=i+2;
          }
        }
        dbr.processDumpJob(crawlDb, param, job, format, regex, status, retry, expr);
      } else if (args[i].equals("-url")) {
        param = args[++i];
        dbr.readUrl(crawlDb, param, job);
      } else if (args[i].equals("-topN")) {
        param = args[++i];
        long topN = Long.parseLong(param);
        param = args[++i]; // output directory
        float min = 0.0f;
        // optional trailing <min> score threshold
        if (i < args.length - 1) {
          min = Float.parseFloat(args[++i]);
        }
        dbr.processTopNJob(crawlDb, topN, min, param, job);
      } else {
        System.err.println("\nError: wrong argument " + args[i]);
        return -1;
      }
    }
    return 0;
  }
+  
  /** Command-line entry point: delegates to {@link #run(String[])} via ToolRunner. */
  public static void main(String[] args) throws Exception {
    int result = ToolRunner.run(NutchConfiguration.create(),
        new CrawlDbReader(), args);
    System.exit(result);
  }
+
+  public Object query(Map<String, String> args, Configuration conf, String 
type, String crawlId) throws Exception {
+ 
+
+    Map<String, Object> results = new HashMap<String, Object>();
+    String crawlDb = crawlId + "/crawldb";
+
+    if(type.equalsIgnoreCase("stats")){
+      boolean sort = false;
+      if(args.containsKey("sort")){
+        if(args.get("sort").equalsIgnoreCase("true"))
+          sort = true;
+      }
+      TreeMap<String , LongWritable> stats = processStatJobHelper(crawlDb, 
NutchConfiguration.create(), sort);
+      LongWritable totalCnt = stats.get("T");
+      stats.remove("T");
+      results.put("totalUrls", String.valueOf(totalCnt.get()));
+      Map<String, Object> statusMap = new HashMap<String, Object>();      
+
+      for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
+        String k = entry.getKey();
+        LongWritable val = entry.getValue();
+        if (k.equals("scn")) {
+          results.put("minScore", String.valueOf((val.get() / 1000.0f)));
+        } else if (k.equals("scx")) {
+          results.put("maxScore", String.valueOf((val.get() / 1000.0f)));
+        } else if (k.equals("sct")) {
+          results.put("avgScore", String.valueOf((float) ((((double) 
val.get()) / totalCnt.get()) / 1000.0)));
+        } else if (k.startsWith("status")) {
+          String[] st = k.split(" ");
+          int code = Integer.parseInt(st[1]);
+          if (st.length > 2){
+            @SuppressWarnings("unchecked")
+            Map<String, Object> individualStatusInfo = (Map<String, Object>) 
statusMap.get(String.valueOf(code));
+            Map<String, String> hostValues;
+            if(individualStatusInfo.containsKey("hostValues")){
+              hostValues= (Map<String, String>) 
individualStatusInfo.get("hostValues");
+            }
+            else{
+              hostValues = new HashMap<String, String>();
+              individualStatusInfo.put("hostValues", hostValues);
+            }
+            hostValues.put(st[2], String.valueOf(val));
+          }
+          else{
+            Map<String, Object> individualStatusInfo = new HashMap<String, 
Object>();
+
+            individualStatusInfo.put("statusValue", 
CrawlDatum.getStatusName((byte) code));
+            individualStatusInfo.put("count", String.valueOf(val));
+
+            statusMap.put(String.valueOf(code), individualStatusInfo);
+          }
+        } else
+          results.put(k, String.valueOf(val));                   
+      }
+      results.put("status", statusMap);
+      return results;
+    }
+    if(type.equalsIgnoreCase("dump")){
+      String output = args.get("out_dir");
+      String format = "normal";
+      String regex = null;
+      Integer retry = null;
+      String status = null;
+      String expr = null;
+      if (args.containsKey("format")) {
+        format = args.get("format");
+      }
+      if (args.containsKey("regex")) {
+        regex = args.get("regex");
+      }
+      if (args.containsKey("retry")) {
+        retry = Integer.parseInt(args.get("retry"));
+      }
+      if (args.containsKey("status")) {
+        status = args.get("status");
+      }
+      if (args.containsKey("expr")) {
+        expr = args.get("expr");
+      }
+      processDumpJob(crawlDb, output, new NutchJob(conf), format, regex, 
status, retry, expr);
+      File dumpFile = new File(output+"/part-00000");
+      return dumpFile;           
+    }
+    if (type.equalsIgnoreCase("topN")) {
+      String output = args.get("out_dir");
+      long topN = Long.parseLong(args.get("nnn"));
+      float min = 0.0f;
+      if(args.containsKey("min")){
+        min = Float.parseFloat(args.get("min"));
+      }
+      processTopNJob(crawlDb, topN, min, output, new NutchJob(conf));
+      File dumpFile = new File(output+"/part-00000");
+      return dumpFile;
+    }
+
+    if(type.equalsIgnoreCase("url")){
+      String url = args.get("url");
+      CrawlDatum res = get(crawlDb, url, new NutchJob(conf));
+      results.put("status", res.getStatus());
+      results.put("fetchTime", new Date(res.getFetchTime()));
+      results.put("modifiedTime", new Date(res.getModifiedTime()));
+      results.put("retriesSinceFetch", res.getRetriesSinceFetch());
+      results.put("retryInterval", res.getFetchInterval());
+      results.put("score", res.getScore());
+      results.put("signature", StringUtil.toHexString(res.getSignature()));
+      Map<String, String> metadata = new HashMap<String, String>();
+      if(res.getMetaData()!=null){
+        for (Entry<Writable, Writable> e : res.getMetaData().entrySet()) {
+          metadata.put(String.valueOf(e.getKey()), 
String.valueOf(e.getValue()));
+        }
+      }
+      results.put("metadata", metadata);
+
+      return results;
+    }
+    return results;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java
----------------------------------------------------------------------
diff --git 
a/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java
new file mode 100644
index 0000000..1ae73b8
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/CrawlDbReducer.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.io.IOException;
+
+// Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+
/**
 * Merge new page entries with existing entries.
 *
 * For each URL this reducer receives a mix of values: the existing CrawlDb
 * entry (db status), one or more fetch results (fetch status), inlink datums
 * (STATUS_LINKED), a content signature (STATUS_SIGNATURE) and metadata from
 * parsing (STATUS_PARSE_META). It folds them into a single updated CrawlDatum
 * whose db status and fetch schedule reflect the most recent fetch outcome.
 */
public class CrawlDbReducer implements
    Reducer<Text, CrawlDatum, Text, CrawlDatum> {
  public static final Logger LOG = LoggerFactory
      .getLogger(CrawlDbReducer.class);

  // Max transient-failure retries before a page is marked DB_GONE
  // (db.fetch.retry.max).
  private int retryMax;
  // Reused output instance to avoid a per-key allocation.
  private CrawlDatum result = new CrawlDatum();
  // Bounded, score-ordered queue of inlinks (capacity db.update.max.inlinks).
  private InlinkPriorityQueue linked = null;
  private ScoringFilters scfilters = null;
  // Whether URLs not already in the CrawlDb may be added.
  private boolean additionsAllowed;
  // System-wide maximum fetch interval (db.fetch.interval.max); values above
  // this trigger an unconditional re-fetch. 0 effectively disables the check
  // only if no result interval exceeds it.
  private int maxInterval;
  private FetchSchedule schedule;

  /** Reads tunables from the job configuration and sets up helpers. */
  public void configure(JobConf job) {
    retryMax = job.getInt("db.fetch.retry.max", 3);
    scfilters = new ScoringFilters(job);
    additionsAllowed = job.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true);
    maxInterval = job.getInt("db.fetch.interval.max", 0);
    schedule = FetchScheduleFactory.getFetchSchedule(job);
    int maxLinks = job.getInt("db.update.max.inlinks", 10000);
    linked = new InlinkPriorityQueue(maxLinks);
  }

  public void close() {
  }

  /**
   * Merges all values for one URL into a single updated entry and emits it,
   * incrementing a per-status counter in the "CrawlDB status" group.
   */
  public void reduce(Text key, Iterator<CrawlDatum> values,
      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {

    CrawlDatum fetch = new CrawlDatum();
    CrawlDatum old = new CrawlDatum();

    boolean fetchSet = false;
    boolean oldSet = false;
    byte[] signature = null;
    boolean multiple = false; // avoid deep copy when only single value exists
    linked.clear();
    org.apache.hadoop.io.MapWritable metaFromParse = null;

    // Phase 1: classify incoming values into the previous db entry, the
    // latest fetch result, inlinks, signature and parse metadata.
    while (values.hasNext()) {
      CrawlDatum datum = values.next();
      if (!multiple && values.hasNext())
        multiple = true;
      if (CrawlDatum.hasDbStatus(datum)) {
        if (!oldSet) {
          if (multiple) {
            old.set(datum);
          } else {
            // no need for a deep copy - this is the only value
            old = datum;
          }
          oldSet = true;
        } else {
          // always take the latest version
          if (old.getFetchTime() < datum.getFetchTime())
            old.set(datum);
        }
        continue;
      }

      if (CrawlDatum.hasFetchStatus(datum)) {
        if (!fetchSet) {
          if (multiple) {
            fetch.set(datum);
          } else {
            fetch = datum;
          }
          fetchSet = true;
        } else {
          // always take the latest version
          if (fetch.getFetchTime() < datum.getFetchTime())
            fetch.set(datum);
        }
        continue;
      }

      switch (datum.getStatus()) { // collect other info
      case CrawlDatum.STATUS_LINKED:
        CrawlDatum link;
        if (multiple) {
          // deep copy needed: multiple values share the iterator's instance
          link = new CrawlDatum();
          link.set(datum);
        } else {
          link = datum;
        }
        linked.insert(link);
        break;
      case CrawlDatum.STATUS_SIGNATURE:
        signature = datum.getSignature();
        break;
      case CrawlDatum.STATUS_PARSE_META:
        metaFromParse = datum.getMetaData();
        break;
      default:
        LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
      }
    }

    // copy the content of the queue into a List
    // in reversed order
    int numLinks = linked.size();
    List<CrawlDatum> linkList = new ArrayList<CrawlDatum>(numLinks);
    for (int i = numLinks - 1; i >= 0; i--) {
      linkList.add(linked.pop());
    }

    // if it doesn't already exist, skip it
    if (!oldSet && !additionsAllowed)
      return;

    // if there is no fetched datum, perhaps there is a link
    if (!fetchSet && linkList.size() > 0) {
      fetch = linkList.get(0);
      fetchSet = true;
    }

    // still no new data - record only unchanged old data, if exists, and return
    if (!fetchSet) {
      if (oldSet) {// at this point at least "old" should be present
        output.collect(key, old);
        reporter.getCounter("CrawlDB status",
            CrawlDatum.getStatusName(old.getStatus())).increment(1);
      } else {
        LOG.warn("Missing fetch and old value, signature=" + signature);
      }
      return;
    }

    if (signature == null)
      signature = fetch.getSignature();
    long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L;
    long prevFetchTime = oldSet ? old.getFetchTime() : 0L;

    // initialize with the latest version, be it fetch or link
    result.set(fetch);
    if (oldSet) {
      // copy metadata from old, if exists
      if (old.getMetaData().size() > 0) {
        result.putAllMetaData(old);
        // overlay with new, if any
        if (fetch.getMetaData().size() > 0)
          result.putAllMetaData(fetch);
      }
      // set the most recent valid value of modifiedTime
      if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) {
        result.setModifiedTime(old.getModifiedTime());
      }
    }

    // Phase 2: derive the new db status and fetch schedule from the fetch
    // outcome.
    switch (fetch.getStatus()) { // determine new status

    case CrawlDatum.STATUS_LINKED: // it was link
      if (oldSet) { // if old exists
        result.set(old); // use it
      } else {
        result = schedule.initializeSchedule(key, result);
        result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
        try {
          scfilters.initialScore(key, result);
        } catch (ScoringFilterException e) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Cannot filter init score for url " + key
                + ", using default: " + e.getMessage());
          }
          result.setScore(0.0f);
        }
      }
      break;

    case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch
    case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected
    case CrawlDatum.STATUS_FETCH_REDIR_PERM:
    case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified
      // https://issues.apache.org/jira/browse/NUTCH-1656
      if (metaFromParse != null) {
        for (Entry<Writable, Writable> e : metaFromParse.entrySet()) {
          result.getMetaData().put(e.getKey(), e.getValue());
        }
      }

      // determine the modification status
      int modified = FetchSchedule.STATUS_UNKNOWN;
      if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
        modified = FetchSchedule.STATUS_NOTMODIFIED;
      } else if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS) {
        // only successful fetches (but not redirects, NUTCH-1422)
        // are detected as "not modified" by signature comparison
        if (oldSet && old.getSignature() != null && signature != null) {
          if (SignatureComparator._compare(old.getSignature(), signature) != 0) {
            modified = FetchSchedule.STATUS_MODIFIED;
          } else {
            modified = FetchSchedule.STATUS_NOTMODIFIED;
          }
        }
      }
      // set the schedule
      result = schedule.setFetchSchedule(key, result, prevFetchTime,
          prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(),
          modified);
      // set the result status and signature
      if (modified == FetchSchedule.STATUS_NOTMODIFIED) {
        result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED);

        // NUTCH-1341 The page is not modified according to its signature, let's
        // reset lastModified as well
        result.setModifiedTime(prevModifiedTime);

        if (oldSet)
          result.setSignature(old.getSignature());
      } else {
        switch (fetch.getStatus()) {
        case CrawlDatum.STATUS_FETCH_SUCCESS:
          result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
          break;
        case CrawlDatum.STATUS_FETCH_REDIR_PERM:
          result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM);
          break;
        case CrawlDatum.STATUS_FETCH_REDIR_TEMP:
          result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);
          break;
        default:
          LOG.warn("Unexpected status: " + fetch.getStatus()
              + " resetting to old status.");
          if (oldSet)
            result.setStatus(old.getStatus());
          else
            result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
        }
        result.setSignature(signature);
      }

      // if fetchInterval is larger than the system-wide maximum, trigger
      // an unconditional recrawl. This prevents the page to be stuck at
      // NOTMODIFIED state, when the old fetched copy was already removed with
      // old segments.
      if (maxInterval < result.getFetchInterval())
        result = schedule.forceRefetch(key, result, false);
      break;
    case CrawlDatum.STATUS_SIGNATURE:
      // a signature without any accompanying fetch/db entry is meaningless
      if (LOG.isWarnEnabled()) {
        LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
      }
      return;
    case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure
      if (oldSet) {
        result.setSignature(old.getSignature()); // use old signature
      }
      result = schedule.setPageRetrySchedule(key, result, prevFetchTime,
          prevModifiedTime, fetch.getFetchTime());
      if (result.getRetriesSinceFetch() < retryMax) {
        result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
      } else {
        // retry budget exhausted - give up on this page
        result.setStatus(CrawlDatum.STATUS_DB_GONE);
        result = schedule.setPageGoneSchedule(key, result, prevFetchTime,
            prevModifiedTime, fetch.getFetchTime());
      }
      break;

    case CrawlDatum.STATUS_FETCH_GONE: // permanent failure
      if (oldSet)
        result.setSignature(old.getSignature()); // use old signature
      result.setStatus(CrawlDatum.STATUS_DB_GONE);
      result = schedule.setPageGoneSchedule(key, result, prevFetchTime,
          prevModifiedTime, fetch.getFetchTime());
      break;

    default:
      throw new RuntimeException("Unknown status: " + fetch.getStatus() + " "
          + key);
    }

    // Phase 3: let scoring plugins adjust the score, then emit.
    try {
      scfilters.updateDbScore(key, oldSet ? old : null, result, linkList);
    } catch (Exception e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Couldn't update score, key=" + key + ": " + e);
      }
    }
    // remove generation time, if any
    result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
    output.collect(key, result);
    reporter.getCounter("CrawlDB status",
        CrawlDatum.getStatusName(result.getStatus())).increment(1);
  }

}
+
+class InlinkPriorityQueue extends PriorityQueue<CrawlDatum> {
+
+  public InlinkPriorityQueue(int maxSize) {
+    initialize(maxSize);
+  }
+
+  /** Determines the ordering of objects in this priority queue. **/
+  protected boolean lessThan(Object arg0, Object arg1) {
+    CrawlDatum candidate = (CrawlDatum) arg0;
+    CrawlDatum least = (CrawlDatum) arg1;
+    return candidate.getScore() > least.getScore();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java
----------------------------------------------------------------------
diff --git 
a/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java
new file mode 100644
index 0000000..c439570
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/DeduplicationJob.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic deduplicator which groups fetched URLs with the same digest and 
marks
+ * all of them as duplicate except the one with the highest score (based on the
+ * score in the crawldb, which is not necessarily the same as the score
+ * indexed). If two (or more) documents have the same score, then the document
+ * with the latest timestamp is kept. If the documents have the same timestamp
+ * then the one with the shortest URL is kept. The documents marked as 
duplicate
+ * can then be deleted with the command CleaningJob.
+ ***/
+public class DeduplicationJob extends NutchTool implements Tool {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(DeduplicationJob.class);
+
+  private final static Text urlKey = new Text("_URLTEMPKEY_");
+  private final static String DEDUPLICATION_GROUP_MODE = 
"deduplication.group.mode";
+  private final static String DEDUPLICATION_COMPARE_ORDER = 
"deduplication.compare.order";
+
+  public static class DBFilter implements
+      Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> {
+      
+    private String groupMode;
+
+    @Override
+    public void configure(JobConf arg0) {
+      groupMode = arg0.get(DEDUPLICATION_GROUP_MODE);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<BytesWritable, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+
+      if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+          || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+        // || value.getStatus() ==CrawlDatum.STATUS_DB_GONE){
+        byte[] signature = value.getSignature();
+        if (signature == null)
+          return;
+        String url = key.toString();
+        BytesWritable sig = null;
+        byte[] data;
+        switch (groupMode) {
+          case "none":
+            sig = new BytesWritable(signature);
+            break;
+          case "host":
+            byte[] host = URLUtil.getHost(url).getBytes();
+            data = new byte[signature.length + host.length];
+            System.arraycopy(signature, 0, data, 0, signature.length);
+            System.arraycopy(host, 0, data, signature.length, host.length);
+            sig = new BytesWritable(data);
+            break;
+          case "domain":
+            byte[] domain = URLUtil.getDomainName(url).getBytes();
+            data = new byte[signature.length + domain.length];
+            System.arraycopy(signature, 0, data, 0, signature.length);
+            System.arraycopy(domain, 0, data, signature.length, domain.length);
+            sig = new BytesWritable(data);
+            break;
+        }
+        // add the URL as a temporary MD
+        value.getMetaData().put(urlKey, key);
+        // reduce on the signature optionall grouped on host or domain or not 
at all
+        output.collect(sig, value);
+      }
+    }
+  }
+
+  public static class DedupReducer implements
+      Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> {
+
+    private String[] compareOrder;
+    
+    @Override
+    public void configure(JobConf arg0) {
+      compareOrder = arg0.get(DEDUPLICATION_COMPARE_ORDER).split(",");
+    }
+
+    private void writeOutAsDuplicate(CrawlDatum datum,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
+      Text key = (Text) datum.getMetaData().remove(urlKey);
+      reporter.incrCounter("DeduplicationJobStatus",
+          "Documents marked as duplicate", 1);
+      output.collect(key, datum);
+    }
+
+    @Override
+    public void reduce(BytesWritable key, Iterator<CrawlDatum> values,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      CrawlDatum existingDoc = null;
+
+      outerloop:
+      while (values.hasNext()) {
+        if (existingDoc == null) {
+          existingDoc = new CrawlDatum();
+          existingDoc.set(values.next());
+          continue;
+        }
+        CrawlDatum newDoc = values.next();
+
+        for (int i = 0; i < compareOrder.length; i++) {
+          switch (compareOrder[i]) {
+            case "score":
+              // compare based on score
+              if (existingDoc.getScore() < newDoc.getScore()) {
+                writeOutAsDuplicate(existingDoc, output, reporter);
+                existingDoc = new CrawlDatum();
+                existingDoc.set(newDoc);
+                continue outerloop;
+              } else if (existingDoc.getScore() > newDoc.getScore()) {
+                // mark new one as duplicate
+                writeOutAsDuplicate(newDoc, output, reporter);
+                continue outerloop;
+              }
+              break;
+            case "fetchTime":
+              // same score? delete the one which is oldest
+              if (existingDoc.getFetchTime() > newDoc.getFetchTime()) {
+                // mark new one as duplicate
+                writeOutAsDuplicate(newDoc, output, reporter);
+                continue outerloop;
+              } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) {
+                // mark existing one as duplicate
+                writeOutAsDuplicate(existingDoc, output, reporter);
+                existingDoc = new CrawlDatum();
+                existingDoc.set(newDoc);
+                continue outerloop;
+              }
+              break;
+            case "urlLength":
+              // same time? keep the one which has the shortest URL
+              String urlExisting;
+              String urlnewDoc;
+              try {
+                urlExisting = 
URLDecoder.decode(existingDoc.getMetaData().get(urlKey).toString(), "UTF8");
+                urlnewDoc = 
URLDecoder.decode(newDoc.getMetaData().get(urlKey).toString(), "UTF8");
+              } catch (UnsupportedEncodingException e) {
+                LOG.error("Error decoding: " + urlKey);
+                throw new IOException("UnsupportedEncodingException for " + 
urlKey);
+              }
+              if (urlExisting.length() < urlnewDoc.length()) {
+                // mark new one as duplicate
+                writeOutAsDuplicate(newDoc, output, reporter);
+                continue outerloop;
+              } else if (urlExisting.length() > urlnewDoc.length()) {
+                // mark existing one as duplicate
+                writeOutAsDuplicate(existingDoc, output, reporter);
+                existingDoc = new CrawlDatum();
+                existingDoc.set(newDoc);
+                continue outerloop;
+              }
+              break;
+          }
+        }
+
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+  /** Combine multiple new entries for a url. */
+  public static class StatusUpdateReducer implements
+      Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+
+    public void configure(JobConf job) {
+    }
+
+    public void close() {
+    }
+
+    private CrawlDatum old = new CrawlDatum();
+    private CrawlDatum duplicate = new CrawlDatum();
+
+    public void reduce(Text key, Iterator<CrawlDatum> values,
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      boolean duplicateSet = false;
+
+      while (values.hasNext()) {
+        CrawlDatum val = values.next();
+        if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+          duplicate.set(val);
+          duplicateSet = true;
+        } else {
+          old.set(val);
+        }
+      }
+
+      // keep the duplicate if there is one
+      if (duplicateSet) {
+        output.collect(key, duplicate);
+        return;
+      }
+
+      // no duplicate? keep old one then
+      output.collect(key, old);
+    }
+  }
+
+  public int run(String[] args) throws IOException {
+    if (args.length < 1) {
+      System.err.println("Usage: DeduplicationJob <crawldb> [-group 
<none|host|domain>] [-compareOrder <score>,<fetchTime>,<urlLength>]");
+      return 1;
+    }
+
+    String group = "none";
+    String crawldb = args[0];
+    String compareOrder = "score,fetchTime,urlLength";
+
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-group")) 
+        group = args[++i];
+      if (args[i].equals("-compareOrder")) {
+        compareOrder = args[++i];
+
+        if (compareOrder.indexOf("score") == -1 ||
+            compareOrder.indexOf("fetchTime") == -1 ||
+            compareOrder.indexOf("urlLength") == -1) {
+          System.err.println("DeduplicationJob: compareOrder must contain 
score, fetchTime and urlLength.");
+          return 1;
+        }
+      }
+    }
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("DeduplicationJob: starting at " + sdf.format(start));
+
+    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
+        + "/dedup-temp-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(getConf());
+
+    job.setJobName("Deduplication on " + crawldb);
+    job.set(DEDUPLICATION_GROUP_MODE, group);
+    job.set(DEDUPLICATION_COMPARE_ORDER, compareOrder);
+
+    FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    FileOutputFormat.setOutputPath(job, tempDir);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(CrawlDatum.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(CrawlDatum.class);
+
+    job.setMapperClass(DBFilter.class);
+    job.setReducerClass(DedupReducer.class);
+
+    try {
+      RunningJob rj = JobClient.runJob(job);
+      Group g = rj.getCounters().getGroup("DeduplicationJobStatus");
+      if (g != null) {
+        long dups = g.getCounter("Documents marked as duplicate");
+        LOG.info("Deduplication: " + (int) dups
+            + " documents marked as duplicates");
+      }
+    } catch (final Exception e) {
+      LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+
+    // merge with existing crawl db
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Deduplication: Updating status of duplicate urls into crawl 
db.");
+    }
+
+    Path dbPath = new Path(crawldb);
+    JobConf mergeJob = CrawlDb.createJob(getConf(), dbPath);
+    FileInputFormat.addInputPath(mergeJob, tempDir);
+    mergeJob.setReducerClass(StatusUpdateReducer.class);
+
+    try {
+      JobClient.runJob(mergeJob);
+    } catch (final Exception e) {
+      LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+
+    CrawlDb.install(mergeJob, dbPath);
+
+    // clean up
+    FileSystem fs = FileSystem.get(getConf());
+    fs.delete(tempDir, true);
+
+    long end = System.currentTimeMillis();
+    LOG.info("Deduplication finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(NutchConfiguration.create(),
+        new DeduplicationJob(), args);
+    System.exit(result);
+  }
+
+  @Override
+  public Map<String, Object> run(Map<String, Object> args, String crawlId) 
throws Exception {
+    Map<String, Object> results = new HashMap<String, Object>();
+    String[] arg = new String[1];
+    String crawldb;
+    if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+      crawldb = (String)args.get(Nutch.ARG_CRAWLDB);
+    }
+    else {
+      crawldb = crawlId+"/crawldb";
+    }
+    arg[0] = crawldb;
+    int res = run(arg);
+    results.put(Nutch.VAL_RESULT, Integer.toString(res));
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java
----------------------------------------------------------------------
diff --git 
a/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java 
b/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java
new file mode 100755
index 0000000..4a60a1c
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/DefaultFetchSchedule.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class implements the default re-fetch schedule. That is, no matter if
+ * the page was changed or not, the <code>fetchInterval</code> remains
+ * unchanged, and the updated page fetchTime will always be set to
+ * <code>fetchTime + fetchInterval * 1000</code>.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class DefaultFetchSchedule extends AbstractFetchSchedule {
+
+  @Override
+  public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
+      long prevFetchTime, long prevModifiedTime, long fetchTime,
+      long modifiedTime, int state) {
+    datum = super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime,
+        fetchTime, modifiedTime, state);
+    if (datum.getFetchInterval() == 0) {
+      datum.setFetchInterval(defaultInterval);
+    }
+    datum.setFetchTime(fetchTime + (long) datum.getFetchInterval() * 1000);
+    datum.setModifiedTime(modifiedTime);
+    return datum;
+  }
+}

Reply via email to