Author: jnioche
Date: Thu Nov 14 11:55:33 2013
New Revision: 1541883
URL: http://svn.apache.org/r1541883
Log:
NUTCH-656 Generic Deduplicator (jnioche, snagel)
Added:
nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/bin/crawl
nutch/trunk/src/bin/nutch
nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java
nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java
nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1541883&r1=1541882&r2=1541883&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Thu Nov 14 11:55:33 2013
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Development Trunk
+* NUTCH-656 Generic Deduplicator (jnioche, snagel)
+
* NUTCH-1100 Avoid NPE in SOLRDedup (markus)
* NUTCH-1666 Optimisation for BasicURLNormalizer (jnioche)
Modified: nutch/trunk/src/bin/crawl
URL: http://svn.apache.org/viewvc/nutch/trunk/src/bin/crawl?rev=1541883&r1=1541882&r2=1541883&view=diff
==============================================================================
--- nutch/trunk/src/bin/crawl (original)
+++ nutch/trunk/src/bin/crawl Thu Nov 14 11:55:33 2013
@@ -165,15 +165,22 @@ do
then exit $?
fi
+ echo "Dedup on crawldb"
+ $bin/nutch dedup $CRAWL_PATH/crawldb
+
+ if [ $? -ne 0 ]
+ then exit $?
+ fi
+
echo "Indexing $SEGMENT on SOLR index -> $SOLRURL"
- $bin/nutch solrindex $SOLRURL $CRAWL_PATH/crawldb -linkdb $CRAWL_PATH/linkdb $CRAWL_PATH/segments/$SEGMENT
+ $bin/nutch index -D solr.server.url=$SOLRURL $CRAWL_PATH/crawldb -linkdb $CRAWL_PATH/linkdb $CRAWL_PATH/segments/$SEGMENT
if [ $? -ne 0 ]
then exit $?
fi
- echo "SOLR dedup -> $SOLRURL"
- $bin/nutch solrdedup $SOLRURL
+ echo "Cleanup on SOLR index -> $SOLRURL"
+ $bin/nutch clean -D solr.server.url=$SOLRURL $CRAWL_PATH/crawldb
if [ $? -ne 0 ]
then exit $?
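With this change the tail of each crawl iteration runs dedup before indexing and replaces the old solrdedup step with an index cleanup pass. A condensed sketch of the resulting sequence, using the variables the script defines earlier (CRAWL_PATH, SOLRURL, SEGMENT):

  $bin/nutch dedup $CRAWL_PATH/crawldb                                  # mark duplicates in the crawldb
  $bin/nutch index -D solr.server.url=$SOLRURL $CRAWL_PATH/crawldb -linkdb $CRAWL_PATH/linkdb $CRAWL_PATH/segments/$SEGMENT
  $bin/nutch clean -D solr.server.url=$SOLRURL $CRAWL_PATH/crawldb      # delete gone/duplicate documents from the index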
Modified: nutch/trunk/src/bin/nutch
URL: http://svn.apache.org/viewvc/nutch/trunk/src/bin/nutch?rev=1541883&r1=1541882&r2=1541883&view=diff
==============================================================================
--- nutch/trunk/src/bin/nutch (original)
+++ nutch/trunk/src/bin/nutch Thu Nov 14 11:55:33 2013
@@ -227,6 +227,8 @@ elif [ "$COMMAND" = "index" ] ; then
CLASS=org.apache.nutch.indexer.IndexingJob
elif [ "$COMMAND" = "solrdedup" ] ; then
CLASS=org.apache.nutch.indexer.solr.SolrDeleteDuplicates
+elif [ "$COMMAND" = "dedup" ] ; then
+ CLASS=org.apache.nutch.crawl.DeduplicationJob
elif [ "$COMMAND" = "solrclean" ] ; then
CLASS="org.apache.nutch.indexer.CleaningJob -D solr.server.url=$2 $1"
shift; shift
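The new dedup command can also be run on its own; a minimal invocation, with an illustrative crawldb path:

  bin/nutch dedup crawl/crawldb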
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=1541883&r1=1541882&r2=1541883&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java Thu Nov 14 11:55:33 2013
@@ -56,6 +56,7 @@ public class CrawlDatum implements Writa
public static final byte STATUS_DB_REDIR_PERM = 0x05;
/** Page was successfully fetched and found not modified. */
public static final byte STATUS_DB_NOTMODIFIED = 0x06;
+ public static final byte STATUS_DB_DUPLICATE = 0x07;
/** Maximum value of DB-related status. */
public static final byte STATUS_DB_MAX = 0x1f;
@@ -94,6 +95,7 @@ public class CrawlDatum implements Writa
statNames.put(STATUS_DB_REDIR_TEMP, "db_redir_temp");
statNames.put(STATUS_DB_REDIR_PERM, "db_redir_perm");
statNames.put(STATUS_DB_NOTMODIFIED, "db_notmodified");
+ statNames.put(STATUS_DB_DUPLICATE, "db_duplicate");
statNames.put(STATUS_SIGNATURE, "signature");
statNames.put(STATUS_INJECTED, "injected");
statNames.put(STATUS_LINKED, "linked");
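After a dedup run, the new db_duplicate status shows up in the usual CrawlDb tooling; for instance with the standard readdb stats (path illustrative, exact output formatting may differ):

  bin/nutch readdb crawl/crawldb -stats
  # expect a line along the lines of:  status 7 (db_duplicate):  <count>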
Added: nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java?rev=1541883&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/DeduplicationJob.java Thu Nov 14 11:55:33 2013
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generic deduplicator which groups fetched URLs with the same digest and marks
+ * all of them as duplicate except the one with the highest score (based on the
+ * score in the crawldb, which is not necessarily the same as the score
+ * indexed). If two (or more) documents have the same score, then the document
+ * with the latest timestamp is kept. If the documents have the same timestamp
+ * then the one with the shortest URL is kept. The documents marked as duplicate
+ * can then be deleted with the command CleaningJob.
+ ***/
+public class DeduplicationJob extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(DeduplicationJob.class);
+
+ private final static Text urlKey = new Text("_URLTEMPKEY_");
+
+ public static class DBFilter implements
+ Mapper<Text, CrawlDatum, BytesWritable, CrawlDatum> {
+
+ @Override
+ public void configure(JobConf arg0) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void map(Text key, CrawlDatum value,
+ OutputCollector<BytesWritable, CrawlDatum> output,
+ Reporter reporter) throws IOException {
+
+ if (value.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+ || value.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+ // || value.getStatus() ==CrawlDatum.STATUS_DB_GONE){
+ byte[] signature = value.getSignature();
+ if (signature == null) return;
+ BytesWritable sig = new BytesWritable(signature);
+ // add the URL as a temporary MD
+ value.getMetaData().put(urlKey, key);
+ // reduce on the signature
+ output.collect(sig, value);
+ }
+ }
+ }
+
+ public static class DedupReducer implements
+ Reducer<BytesWritable, CrawlDatum, Text, CrawlDatum> {
+
+ private void writeOutAsDuplicate(CrawlDatum datum,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+ datum.setStatus(CrawlDatum.STATUS_DB_DUPLICATE);
+ Text key = (Text) datum.getMetaData().remove(urlKey);
+ reporter.incrCounter("DeduplicationJobStatus",
+ "Documents marked as duplicate", 1);
+ output.collect(key, datum);
+ }
+
+ @Override
+ public void reduce(BytesWritable key, Iterator<CrawlDatum> values,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+ CrawlDatum existingDoc = null;
+
+ while (values.hasNext()) {
+ if (existingDoc == null) {
+ existingDoc = new CrawlDatum();
+ existingDoc.set(values.next());
+ continue;
+ }
+ CrawlDatum newDoc = values.next();
+ // compare based on score
+ if (existingDoc.getScore() < newDoc.getScore()) {
+ writeOutAsDuplicate(existingDoc, output, reporter);
+ existingDoc = new CrawlDatum();
+ existingDoc.set(newDoc);
+ continue;
+ } else if (existingDoc.getScore() > newDoc.getScore()) {
+ // mark new one as duplicate
+ writeOutAsDuplicate(newDoc, output, reporter);
+ continue;
+ }
+ // same score? delete the one which is oldest
+ if (existingDoc.getFetchTime() > newDoc.getFetchTime()) {
+ // mark new one as duplicate
+ writeOutAsDuplicate(newDoc, output, reporter);
+ continue;
+ } else if (existingDoc.getFetchTime() < newDoc.getFetchTime()) {
+ // mark existing one as duplicate
+ writeOutAsDuplicate(existingDoc, output, reporter);
+ existingDoc = new CrawlDatum();
+ existingDoc.set(newDoc);
+ continue;
+ }
+ // same time? keep the one which has the shortest URL
+ String urlExisting = existingDoc.getMetaData().get(urlKey).toString();
+ String urlnewDoc = newDoc.getMetaData().get(urlKey).toString();
+ if (urlExisting.length()<urlnewDoc.length()){
+ // mark new one as duplicate
+ writeOutAsDuplicate(newDoc, output, reporter);
+ continue;
+ }
+ else if (urlExisting.length()>urlnewDoc.length()){
+ // mark existing one as duplicate
+ writeOutAsDuplicate(existingDoc, output, reporter);
+ existingDoc = new CrawlDatum();
+ existingDoc.set(newDoc);
+ continue;
+ }
+ }
+ }
+
+ @Override
+ public void configure(JobConf arg0) {
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ }
+
+ /** Combine multiple new entries for a url. */
+ public static class StatusUpdateReducer implements
+ Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+
+ public void configure(JobConf job) {
+ }
+
+ public void close() {
+ }
+
+ private CrawlDatum old = new CrawlDatum();
+ private CrawlDatum duplicate = new CrawlDatum();
+
+ public void reduce(Text key, Iterator<CrawlDatum> values,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+ boolean duplicateSet = false;
+
+ while (values.hasNext()) {
+ CrawlDatum val = values.next();
+ if (val.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+ duplicate.set(val);
+ duplicateSet = true;
+ } else {
+ old.set(val);
+ }
+ }
+
+ // keep the duplicate if there is one
+ if (duplicateSet) {
+ output.collect(key, duplicate);
+ return;
+ }
+
+ // no duplicate? keep old one then
+ output.collect(key, old);
+ }
+ }
+
+ public int run(String[] args) throws IOException {
+ if (args.length < 1) {
+ System.err.println("Usage: DeduplicationJob <crawldb>");
+ return 1;
+ }
+
+ String crawldb = args[0];
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("DeduplicationJob: starting at " + sdf.format(start));
+
+ Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
+ + "/dedup-temp-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf job = new NutchJob(getConf());
+
+ FileInputFormat.addInputPath(job, new Path(crawldb,
+ CrawlDb.CURRENT_NAME));
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ FileOutputFormat.setOutputPath(job, tempDir);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+
+ job.setMapOutputKeyClass(BytesWritable.class);
+ job.setMapOutputValueClass(CrawlDatum.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(CrawlDatum.class);
+
+ job.setMapperClass(DBFilter.class);
+ job.setReducerClass(DedupReducer.class);
+
+ try {
+ RunningJob rj = JobClient.runJob(job);
+ Group g = rj.getCounters().getGroup("DeduplicationJobStatus");
+ if (g != null){
+ long dups = g.getCounter("Documents marked as duplicate");
+ LOG.info("Deduplication: "+(int)dups+" documents marked as
duplicates");
+ }
+ } catch (final Exception e) {
+ LOG.error("DeduplicationJob: " +
StringUtils.stringifyException(e));
+ return -1;
+ }
+
+ // merge with existing crawl db
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Deduplication: Updating status of duplicate urls into
crawl db.");
+ }
+
+ Path dbPath = new Path(crawldb);
+ JobConf mergeJob = CrawlDb.createJob(getConf(), dbPath);
+ FileInputFormat.addInputPath(mergeJob, tempDir);
+ mergeJob.setReducerClass(StatusUpdateReducer.class);
+
+ try {
+ JobClient.runJob(mergeJob);
+ } catch (final Exception e) {
+ LOG.error("DeduplicationMergeJob: "
+ + StringUtils.stringifyException(e));
+ return -1;
+ }
+
+ CrawlDb.install(mergeJob, dbPath);
+
+ // clean up
+ FileSystem fs = FileSystem.get(getConf());
+ fs.delete(tempDir, true);
+
+ long end = System.currentTimeMillis();
+ LOG.info("Deduplication finished at " + sdf.format(end)
+ + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int result = ToolRunner.run(NutchConfiguration.create(),
+ new DeduplicationJob(), args);
+ System.exit(result);
+ }
+}
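A sample run against an illustrative crawldb; the log lines correspond to the messages emitted by the job above, with counts and timestamps purely illustrative:

  bin/nutch dedup crawl/crawldb
  # DeduplicationJob: starting at 2013-11-14 12:00:01
  # Deduplication: 42 documents marked as duplicates
  # Deduplication: Updating status of duplicate urls into crawl db.
  # Deduplication finished at 2013-11-14 12:01:15, elapsed: 00:01:14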
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java?rev=1541883&r1=1541882&r2=1541883&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java Thu Nov 14 11:55:33 2013
@@ -45,9 +45,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The class scans CrawlDB looking for entries with status DB_GONE (404) and
+ * The class scans CrawlDB looking for entries with status DB_GONE (404) or
+ * DB_DUPLICATE and
* sends delete requests to indexers for those documents.
- *
*/
public class CleaningJob implements Tool {
@@ -81,7 +81,7 @@ public class CleaningJob implements Tool
OutputCollector<ByteWritable, Text> output, Reporter reporter)
throws IOException {
- if (value.getStatus() == CrawlDatum.STATUS_DB_GONE) {
+ if (value.getStatus() == CrawlDatum.STATUS_DB_GONE || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
output.collect(OUT, key);
}
}
@@ -179,7 +179,9 @@ public class CleaningJob implements Tool
public int run(String[] args) throws IOException {
if (args.length < 1) {
- System.err.println("Usage: CleaningJob <crawldb> [-noCommit]");
+ String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
+ LOG.error("Missing crawldb. "+usage);
+ System.err.println(usage);
IndexWriters writers = new IndexWriters(getConf());
System.err.println(writers.describe());
return 1;
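Once duplicates are marked in the crawldb, a cleaning pass removes them (together with DB_GONE entries) from the index; this is the same call the updated crawl script makes, shown here with an illustrative Solr URL and crawldb path:

  bin/nutch clean -D solr.server.url=http://localhost:8983/solr crawl/crawldb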
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java?rev=1541883&r1=1541882&r2=1541883&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java Thu Nov 14 11:55:33 2013
@@ -245,6 +245,14 @@ implements Mapper<Text, Writable, Text,
return; // only have inlinks
}
+ // Whether to delete pages marked as duplicates
+ if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+ reporter.incrCounter("IndexerStatus", "Duplicates deleted", 1);
+ NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ output.collect(key, action);
+ return;
+ }
+
// Whether to skip DB_NOTMODIFIED pages
if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
reporter.incrCounter("IndexerStatus", "Skipped", 1);