Author: markus
Date: Mon Dec 19 15:15:43 2011
New Revision: 1220788

URL: http://svn.apache.org/viewvc?rev=1220788&view=rev
Log:
NUTCH-1225 Migrate CrawlDBScanner to MapReduce API

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java

Modified: nutch/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1220788&r1=1220787&r2=1220788&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Mon Dec 19 15:15:43 2011
@@ -1,5 +1,7 @@
 Nutch Change Log
 
+* NUTCH-1225 Migrate CrawlDBScanner to MapReduce API (markus)
+
 * NUTCH-1222 Upgrade to new Hadoop 0.22.0 (markus)
 
 * NUTCH-1221 Migrate DomainStatistics to MapReduce API (markus)

Modified: nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java?rev=1220788&r1=1220787&r2=1220788&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java Mon Dec 19 
15:15:43 2011
@@ -26,24 +26,20 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapFileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.CrawlDb;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TimingUtil;
 
 /**
@@ -52,71 +48,65 @@ import org.apache.nutch.util.TimingUtil;
  * used as a new CrawlDB. The dump mechanism of the crawldb reader is not very
  * useful on large crawldbs as the ouput can be extremely large and the -url
  * function can't help if we don't know what url we want to have a look at.
- * 
+ *
  * @author : Julien Nioche
  */
-
-public class CrawlDBScanner extends Configured implements Tool,
-    Mapper<Text,CrawlDatum,Text,CrawlDatum>, 
Reducer<Text,CrawlDatum,Text,CrawlDatum> {
+public class CrawlDBScanner extends Configured implements Tool {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(CrawlDBScanner.class);
 
-  public CrawlDBScanner() {}
-
-  public CrawlDBScanner(Configuration conf) {
-    setConf(conf);
-  }
-
-  public void close() {}
-
-  private String regex = null;
-  private String status = null;
 
-  public void configure(JobConf job) {
-    regex = job.get("CrawlDBScanner.regex");
-    status = job.get("CrawlDBScanner.status");
-  }
-
-  public void map(Text url, CrawlDatum crawlDatum,
-      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws 
IOException {
-
-    // check status
-    if (status != null
-        && 
!status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) 
return;
-
-    // if URL matched regexp dump it
-    if (url.toString().matches(regex)) {
-      output.collect(url, crawlDatum);
+  static class CrawlDBScannerMapper extends 
Mapper<Text,CrawlDatum,Text,CrawlDatum> {
+    private String regex = null;
+    private String status = null;
+
+    public void setup(Context context) {
+      regex = context.getConfiguration().get("CrawlDBScanner.regex");
+      status = context.getConfiguration().get("CrawlDBScanner.status");
+    }
+
+    public void map(Text url, CrawlDatum crawlDatum, Context context) throws 
IOException, InterruptedException {
+      // check status
+      if (status != null
+          && 
!status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) 
return;
+
+      // if URL matched regexp dump it
+      if (url.toString().matches(regex)) {
+        context.write(url, crawlDatum);
+      }
     }
   }
 
-  public void reduce(Text key, Iterator<CrawlDatum> values,
-      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws 
IOException {
-    while (values.hasNext()) {
-      CrawlDatum val = values.next();
-      output.collect(key, val);
+  static class CrawlDBScannerReducer extends Reducer 
<Text,CrawlDatum,Text,CrawlDatum> {
+    public void reduce(Text key, Iterable<CrawlDatum> values, Context context) 
throws IOException, InterruptedException {
+      for (CrawlDatum val : values) {
+        context.write(key, val);
+      }
     }
   }
 
   private void scan(Path crawlDb, Path outputPath, String regex, String status,
-      boolean text) throws IOException {
+      boolean text) throws Exception {
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     LOG.info("CrawlDB scanner: starting at " + sdf.format(start));
 
-    JobConf job = new NutchJob(getConf());
 
-    job.setJobName("Scan : " + crawlDb + " for URLS matching : " + regex);
+    Configuration conf = getConf();
+    conf.set("CrawlDBScanner.regex", regex);
+    if (status != null) conf.set("CrawlDBScanner.status", status);
+    if (text) conf.set("mapred.output.compress", "false");
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
 
-    job.set("CrawlDBScanner.regex", regex);
-    if (status != null) job.set("CrawlDBScanner.status", status);
+    Job job = new Job(conf, "Scan : " + crawlDb + " for URLS matching : " + 
regex);
+    job.setJarByClass(CrawlDBScanner.class);
 
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
-    job.setMapperClass(CrawlDBScanner.class);
-    job.setReducerClass(CrawlDBScanner.class);
+    job.setMapperClass(CrawlDBScannerMapper.class);
+    job.setReducerClass(CrawlDBScannerReducer.class);
 
     FileOutputFormat.setOutputPath(job, outputPath);
 
@@ -124,14 +114,13 @@ public class CrawlDBScanner extends Conf
     // in order to check something - better to use the text format and avoid
     // compression
     if (text) {
-      job.set("mapred.output.compress", "false");
-      job.setOutputFormat(TextOutputFormat.class);
+      job.setOutputFormatClass(TextOutputFormat.class);
     }
     // otherwise what we will actually create is a mini-crawlDB which can be
     // then used
     // for debugging
     else {
-      job.setOutputFormat(MapFileOutputFormat.class);
+      job.setOutputFormatClass(MapFileOutputFormat.class);
     }
 
     job.setMapOutputKeyClass(Text.class);
@@ -141,7 +130,7 @@ public class CrawlDBScanner extends Conf
     job.setOutputValueClass(CrawlDatum.class);
 
     try {
-      JobClient.runJob(job);
+      job.waitForCompletion(true);
     } catch (IOException e) {
       throw e;
     }


Reply via email to